严重警告我以前从未使用过这个库,而且我对一些概念的低级知识有点……缺乏。大多数情况下,我正在阅读the tutorial。我很确定任何做过异步工作的人都会读到这篇文章并大笑,但这对其他人来说可能是一个有用的起点。警告购买者!
让我们从简单一点的开始,演示Stream 的工作原理。我们可以将Results 的迭代器转换成流:
extern crate futures;
use futures::Future;
use futures::stream::{self, Stream};
fn main() {
let payloads: Vec<Result<String, ()>> = vec![Ok("a".into()), Ok("b".into())];
let payloads = stream::iter(payloads.into_iter());
let foo = payloads
.and_then(|payload| futures::finished(println!("{}", payload)))
.for_each(|_| Ok(()));
foo.forget();
}
这向我们展示了一种消费流的方式。我们使用and_then 对每个有效负载执行某些操作(这里只是将其打印出来),然后使用for_each 将Stream 转换回Future。然后我们可以通过调用奇怪的forget method 来运行未来。
接下来是将 Redis 库绑定到混合中,只处理一条消息。由于get_message() 方法是阻塞的,我们需要在混合中引入一些线程。在这种类型的异步系统中执行大量工作并不是一个好主意,因为其他一切都会被阻塞。 For example:
除非另有安排,否则应确保此功能的实现很快完成。
在理想的世界中,redis crate 将构建在类似 futures 的库之上,并以原生方式公开所有这些。
extern crate redis;
extern crate futures;
use std::thread;
use futures::Future;
use futures::stream::{self, Stream};
fn main() {
let client = redis::Client::open("redis://127.0.0.1/").expect("Unable to connect to Redis");
let mut pubsub = client.get_pubsub().expect("Unable to create pubsub handle");
pubsub.subscribe("rust").expect("Unable to subscribe to redis channel");
let (tx, payloads) = stream::channel();
let redis_thread = thread::spawn(move || {
let msg = pubsub.get_message().expect("Unable to get message");
let payload: Result<String, _> = msg.get_payload();
tx.send(payload).forget();
});
let foo = payloads
.and_then(|payload| futures::finished(println!("{}", payload)))
.for_each(|_| Ok(()));
foo.forget();
redis_thread.join().expect("unable to join to thread");
}
我的理解在这里变得更加模糊。在一个单独的线程中,我们阻塞消息并在收到消息时将其推送到通道中。我不明白为什么我们需要抓住线程的句柄。我希望foo.forget 会自己阻塞,一直等到流为空。
在与 Redis 服务器的 telnet 连接中,发送以下内容:
publish rust awesome
你会看到它有效。添加打印语句表明(对我而言)foo.forget 语句在线程产生之前运行。
多条消息比较棘手。 Sender 消耗自身以防止生成方远远领先于消耗方。这是通过从send 返回另一个未来来完成的!我们需要将它从那里运回,以便在循环的下一次迭代中重用它:
extern crate redis;
extern crate futures;
use std::thread;
use std::sync::mpsc;
use futures::Future;
use futures::stream::{self, Stream};
fn main() {
let client = redis::Client::open("redis://127.0.0.1/").expect("Unable to connect to Redis");
let mut pubsub = client.get_pubsub().expect("Unable to create pubsub handle");
pubsub.subscribe("rust").expect("Unable to subscribe to redis channel");
let (tx, payloads) = stream::channel();
let redis_thread = thread::spawn(move || {
let mut tx = tx;
while let Ok(msg) = pubsub.get_message() {
let payload: Result<String, _> = msg.get_payload();
let (next_tx_tx, next_tx_rx) = mpsc::channel();
tx.send(payload).and_then(move |new_tx| {
next_tx_tx.send(new_tx).expect("Unable to send successor channel tx");
futures::finished(())
}).forget();
tx = next_tx_rx.recv().expect("Unable to receive successor channel tx");
}
});
let foo = payloads
.and_then(|payload| futures::finished(println!("{}", payload)))
.for_each(|_| Ok(()));
foo.forget();
redis_thread.join().expect("unable to join to thread");
}
我相信随着时间的推移,这种类型的互操作将会有更多的生态系统。例如,futures-cpupool crate 可以可能扩展以支持与此类似的用例。