【问题标题】:How to make a simple futures::sync::mpsc::channel example work?如何使一个简单的 futures::sync::mpsc::channel 示例工作?
【发布时间】:2016-12-24 20:58:31
【问题描述】:

我正在尝试编写一个简单的futures-rs mpsc 队列用法示例:

extern crate futures; // v0.1 (old)

use futures::{Sink, Stream};
use futures::sync::mpsc;
use std::thread;

fn main() {
    let (tx, rx) = mpsc::channel::<i32>(1000);

    let handle = thread::spawn(move || {
        tx.clone().send(1);
        tx.clone().send(2);
        tx.clone().send(3);
    });

    let mut rx = rx.map(|x| {
        println!("stream: {}", x);
        x * x
    });

    handle.join().unwrap();

    rx.poll().unwrap();
}

但它不会向控制台输出任何内容(我希望它打印stream: 1stream: 2stream: 3)。我还尝试用rx.wait() 替换rx.poll().unwrap(),但它仍然没有输出。而且我在 futures-rs 文档中没有找到任何使用示例。我做错了什么?

【问题讨论】:

    标签: rust message-queue future


    【解决方案1】:

    强烈建议阅读编译器告诉您的警告和错误消息。这是带有编译器的静态类型语言的一大好处:

    warning: unused result which must be used: futures do nothing unless polled, #[warn(unused_must_use)] on by default
      --> src/main.rs:11:9
       |
    11 |         tx.clone().send(1);
       |         ^^^^^^^^^^^^^^^^^^^
    
    warning: unused result which must be used: futures do nothing unless polled, #[warn(unused_must_use)] on by default
      --> src/main.rs:12:9
       |
    12 |         tx.clone().send(2);
       |         ^^^^^^^^^^^^^^^^^^^
    
    warning: unused result which must be used: futures do nothing unless polled, #[warn(unused_must_use)] on by default
      --> src/main.rs:13:9
       |
    13 |         tx.clone().send(3);
       |         ^^^^^^^^^^^^^^^^^^^
    

    我不是期货专家,但这编译时没有警告并打印所有三个值:

    extern crate futures; // 0.1.23
    
    use futures::{sync::mpsc, Async, Future, Sink, Stream};
    use std::thread;
    
    fn main() {
        let (tx, rx) = mpsc::channel(1000);
    
        let handle = thread::spawn(move || {
            tx.send(1)
                .and_then(|tx| tx.send(2))
                .and_then(|tx| tx.send(3))
                .wait()
                .expect("Unable to send");
        });
    
        let mut rx = rx.map(|x| x * x);
    
        handle.join().unwrap();
    
        while let Ok(Async::Ready(Some(v))) = rx.poll() {
            println!("stream: {}", v);
        }
    }
    

    and_then 用于在前一个值之后发送每个后续值。 wait 用于阻塞生成的线程,直到所有内容都发送成功。 poll 方法用于从队列中获取值,直到它用完。有多种方法可能会失败,我将全部忽略,只专注于成功案例。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2016-03-04
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多