【发布时间】:2017-01-06 12:03:10
【问题描述】:
假设我有一个大队列,大概有 10,000 个对象。我想用 5 个工作线程创建一个线程池,每个线程从队列中删除一个项目并对其进行处理,直到队列为空。
我担心的是,使用我在不同地方看到的设置,我最终会立即创建 10,000 个工作,但通过 5 个工人执行它们。我觉得这不是真正可扩展的 - 队列已经有 10,000 个项目,现在我在堆栈上还有 10,000 个作业(即使它们没有被主动执行,这似乎是一个内存问题)。
这似乎是这个答案所暗示的:https://stackoverflow.com/a/9916299/774359 - 我担心的是“// now submit our jobs”部分。我有效地将队列转储到作业中是否存在问题?
这是我目前所拥有的一个简短示例:
在 Main() 中:
ExecutorService executor = Executors.newFixedThreadPool(5);
while(!hugeQueue.isEmpty()) {
String work = hugeQueue.remove();
System.out.println("Creating job for " + work);
Runnable worker = new Worker(work);
executor.execute(worker);
}
在 Worker 类中:
public Worker(String itemFromQueue) { this.job = itemFromQueue; }
@Override
public void run() {
System.out.println("Working on " + this.itemFromQueue);
//Do actual work
}
当hugeQueue 包含 10,000 个数字时,我会看到所有 10,000 条“正在创建工作”消息,然后是所有 10,000 条“正在处理”消息。我认为如果一次只创建 5 个工作,然后继续工作会更好——当一个线程打开时,它会创建另一个工作,然后工作。这样一来,堆栈上永远不会有 10,000 个工作。我将如何做到这一点?我是否正确地考虑了这种架构?
根据答案进行了编辑以包含更新的信息:
@seneque 的代码没有立即编译,所以我做了一些小改动 - 不幸的是,这只是工人的创建,而不是实际的工作。
在 Main() 中:
int numOfThreads = 5;
BlockingQueue<Integer> hugeQueue = new LinkedBlockingQueue<>();
for(int x = 0; x < 1000; x++) { hugeQueue.add(x); }
ExecutorService executor = Executors.newFixedThreadPool(numOfThreads);
LongRunningWorker longRunningWorker = new LongRunningWorker();
for( int i = 0; i < numOfThreads ; i++ ) {
System.out.println("Created worker #" + i);
executor.submit(longRunningWorker);
}
System.out.println("Done");
在 LongRunningWorker 中:
public class LongRunningWorker implements Runnable {
BlockingQueue<Integer> workQueue;
void spiderExmaple(BlockingQueue<Integer> workQueue) {
this.workQueue = workQueue;
}
@Override
public void run() {
try {
while(workQueue.poll(3, TimeUnit.SECONDS) != null) {
Integer work = workQueue.remove();
System.out.println("Working on " + work);
new Worker(work).run();
}
} catch (InterruptedException e) { e.printStackTrace(); }
}
}
在工人中:
public class Worker implements Runnable{
Integer work;
Worker(Integer x) { this.work = x; }
@Override
public void run() {
System.out.println("Finished work on " + this.work);
}
}
【问题讨论】:
-
一个 ThreadPoolExecutorService(当你调用 Executors.newFixedThreadPool(5) 时创建的)有一个内部队列。所以在这里,你从一个队列中取出另一个队列,将由 5 个线程读取
-
@seneque 对 - 这意味着,对于我自己的 10,000 个项目的队列,我实际上会创建第二个相同大小的队列,对吗?对象不同,但我的问题是考虑到双重内存要求,这是否是一个可行的解决方案
-
取而代之的是,如果hugeQueue是一个阻塞队列,你可以让你的5个线程引用你的队列并从队列中轮询。
-
@seneque 你介意为我写一个例子吗?我很乐意接受您的回答 - 这听起来像是我在问题末尾所指的内容,但我不确定如何完成。
-
否则在创建threadpoolExecutorService时,可以定义一个大小有限的队列,以及像ThreadPoolExecutor.CallerRunsPolicy这样的拒绝策略
标签: java multithreading queue