【问题标题】:Executing a collection of futures sequentially顺序执行期货集合
【发布时间】:2018-06-13 09:14:42
【问题描述】:

我有一个期货集合,我想将它们组合成一个单一的未来,让它们按顺序执行。

我查看了futures_ordered 函数。它似乎是按顺序返回结果,但期货是同时执行的。

我尝试fold 期货,将它们与and_then 结合起来。但是,这对于类型系统来说很棘手。

let tasks = vec![ok(()), ok(()), ok(())];

let combined_task = tasks.into_iter().fold(
    ok(()),                             // seed
    |acc, task| acc.and_then(|_| task), // accumulator
);

playground

这会产生以下错误:

error[E0308]: mismatched types
  --> src/main.rs:10:21
   |
10 |         |acc, task| acc.and_then(|_| task), // accumulator
   |                     ^^^^^^^^^^^^^^^^^^^^^^ expected struct `futures::FutureResult`, found struct `futures::AndThen`
   |
   = note: expected type `futures::FutureResult<_, _>`
              found type `futures::AndThen<futures::FutureResult<_, _>, futures::FutureResult<(), _>, [closure@src/main.rs:10:34: 10:42 task:_]>`

我可能正在接近这个错误,但我已经没有想法了。

【问题讨论】:

  • 作为评论而不是答案发布,因为我不知道如何解决它:因为您提供 ok(()) - 它返回 FutureResult - 作为初始值,fold 期望您从闭包的每次迭代中返回一个 FutureResult。换句话说,推断的类型太具体了。
  • 一个让它们按顺序执行的单一未来——你为什么要做这样的事情?由于您的未来不相互依赖,因此没有明显的理由引入强制序列化。

标签: rust future


【解决方案1】:

Stream 有一个函数buffered,它允许您限制同时轮询的期货数量。

如果你有一个期货集合,你可以创建一个流并像这样使用buffered

let tasks = vec![future1, future2];
let stream = ::futures::stream::iter_ok(tasks);
let mut when_result_ready = stream.buffered(1);

when_result_ready 现在将成为 Stream 实现,一次只轮询一个未来,并在每个未来完成后移动到下一个。

更新

根据 cmets 和分析,buffered 的开销很大,因此另一种解决方案是将每个 Future 转换为 Streamflatten 它们:

iter_ok(tasks).map(|f|f.into_stream()).flatten()

flatten 声明“每个单独的流在进入下一个流之前都会耗尽。”这意味着在前一个完成之前不会轮询Future。在我的本地分析中,这似乎比 buffered 方法快约 80%。


我上面的两个答案都会产生Stream 的结果,其中每个源Future 被依次轮询并返回结果。提问者实际要求的只是最后一个Future,而不是每个来源Future 的结果,如果是这种情况,Stefan 的答案可能更有用,并被证明具有更好的性能。

【讨论】:

  • 谢谢!我特别喜欢能够通过缓冲区大小指定并发级别的粒度。
  • 我认为buffered 的开销很高(每次轮询内部未来时,中间应该有一些Arc 对象传递给with_notify)。拳击可能比buffered(1)便宜。
  • @Stefan 我做了一个快速工作台,盒装变体比缓冲溶液快大约 30%,标准偏差低得多。尽管我不确定任何关键部分都没有被优化剥离。 play.rust-lang.org/…
  • 其实and_then(|f| f) (Stream::and_then) 应该和buffered(1)“一样”,只是要快很多。
【解决方案2】:

作为mentioned in the comments,你的类型太具体了。

您可以将fold 的实现设想为执行以下操作:

let (task0, task1, task2) = (ok(()), ok(()), ok(()));

let mut combined_task = ok(()); // seed
combined_task = combined_task.and_then(|_| task0); 
combined_task = combined_task.and_then(|_| task1); 
combined_task = combined_task.and_then(|_| task2); 

变量combined_task 需要在原地更新为相同类型的新值。因为我们从ok(()) 开始,所以这是每个步骤需要返回的类型。但是and_then的返回类型不同;这是AndThen。事实上,AndThen 是一个包含闭包和底层未来的泛型类型,所以每一步都会产生一个不同大小的不同类型:

  1. FutureResult&lt;()&gt;
  2. AndThen&lt;FutureResult&lt;()&gt;, closure0&gt;
  3. AndThen&lt;AndThen&lt;FutureResult&lt;()&gt;, closure0&gt;, closure1&gt;
  4. AndThen&lt;AndThen&lt;AndThen&lt;FutureResult&lt;()&gt;, closure0&gt;, closure1&gt;, closure2&gt;

相反,您可以通过在每一步生成盒装特征对象来创建统一类型:

let (task0, task1, task2) = (ok(()), ok(()), ok(()));

let mut combined_task: Box<Future<Item = (), Error = ()>> = Box::new(ok(())); // seed
combined_task = Box::new(combined_task.and_then(|_| task0)); 
combined_task = Box::new(combined_task.and_then(|_| task1)); 
combined_task = Box::new(combined_task.and_then(|_| task2)); 
  1. Box&lt;Future&lt;Item = (), Error = ()&gt;&gt;
  2. Box&lt;Future&lt;Item = (), Error = ()&gt;&gt;
  3. Box&lt;Future&lt;Item = (), Error = ()&gt;&gt;
  4. Box&lt;Future&lt;Item = (), Error = ()&gt;&gt;

转换回fold 语法:

let combined_task: Box<Future<Item = (), Error = ()>> =
    tasks.into_iter().fold(Box::new(ok(())), |acc, task| {
        Box::new(acc.and_then(|_| task))
    });

另见:

【讨论】:

  • 谢谢!我正在考虑这一点,但如果可能的话,我更愿意避免堆分配/运行时分派。虽然我不确定其他缓冲流方法是如何在内部实现的。
  • 嗨,@Shepmaster,你能解释一下为什么添加Box可以解决这个问题吗?我遇到了同样的问题,如果我指定函数返回impl Future&lt;Item=String, Error=Error&gt;,程序编译失败expect futures::FutureResult, found futures::AndThen,但是当我将返回类型更改为Box&lt;Future&lt;Item=String, Error=Error&gt;&gt;时,一切都可以编译了。
  • @Songday 我在另一个答案中有更长的解释(现在从这个答案链接)。你能检查一下,看看它是否充分解释了它?如果没有,请告诉我缺少什么,我会尝试更新。
  • 谢谢@Shepmaster,我读了好几遍你的帖子,但不能完全理解。 Box,我的理解是,所有没有大小的东西都需要 Box 包装器。在我的例子中,函数返回 impl Future 未调整大小的特征,所以 Rust 拒绝编译,对吧?我仍然觉得我误会了什么......
  • @Songday 不完全; unsized 类型在这里并没有真正发挥作用。问题是每个步骤都有不同的类型(可以有不同的大小)。 impl Future 的大小不是。编译器知道具体的类型和大小,尽管程序员不知道。
【解决方案3】:

结合iter_okStream::for_each

use futures::Stream;
use futures::future::ok;
use futures::stream::iter_ok;

let tasks = vec![ok(()), ok(()), ok(())];

let combined_task = iter_ok::<_, ()>(tasks).for_each(|f| f);

iter_ok 生成传递项目的流,并且从不抛出错误(这就是您有时需要修复错误类型的原因)。传递给 for_each 的闭包然后返回一个 Future 为每个项目运行 - 这里只是传入的项目。

for_each 然后驱动每个返回的未来完成,然后再移动到下一个,就像你想要的那样。它也会在遇到第一个错误时中止,并要求内部期货在成功时返回 ()

for_each 本身返回一个 Future,它要么失败(如上所述),要么在完成时返回 ()

test tests::bench_variant_buffered ... bench:      22,356 ns/iter (+/- 1,816)
test tests::bench_variant_boxed ...    bench:       8,575 ns/iter (+/- 1,042)
test tests::bench_variant_for_each ... bench:       4,070 ns/iter (+/- 531)

【讨论】:

  • 这应该被接受而不是我的回答,我完全错过了提问者实际上想要一个 Future 最后。我根据想要 Stream 的 OP 写了我的答案。
  • 接受了@Stefan 对我方案中最佳解决方案的回答。还要非常感谢 Lukazoid 和 Shepmaster - 我学到了很多关于期货箱的知识。
【解决方案4】:

当我需要这样的东西时(主要是因为我正在调试一个问题),我最终写了一个 seq 组合器来组成 loop_fn like so

fn seq<I>(
    i: I,
) -> impl Future<Item = Vec<<I::Item as IntoFuture>::Item>, Error = <I::Item as IntoFuture>::Error>
where
    I: IntoIterator,
    I::Item: IntoFuture,
{
    let iter = i.into_iter();
    loop_fn((vec![], iter), |(mut output, mut iter)| {
        let fut = if let Some(next) = iter.next() {
            Either::A(next.into_future().map(|v| Some(v)))
        } else {
            Either::B(future::ok(None))
        };

        fut.and_then(move |val| {
            if let Some(val) = val {
                output.push(val);
                Ok(Loop::Continue((output, iter)))
            } else {
                Ok(Loop::Break(output))
            }
        })
    })
}

【讨论】:

    【解决方案5】:

    就我而言(稳定的async/await),这段代码很有帮助:

    use futures::{stream, StreamExt};
    
    #[tokio::main]
    async fn main() -> Result<(), Box<dyn std::error::Error + 'static>> {
        let data = vec![1,2,3];
    
        stream::iter(data).for_each(|id| async move {
            let request = async { id }; // async io request
            let res = request.await;
            println!("res: {:?}", res);
            ()
        }).await;
        
        Ok(())
    }
    

    https://play.rust-lang.org/?version=stable&mode=debug&edition=2018&gist=ad5feaf0cbb3597730c22df2eaf4a606

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2019-11-04
      • 2014-03-21
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多