【问题标题】:How to implement a stream of futures for a blocking call using futures.rs and Redis PubSub?如何使用 futures.rs 和 Redis PubSub 为阻塞调用实现期货流?
【发布时间】:2016-12-18 19:14:36
【问题描述】:

我正在尝试创建一个系统,我的应用程序可以通过该系统从 Redis PubSub 通道接收流数据并对其进行处理。我正在使用的 Redis driver 以及我见过的所有其他用于 Rust 的 Redis 驱动程序使用阻塞操作从通道获取数据,该通道仅在接收数据时返回一个值:

let msg = match pubsub.get_message() {
        Ok(m) => m,
        Err(_) => panic!("Could not get message from pubsub!")
};
let payload: String = match msg.get_payload() {
    Ok(s) => s,
    Err(_) => panic!("Could not convert redis message to string!")
};

我想在将来使用futures-rs 库来包装这个阻塞函数调用,这样我就可以在等待输入的同时在我的应用程序中执行其他任务。

我阅读了有关期货的 tutorial 并尝试创建一个 Stream,它会在 PubSub 收到数据时发出信号,但我不知道该怎么做。

如何为阻塞 pubsub.get_message() 函数创建 schedulepoll 函数?

【问题讨论】:

  • 在发布重大公告的当天使用图书馆;多么雄心勃勃! ^_^

标签: asynchronous redis rust future


【解决方案1】:

严重警告我以前从未使用过这个库,而且我对一些概念的低级知识有点……缺乏。大多数情况下,我正在阅读the tutorial。我很确定任何做过异步工作的人都会读到这篇文章并大笑,但这对其他人来说可能是一个有用的起点。警告购买者!


让我们从简单一点的开始,演示Stream 的工作原理。我们可以将Results 的迭代器转换成流:

extern crate futures;

use futures::Future;
use futures::stream::{self, Stream};

fn main() {
    let payloads: Vec<Result<String, ()>> = vec![Ok("a".into()), Ok("b".into())];
    let payloads = stream::iter(payloads.into_iter());

    let foo = payloads
        .and_then(|payload| futures::finished(println!("{}", payload)))
        .for_each(|_| Ok(()));

    foo.forget();
}

这向我们展示了一种消费流的方式。我们使用and_then 对每个有效负载执行某些操作(这里只是将其打印出来),然后使用for_eachStream 转换回Future。然后我们可以通过调用奇怪的forget method 来运行未来。


接下来是将 Redis 库绑定到混合中,只处理一条消息。由于get_message() 方法是阻塞的,我们需要在混合中引入一些线程。在这种类型的异步系统中执行大量工作并不是一个好主意,因为其他一切都会被阻塞。 For example:

除非另有安排,否则应确保此功能的实现很快完成

在理想的世界中,redis crate 将构建在类似 futures 的库之上,并以原生方式公开所有这些。

extern crate redis;
extern crate futures;

use std::thread;
use futures::Future;
use futures::stream::{self, Stream};

fn main() {
    let client = redis::Client::open("redis://127.0.0.1/").expect("Unable to connect to Redis");

    let mut pubsub = client.get_pubsub().expect("Unable to create pubsub handle");
    pubsub.subscribe("rust").expect("Unable to subscribe to redis channel");

    let (tx, payloads) = stream::channel();

    let redis_thread = thread::spawn(move || {
        let msg = pubsub.get_message().expect("Unable to get message");
        let payload: Result<String, _> = msg.get_payload();
        tx.send(payload).forget();
    });

    let foo = payloads
        .and_then(|payload| futures::finished(println!("{}", payload)))
        .for_each(|_| Ok(()));

    foo.forget();
    redis_thread.join().expect("unable to join to thread");
}

我的理解在这里变得更加模糊。在一个单独的线程中,我们阻塞消息并在收到消息时将其推送到通道中。我不明白为什么我们需要抓住线程的句柄。我希望foo.forget 会自己阻塞,一直等到流为空。

在与 Redis 服务器的 telnet 连接中,发送以下内容:

publish rust awesome

你会看到它有效。添加打印语句表明(对我而言)foo.forget 语句在线程产生之前运行。


多条消息比较棘手。 Sender 消耗自身以防止生成方远远领先于消耗方。这是通过从send 返回另一个未来来完成的!我们需要将它从那里运回,以便在循环的下一次迭代中重用它:

extern crate redis;
extern crate futures;

use std::thread;
use std::sync::mpsc;

use futures::Future;
use futures::stream::{self, Stream};

fn main() {
    let client = redis::Client::open("redis://127.0.0.1/").expect("Unable to connect to Redis");

    let mut pubsub = client.get_pubsub().expect("Unable to create pubsub handle");
    pubsub.subscribe("rust").expect("Unable to subscribe to redis channel");

    let (tx, payloads) = stream::channel();

    let redis_thread = thread::spawn(move || {
        let mut tx = tx;

        while let Ok(msg) = pubsub.get_message() {
            let payload: Result<String, _> = msg.get_payload();

            let (next_tx_tx, next_tx_rx) = mpsc::channel();

            tx.send(payload).and_then(move |new_tx| {
                next_tx_tx.send(new_tx).expect("Unable to send successor channel tx");
                futures::finished(())
            }).forget();

            tx = next_tx_rx.recv().expect("Unable to receive successor channel tx");
        }
    });

    let foo = payloads
        .and_then(|payload| futures::finished(println!("{}", payload)))
        .for_each(|_| Ok(()));

    foo.forget();
    redis_thread.join().expect("unable to join to thread");
}

我相信随着时间的推移,这种类型的互操作将会有更多的生态系统。例如,futures-cpupool crate 可以可能扩展以支持与此类似的用例。

【讨论】:

  • 感谢您的精彩回答!只有一个问题:加入redis_thread 是否会否定使结果读取过程非阻塞的全部努力?也许有什么我不明白的。
  • “我希望 foo.forget 会自己阻塞,等到流为空” 实际上,future 没有义务提供“阻塞直到准备好”的方法。 forget(),就其描述而言,是为了防止future被丢弃时自动取消,但与等待无关。例如,在 Scala 中,Future 上没有用于此目的的方法,而是有一个单独的 Await.ready/Await.result 方法对,它们等待未来在某个超时内准备好。
  • 据我了解,在future-rs 中可以使用Future::select 实现类似的事情,第二个future 在固定超时后完成。
猜你喜欢
  • 2013-04-03
  • 2011-12-13
  • 1970-01-01
  • 2016-06-02
  • 2013-11-16
  • 2013-01-15
  • 2019-05-31
  • 1970-01-01
  • 2018-06-18
相关资源
最近更新 更多