【问题标题】:How to wait for tokio tasks to finish?如何等待 tokio 任务完成?
【发布时间】:2021-11-29 11:59:08
【问题描述】:

我正在尝试使用Arc<Mutex<T>> 模式给HashMap 写信,作为受the Rust cookbook 启发的网站抓取练习的一部分。

第一部分使用tokio 运行时。我无法完成正在完成的任务并返回 HashMap,因为它只是挂起。

type Db = Arc<Mutex<HashMap<String, bool>>>;

pub async fn handle_async_tasks(db: Db) -> BoxResult<HashMap<String, bool>> {
    let links = NodeUrl::new("https://www.inverness-courier.co.uk/")
        .await
        .unwrap();

    let arc = db.clone();

    let mut handles = Vec::new();

    for link in links.links_with_paths {
        let x = arc.clone();
        handles.push(tokio::spawn(async move {
            process(x, link).await;
        }));
    }

    //  for handle in handles {
    //     handle.await.expect("Task panicked!");
    //  } < I tried this as well>

    futures::future::join_all(handles).await;

    let readables = arc.lock().await;

    for (key, value) in readables.clone().into_iter() {
        println!("Checking db: k, v ==>{} / {}", key, value);
    }

    let clone_db = readables.clone();

    return Ok(clone_db);
}

async fn process(db: Db, url: Url) {
    let mut db = db.lock().await;
    println!("checking {}", url);

    if check_link(&url).await.is_ok() {
        db.insert(url.to_string(), true);
    } else {
        db.insert(url.to_string(), false);
    }
}

async fn check_link(url: &Url) -> BoxResult<bool> {
    let res = reqwest::get(url.as_ref()).await?;
    Ok(res.status() != StatusCode::NOT_FOUND)
}

pub struct NodeUrl {
    domain: String,
    pub links_with_paths: Vec<Url>,
}

#[tokio::main]
async fn main() {
    let db: Db = Arc::new(Mutex::new(HashMap::new()));

    let db = futures::executor::block_on(task::handle_async_tasks(db));
}

我想将HashMap 返回到主线程被阻塞的main()。如何等待所有异步线程进程完成并返回HashMap

【问题讨论】:

  • 你能提供你的main()函数吗?否则没有人能告诉你如何使用那里的HashMap
  • 除了我的回答之外,我建议在没有任何 Rust 经验的情况下不要直接跳入 async 的东西。
  • @orlp 我用我正在使用的实际 url 和 main() 函数更新了我的问题。 @sebpuetz
  • @godhar NodeUrl 是什么?
  • @orlp 它返回一个带有相对路径的 URL 的 Vec。它工作正常。

标签: multithreading rust tokio


【解决方案1】:
let links = NodeUrl::new("https://www.some-site.com/.co.uk/").await.unwrap();

在我看来,这不是一个有效的 URL。

async fn process(db: Db, url: Url) {
    let mut db = db.lock().await;
    println!("checking {}", url);

    if check_link(&url).await.is_ok() {
         db.insert(url.to_string(), true);
    } else {
         db.insert(url.to_string(), false);
    }
}

这是非常有问题的。在整个请求期间,您持有数据库的排他锁。这使您的应用程序有效地串行。 reqwest 中的默认超时为 30 秒。因此,如果服务器没有响应并且你有很多链接要通过程序可能看起来只是“挂起”。

您应该只获得尽可能短的数据库锁定 - 只是为了进行插入:

async fn process(db: Db, url: Url) {
    println!("checking {}", url);

    if check_link(&url).await.is_ok() {
         let mut db = db.lock().await;
         db.insert(url.to_string(), true);
    } else {
         let mut db = db.lock().await;
         db.insert(url.to_string(), false);
    }
}

或者甚至更好,消除无用的 if:

async fn process(db: Db, url: Url) {
    println!("checking {}", url);
    let valid = check_link(&url).await.is_ok();
    db.lock().await.insert(url.to_string(), valid);
}

最后你没有显示你的main 函数,你调用handle_async_tasks 或运行其他东西的方式可能有问题。

【讨论】:

  • 我明白你关于在请求期间打开锁的观点。谢谢你。
【解决方案2】:

我的主要问题是如何处理MutexGuard - 我最终使用clone 并返回内部值。

没有必要在main 中使用futures::executor:因为我们在tokio 运行时中,调用.await 就足以同步访问最终值。

克隆一次Arc 就足够了;在将它传递到多线程上下文之前,我已经克隆了它两次。

感谢@orlp 指出与check_link 函数有关的错误逻辑。

pub async fn handle_async_tasks() -> BoxResult<HashMap<String, bool>> {
    let get_links = NodeUrl::new("https://www.invernesscourier.co.uk/")
        .await
        .unwrap();

    let db: Db = Arc::new(Mutex::new(HashMap::new()));
    let mut handles = Vec::new();

    for link in get_links.links_with_paths {
        let x = db.clone();

        handles.push(tokio::spawn(async move {
            process(x, link).await;
        }));
    }

    futures::future::join_all(handles).await;

    let guard = db.lock().await;
    let cloned = guard.clone();

    Ok(cloned)
}

#[tokio::main]
async fn main() {
    let db = task::handle_async_tasks().await.unwrap();
    for (key, value) in db.into_iter() {
        println!("Checking db: {} / {}", key, value);
    }
}

这绝不是最好的 Rust 代码,但我想分享一下我最终是如何解决问题的。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2021-10-21
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多