【问题标题】:How can I run a set of functions concurrently on a recurring interval without running the same function at the same time using Tokio?如何在循环间隔内同时运行一组函数,而不使用 Tokio 同时运行相同的函数?
【发布时间】:2019-05-23 10:16:00
【问题描述】:

我的目标是同时运行 N 个函数,但在所有函数都完成之前不想产生更多。这是what I have so far

extern crate tokio;
extern crate futures;

use futures::future::lazy;
use std::{thread, time};
use tokio::prelude::*;
use tokio::timer::Interval;

fn main() {
    let task = Interval::new(time::Instant::now(), time::Duration::new(1, 0))
        .for_each(|interval| {
            println!("Interval: {:?}", interval);
            for i in 0..5 {
                tokio::spawn(lazy(move || {
                    println!("Hello from task {}", i);
                    // mock delay (something blocking)
                    // thread::sleep(time::Duration::from_secs(3));
                    Command::new("sleep").arg("3").output().expect("failed to execute process");

                    Ok(())
                }));
            }
            Ok(())
        })
    .map_err(|e| panic!("interval errored; err={:?}", e));

    tokio::run(task);
}

我每秒生成 5 个函数,但我现在想等到所有函数都完成后再生成更多函数。

根据我的理解(我可能理解错了),我将在另一个未来返回Future

task (Interval ----------------------+ (outer future)
    for i in 0..5 {                  |
        tokio::spawn(  ----+         | 
            // my function | (inner) |
            Ok(())         |         |
        )              ----+         |
    }                                |
    Ok(()) --------------------------+

我一直在努力等待内心的未来结束。

【问题讨论】:

    标签: rust rust-tokio


    【解决方案1】:

    您可以通过加入您的 worker futures 来实现这一点,这样它们都并行运行,但必须一起完成。然后,出于相同的原因,您可以延迟 1 秒加入。将其包装成一个循环以永远运行它(或 5 次迭代,用于演示)。

    东京 1.3

    use futures::{future, future::BoxFuture, stream, FutureExt, StreamExt}; // 0.3.13
    use std::time::{Duration, Instant};
    use tokio::time; // 1.3.0
    
    #[tokio::main]
    async fn main() {
        let now = Instant::now();
        let forever = stream::unfold((), |()| async {
            eprintln!("Loop starting at {:?}", Instant::now());
    
            // Resolves when all pages are done
            let batch_of_pages = future::join_all(all_pages());
    
            // Resolves when both all pages and a delay of 1 second is done
            future::join(batch_of_pages, time::sleep(Duration::from_secs(1))).await;
            
            Some(((), ()))
        });
    
        forever.take(5).for_each(|_| async {}).await;
        eprintln!("Took {:?}", now.elapsed());
    }
    
    fn all_pages() -> Vec<BoxFuture<'static, ()>> {
        vec![page("a", 100).boxed(), page("b", 200).boxed()]
    }
    
    async fn page(name: &'static str, time_ms: u64) {
        eprintln!("page {} starting", name);
        time::sleep(Duration::from_millis(time_ms)).await;
        eprintln!("page {} done", name);
    }
    
    Loop starting at Instant { t: 1022680437923626 }
    page a starting
    page b starting
    page a done
    page b done
    Loop starting at Instant { t: 1022681444390534 }
    page a starting
    page b starting
    page a done
    page b done
    Loop starting at Instant { t: 1022682453240399 }
    page a starting
    page b starting
    page a done
    page b done
    Loop starting at Instant { t: 1022683469924126 }
    page a starting
    page b starting
    page a done
    page b done
    Loop starting at Instant { t: 1022684493522592 }
    page a starting
    page b starting
    page a done
    page b done
    Took 5.057315596s
    

    东京 0.1

    use futures::future::{self, Loop}; // 0.1.26
    use std::time::{Duration, Instant};
    use tokio::{prelude::*, timer::Delay};  // 0.1.18
    
    fn main() {
        let repeat_count = Some(5);
    
        let forever = future::loop_fn(repeat_count, |repeat_count| {
            eprintln!("Loop starting at {:?}", Instant::now());
    
            // Resolves when all pages are done
            let batch_of_pages = future::join_all(all_pages());
    
            // Resolves when both all pages and a delay of 1 second is done
            let wait = Future::join(batch_of_pages, ez_delay_ms(1000));
    
            // Run all this again
            wait.map(move |_| {
                if let Some(0) = repeat_count {
                    Loop::Break(())
                } else {
                    Loop::Continue(repeat_count.map(|c| c - 1))
                }
            })
        });
    
        tokio::run(forever.map_err(drop));
    }
    
    fn all_pages() -> Vec<Box<dyn Future<Item = (), Error = ()> + Send + 'static>> {
        vec![Box::new(page("a", 100)), Box::new(page("b", 200))]
    }
    
    fn page(name: &'static str, time_ms: u64) -> impl Future<Item = (), Error = ()> + Send + 'static {
        future::ok(())
            .inspect(move |_| eprintln!("page {} starting", name))
            .and_then(move |_| ez_delay_ms(time_ms))
            .inspect(move |_| eprintln!("page {} done", name))
    }
    
    fn ez_delay_ms(ms: u64) -> impl Future<Item = (), Error = ()> + Send + 'static {
        Delay::new(Instant::now() + Duration::from_millis(ms)).map_err(drop)
    }
    
    Loop starting at Instant { tv_sec: 4031391, tv_nsec: 806352322 }
    page a starting
    page b starting
    page a done
    page b done
    Loop starting at Instant { tv_sec: 4031392, tv_nsec: 807792559 }
    page a starting
    page b starting
    page a done
    page b done
    Loop starting at Instant { tv_sec: 4031393, tv_nsec: 809117958 }
    page a starting
    page b starting
    page a done
    page b done
    Loop starting at Instant { tv_sec: 4031394, tv_nsec: 813142458 }
    page a starting
    page b starting
    page a done
    page b done
    Loop starting at Instant { tv_sec: 4031395, tv_nsec: 814407116 }
    page a starting
    page b starting
    page a done
    page b done
    Loop starting at Instant { tv_sec: 4031396, tv_nsec: 815342642 }
    page a starting
    page b starting
    page a done
    page b done
    

    另见:

    【讨论】:

    • 谢谢,但我仍然试图了解使用 tokio::spawn() 您的回答帮助我了解如何“延迟”而不是使用 thread::sleep 。但是在我想调用系统命令 sleep N 的情况下,有没有办法“分组并等待”多个 tokio::spawn() 的所有返回,一旦完成,就继续进行另一批或等到定义的间隔时间通过然后再次调用所有函数?
    • 例如如何使用 Command::new("sleep").arg("3").output().expect("failed to execute process"); 之类的东西进行模拟
    • @nbari 我已经添加了关于如何运行通用阻塞代码(tl;dr:使用线程池)以及Command(tl;dr:使用 tokio-process)的现有问答的链接.
    • 谢谢,但是我仍然不太清楚如何使用 Tokio 以定义的时间间隔(调度程序)永远运行某些东西并同时生成 X 阻塞函数并等待它们完成,然后再次调用它们,在最后,我只想在 Rust 中做类似play.golang.org/p/ZLw6ESioqfu 的事情,我知道比较语言是不公平的,但它有助于明确我想要实现的目标。
    【解决方案2】:

    根据我的理解(我可能理解错了),我是 在另一个未来返回Future

    您没有错,但在您提供的代码中,唯一返回的未来是 Ok(()),它实现了 IntoFuturetokio::spawn 只是将新任务生成到 Tokio 的 DefaultExecutor 中。

    如果我从您的问题中理解,您想在 上一个 完成后生成 下一批,但如果上一个在 1 秒之前完成,您想要在产生下一批之前完成那 1 秒。

    实现自己的未来并自己处理民意调查将是更好的解决方案,但这可以大致完成:

    • 通过join_all收集批处理任务。这是一个等待收集的期货完成的新未来。
    • 等待 1 秒 后,您可以使用原子状态。如果它被锁定为刻度,它会等到状态释放。

    这里是代码(Playground):

    extern crate futures;
    extern crate tokio;
    
    use futures::future::lazy;
    use std::time::{self, Duration, Instant};
    
    use tokio::prelude::*;
    use tokio::timer::{Delay, Interval};
    
    use futures::future::join_all;
    use std::sync::atomic::{AtomicBool, Ordering};
    use std::sync::Arc;
    
    fn main() {
        let locker = Arc::new(AtomicBool::new(false));
    
        let task = Interval::new(time::Instant::now(), time::Duration::new(1, 0))
            .map_err(|e| panic!("interval errored; err={:?}", e))
            .for_each(move |interval| {
                let is_locked = locker.load(Ordering::SeqCst);
                println!("Interval: {:?} --- {:?}", interval, is_locked);
    
                if !is_locked {
                    locker.store(true, Ordering::SeqCst);
                    println!("locked");
    
                    let futures: Vec<_> = (0..5)
                        .map(|i| {
                            lazy(move || {
                                println!("Running Task-{}", i);
                                // mock delay
                                Delay::new(Instant::now() + Duration::from_millis(100 - i))
                                    .then(|_| Ok(()))
                            })
                            .and_then(move |_| {
                                println!("Task-{} is done", i);
                                Ok(())
                            })
                        })
                        .collect();
    
                    let unlocker = locker.clone();
                    tokio::spawn(join_all(futures).and_then(move |_| {
                        unlocker.store(false, Ordering::SeqCst);
                        println!("unlocked");
    
                        Ok(())
                    }));
                }
    
                Ok(())
            });
    
        tokio::run(task.then(|_| Ok(())));
    }
    

    输出:

    Interval: Instant { tv_sec: 4036783, tv_nsec: 211837425 } --- false
    locked
    Running Task-0
    Running Task-1
    Running Task-2
    Running Task-3
    Running Task-4
    Task-4 is done
    Task-3 is done
    Task-2 is done
    Task-1 is done
    Task-0 is done
    unlocked
    Interval: Instant { tv_sec: 4036784, tv_nsec: 211837425 } --- false
    locked
    Running Task-0
    Running Task-1
    Running Task-2
    Running Task-3
    Running Task-4
    Task-3 is done
    Task-4 is done
    Task-0 is done
    Task-1 is done
    Task-2 is done
    unlocked
    

    警告! :请查看Shepmaster's comment

    即使是为了演示,you should not use thread:sleep 在期货中。 还有更好的选择

    【讨论】:

    • 即使是演示,你也应该not use thread:sleep in futures。还有更好的选择
    • @Shepmaster 它在问题中,我直接复制了它。它被标记为模拟延迟,所以我没有在答案中提到它。
    • 原始问题还有其他问题,您没有直接复制(而是 Stack Overflow 的 point)。意识到大多数人会复制粘贴您的代码,而几乎没有考虑到答案。最好在各地鼓励良好做法。
    • @ÖmerErden 谢谢,但是如何同时生成所有函数,从你的例子中它们一个接一个地运行,所以如果每个函数延迟 5 秒,下一批将运行 N functions * 5 seconds整个过程只有 5 秒。
    • @nbari 是的,由于使用了thread::sleep,它没有并行执行,如果您阅读post in comments,您可以看到原因。我用适当的模拟延迟和延迟 Duration::from_millis(100 - i) 更新了代码,以查看以任意顺序完成的任务。
    猜你喜欢
    • 2019-10-08
    • 2020-08-08
    • 1970-01-01
    • 2020-11-09
    • 2021-08-31
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多