【问题标题】:Application on OSX cannot spawn more than 2048 threadsOSX 上的应用程序不能产生超过 2048 个线程
【发布时间】:2024-04-12 08:20:02
【问题描述】:

我在 OSX 上有一个 Rust 应用程序启动了大量线程,如下面的代码所示,但是,在查看了我的 OSX 版本允许通过 sysctl kern.num_taskthreads 命令创建的最大线程数之后,我可以看到它是kern.num_taskthreads: 2048,这解释了为什么我无法启动超过 2048 个线程。

我该如何克服这个硬性限制?

let threads = 300000;
let requests = 1;

for _x in 0..threads {
    println!("{}", _x);
    let request_clone = request.clone();

    let handle = thread::spawn(move || {
        for _y in 0..requests {
            request_clone.lock().unwrap().push((request::Request::new(request::Request::create_request())));
        }
    });

    child_threads.push(handle);
}

【问题讨论】:

  • 关于为什么要创建这么多线程的任何详细信息?这似乎太过分了。
  • 您认为可以创建一个较小的线程“池”然后对其进行迭代吗?每个线程可以处理多个连接(数百个?)
  • 也许在 OS X 上你可以使用 Grand Central Dispatch 代替线程。 dispatch_async 到标准并发全局队列之一。
  • “这解释了为什么我不能启动超过 2048 个线程”-“我该如何克服这个硬限制”- 你自己回答了:你不能乙>。您应该改写您的问题以避免XY Problem - 您真的想知道如何创建比创建线程更多的并发 HTTP 请求。
  • 对于异步 HTTP 处理,我通常使用 cURL。这是一个质量很好且维护良好的项目。不过,我还没有达到需要从 Rust 使用它的阶段,所以我不能吹嘘一些 Rust 代码。现有的 Rust 驱动程序github.com/carllerche/curl-rust 仅适用于“简单”处理程序,因此不足以完成这项工作。

标签: linux multithreading macos rust


【解决方案1】:

在开始之前,我建议您阅读C10K problem。当你达到这个规模时,你需要记住的事情还有很多。

话虽如此,我建议查看mio...

Rust 的轻量级 IO 库,重点是在操作系统抽象上增加尽可能少的开销。

具体来说,mio 提供了一个事件循环,它允许您在不产生线程的情况下处理大量连接。不幸的是,我不知道目前支持 mio 的 HTTP 库。您可以创建一个并成为 Rust 社区的英雄!

【讨论】:

    【解决方案2】:

    不确定这会有多大帮助,但我试图创建一个小的线程池来创建连接,然后通过通道将它们发送到事件循环以供读取。

    我确信这段代码可能很糟糕,但无论如何,这里都是示例。就像你提到的那样,它使用 Hyper 库。

    extern crate hyper;
    
    use std::io::Read;
    use std::thread;
    use std::thread::{JoinHandle};
    use std::sync::{Arc, Mutex};
    use std::sync::mpsc::channel;
    
    use hyper::Client;
    use hyper::client::Response;
    use hyper::header::Connection;
    
    const TARGET: i32 = 100;
    const THREADS: i32 = 10;
    
    struct ResponseWithString {
        index: i32,
        response: Response,
        data: Vec<u8>,
        complete: bool
    }
    
    fn main() {
        // Create a client.
        let url: &'static str = "http://www.gooogle.com/";
    
        let mut threads = Vec::<JoinHandle<()>>::with_capacity((TARGET * 2) as usize);
        let conn_count = Arc::new(Mutex::new(0));
        let (tx, rx) = channel::<ResponseWithString>();
    
        for _ in 0..THREADS {
            // Move var references into thread context
            let conn_count = conn_count.clone();
            let tx = tx.clone();
    
            let t = thread::spawn(move || {
                loop {
                    let idx: i32;
                    {
                        // Lock, increment, and release
                        let mut count = conn_count.lock().unwrap();
                        *count += 1;
                        idx = *count;
                    }
                    if idx > TARGET {
                        break;
                    }
    
                    let mut client = Client::new();
    
                    // Creating an outgoing request.
                    println!("Creating connection {}...", idx);
                    let res = client.get(url)                       // Get URL...
                                    .header(Connection::close())    // Set headers...
                                    .send().unwrap();               // Fire!
    
                    println!("Pushing response {}...", idx);
                    tx.send(ResponseWithString {
                        index: idx,
                        response: res,
                        data: Vec::<u8>::with_capacity(1024),
                        complete: false
                    }).unwrap();
                }
            });
            threads.push(t);
        }
    
        let mut responses = Vec::<ResponseWithString>::with_capacity(TARGET as usize);
        let mut buf: [u8; 1024] = [0; 1024];
        let mut completed_count = 0;
        loop {
            if completed_count >= TARGET {
                break; // No more work!
            }
    
            match rx.try_recv() {
                Ok(r) => {
                    println!("Incoming response! {}", r.index);
                    responses.push(r)
                },
                _ => { }
            }
    
            for r in &mut responses {
                if r.complete {
                    continue;
                }
    
                // Read the Response.
                let res = &mut r.response;
                let data = &mut r.data;
                let idx = &r.index;
    
                match res.read(&mut buf) {
                    Ok(i) => {
                            if i == 0 {
                                println!("No more data! {}", idx);
                                r.complete = true;
                                completed_count += 1;
                            }
                            else {
                                println!("Got data! {} => {}", idx, i);
                                for x in 0..i {
                                    data.push(buf[x]);
                                }
                            }
                        }
                    Err(e) => {
                        panic!("Oh no! {} {}", idx, e);
                    }
                }
            }
        }
    }
    

    【讨论】:

    • 我肯定少了一块,但是这怎么会有超过#THREADS的并发连接数?
    • 有一个互斥体跟踪打开的连接数,在每个线程的循环内使用。 THREADS 是打开连接的线程数,而TARGET 是它试图打开的连接数。这是一个弱示例,但我想展示另一种处理连接的方法,而不是每个线程一个。
    • 啊,我明白了。所以每个线程负责打开新的连接,但其他的负责关闭它们,这意味着可以打开的连接数多于线程数!
    • 正确!它们被传递到一个循环中从它们读取的通道中。这也可以扩展到包括线程。
    • 您是否能够从“另一端”测量并确定这实际启用了多少并发连接?
    最近更新 更多