【Question title】: Rust, how to perform basic recursive async?
【Posted】: 2021-03-24 10:56:02
【Question】:

I am running some quick experiments to learn Rust. I have already written a few successful async tests, and this is my starting point:

use async_std::task;
use futures;
use std::time::SystemTime;

fn main() {
    let now = SystemTime::now();
    task::block_on(async {
        let mut fs = Vec::new();
        let sum = 100000000;
        let chunks: u64 = 5; //only valid for factors of sum
        let chunk_size: u64 = sum/chunks;
        for n in 1..=chunks {
            fs.push(task::spawn(async move {
                add_range((n - 1) * chunk_size + 1, n * chunk_size + 1)
            }));
        }
        let vals = futures::future::join_all(fs).await;
        // 5000000050000000 for this configuration of inputs
        println!("{}", vals.iter().sum::<u64>());
    });
    println!("{}ms", now.elapsed().unwrap().as_millis());
}

fn add_range(start: u64, end: u64) -> u64 {
    println!("{}, {}", start, end);
    let mut total: u64 = 0;
    for n in start..end {
        total += n;
    }
    return total;
}

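Changing the value of chunks changes the number of task::spawn calls. Now, instead of a flat set of workers, I want add_range to be recursive and keep forking off workers depending on the input. After chasing compiler errors, though, I have tied myself in knots: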

use async_std::task;
use futures;
use std::future::Future;
use std::pin::Pin;

fn main() {
    let pin_box_u64 = task::block_on(add_range(0, 10, 10, 1, 1001));
    println!("{}", pin_box_u64/*how do i get u64 out of this*/)
}

// recursively calls itself in a branching tree structure
// forking off more worker threads
async fn add_range(
    depth: u64,
    chunk_split: u64,
    chunk_size: u64,
    start: u64,
    end: u64,
) -> Pin<Box<dyn Future<Output = u64>>> {
    println!("{}, {}, {}", depth, start, end);
    // if the range of start to end is more than the allowed
    // chunk_size then fork off more workers dividing
    // the work up further.
    if end - start > chunk_size {
        let mut fs = Vec::new();
        let next_chunk_size = (end - start) / chunk_split;
        for n in 0..chunk_split {
            let s = start + (next_chunk_size * n);
            let mut e = start + (next_chunk_size * (n + 1));
            if e > end {
                e = end;
            }
            // spawn more workers
            fs.push(task::spawn(add_range(depth + 1, chunk_split, chunk_size, s, e)));
        }
        return Box::pin(async move {
            // join workers back up and do joining sum. 
            return futures::future::join_all(fs).await.iter().map(/*how do i get u64s out of here*/).sum::<u64>();
        });
    } else {
        // else the work is less than the allowed chunk_size
        // so lets now do the actual sum for my chunk 
        let mut total: u64 = 0;
        for n in start..end {
            total += n;
        }
        return Box::pin(async move { total });
    }
}

I have been playing with this for a while, but the compiler errors just leave me more and more lost.

【Comments】:

    Tags: rust rust-async-std


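    【Solution 1】:

    The returned future needs to be boxed; otherwise the compiler cannot determine the size of the return type. Note that add_range below is a plain fn that returns the boxed future, not an async fn, so task::block_on yields the u64 directly instead of a future wrapped in another future.

    Additional context can be found here: https://rust-lang.github.io/async-book/07_workarounds/04_recursion.html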
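To make the size argument concrete, here is a minimal sketch that uses only the standard library. The alias BoxedSum and the function sum_to are hypothetical names introduced for illustration; they are not part of the question or of async_std. The point is that a boxed trait-object future is a fixed-size handle regardless of how deep the recursion goes.

```rust
use std::future::Future;
use std::mem::size_of;
use std::pin::Pin;

// Hypothetical alias, used only in this sketch.
type BoxedSum = Pin<Box<dyn Future<Output = u64> + Send + 'static>>;

// A plain `fn` that returns a boxed future may call itself: the recursive
// call lives behind a heap pointer, so the return type has a known size.
fn sum_to(n: u64) -> BoxedSum {
    Box::pin(async move {
        if n == 0 {
            0
        } else {
            n + sum_to(n - 1).await
        }
    })
}

fn main() {
    // The handle is a fat pointer: a data pointer plus a vtable pointer.
    assert_eq!(size_of::<BoxedSum>(), 2 * size_of::<usize>());
    // Building the future does no work; nothing runs until it is polled.
    let _fut = sum_to(3);
    println!("{} bytes", size_of::<BoxedSum>());
}
```

Writing the same function as an async fn that awaits itself directly would, on compilers without support for async recursion, be rejected because the future type would have to contain itself.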

    use std::pin::Pin;
    
    use async_std::task;
    use futures::Future;
    use futures::FutureExt;
    
    fn main() {
        let pin_box_u64 = task::block_on(add_range(0, 10, 10, 1, 1001));
        println!("{}", pin_box_u64)
    }
    
    // recursively calls itself in a branching tree structure
    // forking off more worker threads
    fn add_range(
        depth: u64,
        chunk_split: u64,
        chunk_size: u64,
        start: u64,
        end: u64,
    ) -> Pin<Box<dyn Future<Output = u64> + Send + 'static>> {
        println!("{}, {}, {}", depth, start, end);
        // if the range of start to end is more than the allowed
        // chunk_size then fork off more workers dividing
        // the work up further.
        if end - start > chunk_size {
            let mut fs = Vec::new();
            let next_chunk_size = (end - start) / chunk_split;
            for n in 0..chunk_split {
                let s = start + (next_chunk_size * n);
                let mut e = start + (next_chunk_size * (n + 1));
                if e > end {
                    e = end;
                }
                // spawn more workers
                fs.push(task::spawn(add_range(
                    depth + 1,
                    chunk_split,
                    chunk_size,
                    s,
                    e,
                )));
            }
            // join workers back up and do joining sum.
            return futures::future::join_all(fs)
                .map(|v| v.iter().sum::<u64>())
                .boxed();
        } else {
            // else the work is less than the allowed chunk_size
            // so lets now do the actual sum for my chunk
            let mut total: u64 = 0;
            for n in start..end {
                total += n;
            }
            return futures::future::ready(total).boxed();
        }
    }
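As an alternative to the combinator style above, the body can be wrapped in Box::pin(async move { ... }), which lets the recursion use .await directly. The sketch below is not the answer's code: it depends only on the standard library, so block_on and ThreadWaker are hypothetical stand-ins for async_std::task::block_on, and the sub-ranges are awaited one after another instead of being handed to task::spawn. It also assumes start <= end, chunk_split >= 2 and chunk_size >= chunk_split, and it lets the last chunk absorb the division remainder, which the code above does not do.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};

type BoxedSum = Pin<Box<dyn Future<Output = u64> + Send + 'static>>;

// Waker that unparks the thread blocked in `block_on`.
struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

// Hypothetical std-only stand-in for `async_std::task::block_on`.
fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = Box::pin(fut);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(value) => return value,
            Poll::Pending => thread::park(),
        }
    }
}

// Sums start..end (end exclusive), splitting ranges longer than `chunk_size`.
// Assumes chunk_split >= 2 and chunk_size >= chunk_split so each split shrinks.
fn add_range(chunk_split: u64, chunk_size: u64, start: u64, end: u64) -> BoxedSum {
    Box::pin(async move {
        if end - start <= chunk_size {
            return (start..end).sum::<u64>();
        }
        let step = (end - start) / chunk_split;
        let mut total: u64 = 0;
        for n in 0..chunk_split {
            let s = start + step * n;
            // The last chunk absorbs any remainder left by integer division.
            let e = if n + 1 == chunk_split { end } else { s + step };
            // Awaited sequentially here; the answer spawns tasks instead.
            total += add_range(chunk_split, chunk_size, s, e).await;
        }
        total
    })
}

fn main() {
    let sum = block_on(add_range(10, 10, 1, 1001));
    println!("{}", sum); // prints 500500
}
```

With async_std available, replacing the sequential await with task::spawn plus join_all, as in the answer, restores the parallel fan-out; the boxed return type stays the same.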
    
    
