【问题标题】:Why is `Future::poll` not called repeatedly after returning `NotReady`?为什么返回 NotReady 后没有重复调用 Future::poll?
【发布时间】:2020-02-03 00:24:06
【问题描述】:

考虑下面的代码

extern crate futures; // v0.1 (old)

use std::sync::{atomic, Arc};
use futures::*;

struct F(Arc<atomic::AtomicBool>);

impl Future for F {
    type Item = ();
    type Error = ();

    fn poll(&mut self) -> Result<Async<Self::Item>, Self::Error> {
        println!("Check if flag is set");
        if self.0.load(atomic::Ordering::Relaxed) {
            Ok(Async::Ready(()))
        } else {
            Ok(Async::NotReady)
        }
    }
}

fn main() {
    let flag = Arc::new(atomic::AtomicBool::new(false));
    let future = F(flag.clone());
    ::std::thread::spawn(move || {
        ::std::thread::sleep_ms(10);
        println!("set flag");
        flag.store(true, atomic::Ordering::Relaxed);
    });
    // ::std::thread::sleep_ms(20);
    let result = future.wait();
    println!("result: {:?}", result);
}

生成的线程设置一个标志,未来等待。 我们还使生成的线程休眠,因此来自.wait() 的初始.poll() 调用是在设置标志之前。这会导致.wait() 无限期地阻塞(看似)。如果我们取消注释另一个 thread::sleep_ms.wait() 返回,并打印出结果 (())。

我希望当前线程尝试通过多次调用poll 来解决未来问题,因为我们正在阻塞当前线程。然而,这并没有发生。

我尝试阅读some docs,似乎问题在于第一次从poll 获取NotReady 后,该线程是parked。但是,我不清楚为什么会这样,或者如何解决这个问题。

我错过了什么?

【问题讨论】:

    标签: multithreading rust future


    【解决方案1】:

    为什么需要停放一个等待的未来而不是反复轮询它?答案很明显,恕我直言。因为归根结底,它更快、更高效!

    要反复轮询未来(可能被称为“忙等待”),图书馆必须决定是经常还是很少这样做,而且两个答案都不令人满意。经常这样做会浪费 CPU 周期,很少这样做会导致代码反应缓慢。

    所以,是的,您需要在等待某事时将任务停放,然后在等待完成后将其取消停放。像这样:

    #![allow(deprecated)]
    
    extern crate futures;
    
    use std::sync::{Arc, Mutex};
    use futures::*;
    use futures::task::{park, Task};
    
    struct Status {
        ready: bool,
        task: Option<Task>,
    }
    
    #[allow(dead_code)]
    struct F(Arc<Mutex<Status>>);
    
    impl Future for F {
        type Item = ();
        type Error = ();
    
        fn poll(&mut self) -> Result<Async<Self::Item>, Self::Error> {
            println!("Check if flag is set");
            let mut status = self.0.lock().expect("!lock");
            if status.ready {
                Ok(Async::Ready(()))
            } else {
                status.task = Some(park());
                Ok(Async::NotReady)
            }
        }
    }
    
    #[test]
    fn test() {
        let flag = Arc::new(Mutex::new(Status {
                                           ready: false,
                                           task: None,
                                       }));
        let future = F(flag.clone());
        ::std::thread::spawn(move || {
            ::std::thread::sleep_ms(10);
            println!("set flag");
            let mut status = flag.lock().expect("!lock");
            status.ready = true;
            if let Some(ref task) = status.task {
                task.unpark()
            }
        });
        let result = future.wait();
        println!("result: {:?}", result);
    }
    

    请注意,Future::poll 在这里做了几件事:它正在检查外部条件并正在停止任务,因此可能会出现 race,例如:

    1. poll 检查变量并发现它是false
    2. 外部代码将变量设置为true
    3. 外部代码检查任务是否已停放,发现不是;
    4. poll 停止任务,但是砰!为时已晚,没有人会再把它拆开。

    为了避免任何竞争,我使用了Mutex 来同步这些交互。

    附:如果您只需要将线程结果包装到 Future 中,请考虑使用 oneshot 通道:它具有实现 Future 接口的 Receiver

    【讨论】:

    • 答案很明显,恕我直言——如果答案很明显,人们就不需要问这个问题了^_^
    • @Shepmaster 这是一个有效的嘲讽,但人们也会提出问题以确认某些事情或发泄他们的挫败感,等等。 = ) 通过说答案是显而易见的,我是在说这件事很简单,无需过度思考。此外,“小学,我亲爱的华生”:)
    • 好的!我了解停车计划背后的动机,但我没有意识到我必须手动处理 parking 的东西(尽管这很有意义 - futures 如何知道值何时准备好?)。谢谢!
    • 第一次看的时候真的不明白park()/unpark()是什么意思,现在函数改名为current()/notify()了。感谢@Shepmaster 再次回答我的问题。有些人可能不太了解这一点,您可以阅读stackoverflow.com/questions/58377995/… 了解更多信息。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2021-11-09
    • 2019-10-08
    相关资源
    最近更新 更多