【问题标题】:How do I iterate over a Vec of functions returning Futures in Rust?如何在 Rust 中迭代返回 Futures 的函数 Vec?
【发布时间】:2018-11-23 19:23:04
【问题描述】:

是否可以循环 Vec,调用一个在每个函数上返回 Future 的方法,并构建一个 Futures 链,以供消费者(最终)评估?是否执行后面的Futures 将取决于前面Futures 在Vec 中的结果。

澄清一下:

我正在开发一个可以从任意上游源集合中获取数据的应用程序。

请求数据将依次检查每个来源。如果第一个来源有错误 (Err),或者没有可用的数据 (None),那么将尝试第二个来源,依此类推。

每个来源都应该只尝试一次,并且在之前的所有来源都返回结果之前,不应尝试任何来源。错误会被记录下来,否则会被忽略,将查询传递给下一个上游数据源。

我有一些用于获取元数据的工作代码:

/// Attempts to read/write data to various external sources. These are
/// nested types, because a data source may exist as both a reader and a writer
struct StoreManager {
    /// Upstream data sources
    readers: Vec<Rc<RefCell<StoreRead>>>,
    /// Downstream data sinks
    writers: Vec<Rc<RefCell<StoreWrite>>>,
}

impl StoreRead for StoreManager {
    fn metadata(self: &Self, id: &Identifier) -> Box<Future<Option<Metadata>, Error>> {
       Box::new(ok(self.readers
            .iter()
            .map(|store| {
                executor::block_on(store.borrow().metadata(id)).unwrap_or_else(|err| {
                    error!("Error on metadata(): {:?}", err);
                    None
                })
            })
            .find(Option::is_some)
            .unwrap_or(None)))
    }
}

除了我对所有BoxRc/RefCell 的废话感到不满之外,我真正关心的是executor::block_on() 电话。它阻塞,等待每个Future 返回结果,然后继续下一个。

鉴于可以调用fn_returning_future().or_else(|_| other_fn())等,是否可以建立这样的动态链?还是要求在移动到下一个迭代器之前对迭代器中的每个 Future 进行全面评估?

【问题讨论】:

  • 您可能想改用Stream。引用:“如果 Future 是 Result 的异步版本,那么 Stream 是 Iterator 的异步版本。”你想迭代:)
  • 请查看如何创建minimal reproducible example,然后查看edit 您的问题以包含它。我们无法分辨代码中存在哪些 crate、类型、特征、字段等。理想情况下,在Rust Playground 上生成可以重现您的错误的内容。我相信您可以删除所有特定于您的应用程序的代码,并拥有“循环Vec,调用一个在每个上返回Future的方法,并构建Futures链,以进行评估(最终)由消费者?”。

标签: iterator rust future


【解决方案1】:

您可以使用stream::unfold 将单个值转换为流。在这种情况下,我们可以使用 IntoIter 迭代器作为单个值。

use futures::{executor, stream, Stream, TryStreamExt}; // 0.3.4

type Error = Box<dyn std::error::Error>;
type Result<T, E = Error> = std::result::Result<T, E>;

async fn network_request(val: i32) -> Result<i32> {
    // Just for demonstration, don't do this in a real program
    use std::{
        thread,
        time::{Duration, Instant},
    };
    thread::sleep(Duration::from_secs(1));
    println!("Resolving {} at {:?}", val, Instant::now());

    Ok(val * 100)
}

fn requests_in_sequence(vals: Vec<i32>) -> impl Stream<Item = Result<i32>> {
    stream::unfold(vals.into_iter(), |mut vals| async {
        let val = vals.next()?;
        let response = network_request(val).await;
        Some((response, vals))
    })
}

fn main() {
    let s = requests_in_sequence(vec![1, 2, 3]);
    executor::block_on(async {
        s.try_for_each(|v| async move {
            println!("-> {}", v);
            Ok(())
        })
        .await
        .expect("An error occurred");
    });
}
Resolving 1 at Instant { tv_sec: 6223328, tv_nsec: 294631597 }
-> 100
Resolving 2 at Instant { tv_sec: 6223329, tv_nsec: 310839993 }
-> 200
Resolving 3 at Instant { tv_sec: 6223330, tv_nsec: 311005834 }
-> 300

要忽略ErrNone,您必须将Error 穿梭到Item,使Item 输入Result&lt;Option&lt;T&gt;, Error&gt;

use futures::{executor, stream, Stream, StreamExt}; // 0.3.4

type Error = Box<dyn std::error::Error>;
type Result<T, E = Error> = std::result::Result<T, E>;

async fn network_request(val: i32) -> Result<Option<i32>> {
    // Just for demonstration, don't do this in a real program
    use std::{
        thread,
        time::{Duration, Instant},
    };
    thread::sleep(Duration::from_secs(1));
    println!("Resolving {} at {:?}", val, Instant::now());

    match val {
        1 => Err("boom".into()),  // An error
        2 => Ok(None),            // No data
        _ => Ok(Some(val * 100)), // Success
    }
}

fn requests_in_sequence(vals: Vec<i32>) -> impl Stream<Item = Result<Option<i32>>> {
    stream::unfold(vals.into_iter(), |mut vals| async {
        let val = vals.next()?;
        let response = network_request(val).await;
        Some((response, vals))
    })
}

fn main() {
    executor::block_on(async {
        let s = requests_in_sequence(vec![1, 2, 3]);

        let s = s.filter_map(|v| async move { v.ok() });
        let s = s.filter_map(|v| async move { v });
        let mut s = s.boxed_local();

        match s.next().await {
            Some(v) => println!("First success: {}", v),
            None => println!("No successful requests"),
        }
    });
}
Resolving 1 at Instant { tv_sec: 6224229, tv_nsec: 727216392 }
Resolving 2 at Instant { tv_sec: 6224230, tv_nsec: 727404752 }
Resolving 3 at Instant { tv_sec: 6224231, tv_nsec: 727593740 }
First success: 300

有没有可能建立这样的动态链

是的,通过利用 async 函数:

use futures::executor; // 0.3.4

type Error = Box<dyn std::error::Error>;
type Result<T, E = Error> = std::result::Result<T, E>;

async fn network_request(val: i32) -> Result<Option<i32>> {
    // Just for demonstration, don't do this in a real program
    use std::{
        thread,
        time::{Duration, Instant},
    };
    thread::sleep(Duration::from_secs(1));
    println!("Resolving {} at {:?}", val, Instant::now());

    match val {
        1 => Err("boom".into()),  // An error
        2 => Ok(None),            // No data
        _ => Ok(Some(val * 100)), // Success
    }
}

async fn requests_in_sequence(vals: Vec<i32>) -> Result<i32> {
    let mut vals = vals.into_iter().peekable();

    while let Some(v) = vals.next() {
        match network_request(v).await {
            Ok(Some(v)) => return Ok(v),
            Err(e) if vals.peek().is_none() => return Err(e),
            Ok(None) | Err(_) => { /* Do nothing and try the next source */ }
        }
    }

    Err("Ran out of sources".into())
}

fn main() {
    executor::block_on(async {
        match requests_in_sequence(vec![1, 2, 3]).await {
            Ok(v) => println!("First success: {}", v),
            Err(e) => println!("No successful requests: {}", e),
        }
    });
}

另见:


在移动到下一个之前,是否需要对迭代器中的每个 Future 进行全面评估

这不是你自己的要求吗?强调我的:

请求数据会依次检查每个来源。如果第一个来源有错误 (Err),或者没有可用的数据 (None),那么将尝试第二个来源

【讨论】:

  • Re: 是否需要在移动到下一个之前完全评估迭代器中的每个 Future,是的,我意识到它必须解决,但我希望执行由调用者管理,不在我的函数内。
  • 如何将这些请求返回的值收集到向量中?
猜你喜欢
  • 2016-02-06
  • 2019-06-13
  • 1970-01-01
  • 2016-07-22
  • 1970-01-01
  • 2021-10-20
  • 2023-02-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多