【Question title】:Cycle detected in recursive async function
【Posted】:2021-08-16 14:58:03
【Problem description】:

I am trying to recursively find files with a given extension in a directory. This is my current implementation:

use futures::future::BoxFuture;
use futures::Stream;
use std::io::ErrorKind;
use std::pin::Pin;
use std::result;
use tokio::fs::read_dir;
use tokio::sync::mpsc::{channel, Receiver, Sender};
use tokio_stream::wrappers::ReceiverStream;

type Result<T> = result::Result<T, std::io::Error>;
type FileNameStream = Pin<Box<dyn Stream<Item = Result<String>> + Send + Sync + 'static>>;

async fn list_all(root_path: String, ext: String) -> Result<FileNameStream> {
    async fn one_level(path: String, tx: Sender<Result<String>>, ext: String) -> Result<()> {
        let mut dir = read_dir(path).await?;
        let mut files: Vec<String> = Vec::new();

        while let Some(child) = dir.next_entry().await? {
            if let Some(child_path) = child.path().to_str() {
                if child.metadata().await?.is_dir() {
                    tokio::spawn(async {
                        one_level(child_path.to_string(), tx.clone(), ext.clone()).await;
                    });
                } else {
                    if child_path.ends_with(&ext.clone()) {
                        files.push(child_path.to_string())
                    }
                }
            } else {
                tx.send(Err(std::io::Error::new(
                    ErrorKind::Other,
                    "Invalid path".to_string(),
                )));
            }
        }

        for file in files {
            tx.send(Ok(file));
        }
        Ok(())
    }

    let (tx, rx): (Sender<Result<String>>, Receiver<Result<String>>) = channel(2);
    tokio::spawn(async {
        one_level(root_path, tx, ext).await;
    });
    Ok(Box::pin(ReceiverStream::new(rx)))
}

I don't quite understand why the compiler complains:

14 |     async fn one_level(path: String, tx: Sender<Result<String>>, ext: String) -> Result<()> {
   |                                                                                  ^^^^^^^^^^
   |
note: ...which requires borrow-checking `list_all::{closure#0}::one_level`...
  --> src/main.rs:14:5
.....
.....
   = note: ...which requires evaluating trait selection obligation `for<'r, 's, 't0> {std::future::ResumeTy, &'r str, std::string::String, &'s tokio::sync::mpsc::Sender<std::result::Result<std::string::String, std::io::Error>>, tokio::sync::mpsc::Sender<std::result::Result<std::string::String, std::io::Error>>, &'t0 std::string::String, impl futures::Future, ()}: std::marker::Send`...
   = note: ...which again requires computing type of `list_all::{closure#0}::one_level::{opaque#0}`, completing the cycle
   = note: cycle used when evaluating trait selection obligation `{std::future::ResumeTy, std::string::String, tokio::sync::mpsc::Sender<std::result::Result<std::string::String, std::io::Error>>, impl futures::Future, ()}: std::marker::Send`

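Is it possible to define a recursive function like this as async? And can I use tokio::spawn to list directories in parallel and speed the process up?

【Comments】:

Tags: recursion rust async-await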


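【Solution 1】:

Rust async functions are compiled into state machines. An async function that calls itself would therefore need a state machine that embeds itself in its own definition, which would be an infinitely recursive (and infinitely sized) type.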
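To make the size argument concrete, here is a minimal std-only sketch. The names `inner`, `outer` and `future_sizes` are hypothetical and not part of the original code. It shows that a future's size includes the futures it awaits, while a boxed future is only a pointer wide, which is why boxing can break the cycle:

```rust
use std::future::{ready, Future};
use std::mem::size_of_val;
use std::pin::Pin;

// `buf` is used after the await point, so it must be stored inside the
// state machine generated for `inner`.
async fn inner(seed: u8) -> u8 {
    let buf = [seed; 1024];
    ready(()).await;
    buf[usize::from(seed)]
}

// The state machine for `outer` embeds the one for `inner`.
async fn outer(seed: u8) -> u8 {
    inner(seed).await
}

// Returns (size of `inner` future, size of `outer` future, size of a boxed future).
fn future_sizes() -> (usize, usize, usize) {
    // Boxing moves the state machine to the heap; only a (fat) pointer remains.
    let boxed: Pin<Box<dyn Future<Output = u8>>> = Box::pin(outer(0));
    (
        size_of_val(&inner(0)),
        size_of_val(&outer(0)),
        size_of_val(&boxed),
    )
}
```

If `outer` awaited itself directly, its size would have to include its own size, so the compiler could never compute a finite layout.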

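This is explained in more detail in the linked documentation. The workaround described there is to introduce indirection through a Box (the BoxFuture type) and a non-async function: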

use futures::future::BoxFuture;
use futures::{FutureExt, Stream};
use std::io::ErrorKind;
use std::pin::Pin;
use std::result;
use tokio::fs::read_dir;
use tokio::sync::mpsc::{channel, Receiver, Sender};
use tokio_stream::wrappers::ReceiverStream;

type Result<T> = result::Result<T, std::io::Error>;
type FileNameStream = Pin<Box<dyn Stream<Item = Result<String>> + Send + Sync + 'static>>;

async fn list_all(root_path: String, ext: String) -> Result<FileNameStream> {
    let (tx, rx): (Sender<Result<String>>, Receiver<Result<String>>) = channel(2);
    tokio::spawn(async {
        recursive(root_path, tx, ext).await.unwrap();
    });
    Ok(Box::pin(ReceiverStream::new(rx)))
}

fn recursive(
    path: String,
    tx: Sender<Result<String>>,
    ext: String,
) -> BoxFuture<'static, Result<()>> {
    async move {
        let mut dir = read_dir(path).await?;
        let mut files: Vec<String> = Vec::new();

        while let Some(child) = dir.next_entry().await? {
            match child.path().to_str() {
                Some(child_path) => {
                    let metadata = child.metadata().await?;

                    if metadata.is_dir() {
                        let cp = child_path.to_owned();
                        let tx = tx.clone();
                        let ext = ext.clone();

                        tokio::spawn(async {
                            recursive(cp, tx, ext).await.unwrap();
                        });
                    } else if child_path.ends_with(&ext) {
                        files.push(child_path.to_owned());
                    }
                }
                None => {
                    tx.send(Err(std::io::Error::new(
                        ErrorKind::Other,
                        "Invalid path".to_string(),
                    )))
                    .await
                    .unwrap();
                }
            }
        }

        for file in files {
            tx.send(Ok(file)).await.unwrap();
        }
        Ok(())
    }
    .boxed()
}
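Stripped of the file-system and channel details, the pattern is small enough to sketch with the standard library alone. Everything here is illustrative rather than taken from the answer: `BoxFut` is a hand-written stand-in for `futures::future::BoxFuture`, `count_down` is a hypothetical recursive function, and `block_on` is a toy executor that only suits futures that never actually wait.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Hand-written equivalent of a boxed, type-erased, Send future.
type BoxFut<'a, T> = Pin<Box<dyn Future<Output = T> + Send + 'a>>;

// A plain (non-async) function that returns a boxed async block.
// The recursive call goes through the explicit `BoxFut` return type,
// so the compiler never needs the state machine to contain itself.
fn count_down(n: u32) -> BoxFut<'static, u32> {
    Box::pin(async move {
        if n == 0 {
            0
        } else {
            1 + count_down(n - 1).await
        }
    })
}

// Waker that does nothing; sufficient for the busy-polling loop below.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

// Toy executor: polls in a loop until the future completes.
fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = Box::pin(fut);
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    loop {
        if let Poll::Ready(value) = fut.as_mut().poll(&mut cx) {
            return value;
        }
        std::thread::yield_now();
    }
}
```

Calling `block_on(count_down(5))` drives five nested boxed futures to completion. In a real program a runtime such as tokio would replace `block_on`.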

PS: I also fixed a few other problems in your code, such as the missing .await on the tx.send() calls. Remember: futures only do work when they are polled!
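The laziness of futures can be seen in a small std-only sketch. `lazy_demo` and `NoopWaker` are hypothetical names, and the future is polled by hand instead of through a runtime. Creating the future runs none of its body. The flag only flips once the future is polled, which is why a `tx.send(..)` without `.await` sends nothing.

```rust
use std::future::Future;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::task::{Context, Wake, Waker};

// Waker that does nothing; enough to poll a future by hand.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

// Returns whether the async body had run (before polling, after polling).
fn lazy_demo() -> (bool, bool) {
    let ran = AtomicBool::new(false);

    // Building the future does not execute its body.
    let fut = async {
        ran.store(true, Ordering::SeqCst);
    };
    let before = ran.load(Ordering::SeqCst);

    // Polling is what drives the body forward.
    let mut fut = Box::pin(fut);
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let _ = fut.as_mut().poll(&mut cx);
    let after = ran.load(Ordering::SeqCst);

    (before, after)
}
```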

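【Discussion】:

  • Thanks for the answer! Could you explain why you unwrap the results inside tokio::spawn and of tx.send(Ok(..)).await? Will they panic the whole program, or only the thread/executor running the task? Is it also possible to catch the errors, propagate them, and send them into the channel?
  • Unwrapping them causes a panic, which crashes the application when an error occurs. You have to decide how (and whether) to handle the error case. It is usually better to fail/crash early than to carry on with incorrect data.
  • What is the idiomatic way to handle SendError? If we are writing an application that loops over user input and does various kinds of processing, should we propagate it? Or should we panic, because it means something is wrong with the code we wrote?
  • The error is returned once the Receiver has been dropped, so no message can be received any more. If that cannot happen in your application, unwrapping is fine. If it can happen, the right way to handle the error depends on what should happen when there is no receiver left. There is no general solution; it depends on the application.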
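As one possible alternative to unwrapping, the sender can simply stop when the receiver is gone. The sketch below uses `std::sync::mpsc` as a stand-in so it runs without a runtime; this is a swapped-in channel, not the tokio one from the answer, although tokio's `Sender::send(..).await` likewise returns an error once the receiver has been dropped. `send_until_closed` is a hypothetical helper name.

```rust
use std::sync::mpsc::{channel, Sender};

// Sends items until the receiver goes away and returns how many were delivered.
fn send_until_closed(tx: &Sender<String>, items: Vec<String>) -> usize {
    let mut sent = 0;
    for item in items {
        // `send` fails only when the receiver has been dropped. Nobody is
        // listening any more, so stop quietly instead of panicking.
        if tx.send(item).is_err() {
            break;
        }
        sent += 1;
    }
    sent
}
```

Whether stopping quietly, logging, or propagating the error is appropriate remains an application-level decision, as the comment above points out.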