【发布时间】:2018-11-13 11:30:23
【问题描述】:
我想了解futures::sync::mpsc::Receiver 的工作原理。在下面的示例中,接收者线程休眠两秒钟,发送者每秒发送一次。
我希望发送者会因为等待而被阻塞,然后在释放缓冲区时发送。
我看到的是它在一段时间后陷入僵局。增加通道的缓冲区只会延长它被阻塞的时间。
我应该怎么做才能让发送者在缓冲区可用时发送数据,并在这种情况下给发送者一些背压? futures::sync::mpsc::channel有自己的文档,但是我不明白如何正确使用。
extern crate futures;
extern crate tokio_core;
use std::{thread, time};
use futures::sync::mpsc;
use futures::{Future, Sink, Stream};
use tokio_core::reactor::Core;
#[derive(Debug)]
struct Stats {
pub success: usize,
pub failure: usize,
}
fn main() {
let mut core = Core::new().expect("Failed to create core");
let remote = core.remote();
let (tx, rx) = mpsc::channel(1);
thread::spawn(move || loop {
let tx = tx.clone();
let delay = time::Duration::from_secs(1);
thread::sleep(delay);
let f = ::futures::done::<(), ()>(Ok(()));
remote.spawn(|_| {
f.then(|res| {
println!("Sending");
tx.send(res).wait();
println!("Sent");
Ok(())
})
});
});
let mut stats = Stats {
success: 0,
failure: 0,
};
let f2 = rx.for_each(|res| {
println!("Received");
let delay = time::Duration::from_secs(2);
thread::sleep(delay);
match res {
Ok(_) => stats.success += 1,
Err(_) => stats.failure += 1,
}
println!("stats = {:?}", stats);
Ok(())
});
core.run(f2).expect("Core failed to run");
}
【问题讨论】:
-
futures::done来自哪里?它不是 futures crate 当前版本的一部分。
标签: rust future rust-tokio