【问题标题】:How can I implement a blocking queue mechanism with futures::sync::mpsc::channel?如何使用 futures::sync::mpsc::channel 实现阻塞队列机制?
【发布时间】:2018-11-13 11:30:23
【问题描述】:

我想了解futures::sync::mpsc::Receiver 的工作原理。在下面的示例中,接收者线程休眠两秒钟,发送者每秒发送一次。

我希望发送者会因为等待而被阻塞,然后在释放缓冲区时发送。

我看到的是它在一段时间后陷入僵局。增加通道的缓冲区只会延长它被阻塞的时间。

我应该怎么做才能让发送者在缓冲区可用时发送数据,并在这种情况下给发送者一些背压? futures::sync::mpsc::channel有自己的文档,但是我不明白如何正确使用。

extern crate futures;
extern crate tokio_core;

use std::{thread, time};

use futures::sync::mpsc;
use futures::{Future, Sink, Stream};

use tokio_core::reactor::Core;

#[derive(Debug)]
struct Stats {
    pub success: usize,
    pub failure: usize,
}

fn main() {
    let mut core = Core::new().expect("Failed to create core");
    let remote = core.remote();

    let (tx, rx) = mpsc::channel(1);

    thread::spawn(move || loop {
        let tx = tx.clone();

        let delay = time::Duration::from_secs(1);
        thread::sleep(delay);
        let f = ::futures::done::<(), ()>(Ok(()));

        remote.spawn(|_| {
            f.then(|res| {
                println!("Sending");
                tx.send(res).wait();
                println!("Sent");
                Ok(())
            })
        });
    });

    let mut stats = Stats {
        success: 0,
        failure: 0,
    };

    let f2 = rx.for_each(|res| {
        println!("Received");
        let delay = time::Duration::from_secs(2);
        thread::sleep(delay);

        match res {
            Ok(_) => stats.success += 1,
            Err(_) => stats.failure += 1,
        }
        println!("stats = {:?}", stats);

        Ok(())
    });

    core.run(f2).expect("Core failed to run");
}

【问题讨论】:

  • futures::done 来自哪里?它不是 futures crate 当前版本的一部分。

标签: rust future rust-tokio


【解决方案1】:
  1. 永远不要在未来调用wait。这就是阻塞,并且阻塞永远不应该在未来内完成。

  2. 永远不要在未来调用sleep。这就是阻塞,并且阻塞永远不应该在未来内完成。

  3. 通道背压由send consumes the Sender 实现并返回一个未来。当队列中有空间时,future 会将Sender 返回给您

extern crate futures; // 0.1.25
extern crate tokio; // 0.1.11

use futures::{future, sync::mpsc, Future, Sink, Stream};
use std::time::Duration;
use tokio::timer::Interval;

#[derive(Debug)]
struct Stats {
    pub success: usize,
    pub failure: usize,
}

fn main() {
    tokio::run(future::lazy(|| {
        let (tx, rx) = mpsc::channel::<Result<(), ()>>(1);

        tokio::spawn({
            Interval::new_interval(Duration::from_millis(10))
                .map_err(|e| panic!("Interval error: {}", e))
                .fold(tx, |tx, _| {
                    tx.send(Ok(())).map_err(|e| panic!("Send error: {}", e))
                })
                .map(drop) // discard the tx
        });

        let mut stats = Stats {
            success: 0,
            failure: 0,
        };

        let i = Interval::new_interval(Duration::from_millis(20))
            .map_err(|e| panic!("Interval error: {}", e));

        rx.zip(i).for_each(move |(res, _)| {
            println!("Received");
            match res {
                Ok(_) => stats.success += 1,
                Err(_) => stats.failure += 1,
            }
            println!("stats = {:?}", stats);

            Ok(())
        })
    }));
}

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2020-09-26
    • 2016-08-26
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2014-10-16
    • 2018-05-01
    相关资源
    最近更新 更多