Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others

Categories

0 votes
1.4k views
in Technique[技术] by (71.8m points)

rust - How can I stop reading from a tokio::io::lines stream?

I want to terminate reading from a tokio::io::lines stream. I merged it with a oneshot future and terminated it, but tokio::run was still working.

use futures::{sync::oneshot, *}; // 0.1.27
use std::{io::BufReader, time::Duration};
use tokio::prelude::*; // 0.1.21

fn main() {
    let (tx, rx) = oneshot::channel::<()>();
    let lines = tokio::io::lines(BufReader::new(tokio::io::stdin()));
    let lines = lines.for_each(|item| {
        println!("> {:?}", item);
        Ok(())
    });

    std::thread::spawn(move || {
        std::thread::sleep(Duration::from_millis(5000));
        println!("system shutting down");
        let _ = tx.send(());
    });

    let lines = lines.select2(rx);

    tokio::run(lines.map(|_| ()).map_err(|_| ()));
}

How can I stop reading from this?

See Question&Answers more detail:os

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
Welcome To Ask or Share your Answers For Others

1 Reply

0 votes
by (71.8m points)

There's nothing wrong with your strategy, but it will only work with futures that don't execute a blocking operation via Tokio's blocking (the traditional kind of blocking should never be done inside a future).

You can test this by replacing the tokio::io::lines(..) future with a simple interval future:

let lines = Interval::new(Instant::now(), Duration::from_secs(1));

The problem is that tokio::io::Stdin internally uses tokio_threadpool::blocking .

When you use Tokio thread pool blocking (emphasis mine):

NB: The entire task that called blocking is blocked whenever the supplied closure blocks, even if you have used future combinators such as select - the other futures in this task will not make progress until the closure returns. If this is not desired, ensure that blocking runs in its own task (e.g. using futures::sync::oneshot::spawn).

Since this will block every other future in the combinator, your Receiver will not be able to get a signal from the Senderuntil the blocking ends.

Please see How can I read non-blocking from stdin? or you can use tokio-stdin-stdout, which creates a channel to consume data from stdin thread. It also has a line-by-line example.


与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
OGeek|极客中国-欢迎来到极客的世界,一个免费开放的程序员编程交流平台!开放,进步,分享!让技术改变生活,让极客改变未来! Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Click Here to Ask a Question

...