【发布时间】:2016-07-27 06:56:04
【问题描述】:
我目前正在从事一项教育任务,其中我必须实现一个仅信号量的线程安全线程池。
我在作业期间不得使用:Synchronizewaitnotifysleep 或任何线程安全的 API。
首先不要过多地研究我拥有的代码:
- 实现了线程安全队列(没有两个线程可以同时排队\出队)(我已经用
ConcurrentLinkedQueue测试过这个问题,问题仍然存在)
设计本身:
共享:
Tasks信号量 = 0Available信号量 = 0Tasks_Queue队列Available_Queue队列
工作线程:
-
Blocked信号量 = 0
一般信息:
只有经理(单线程)可以出队
Tasks_Queue和Available_Queue只有App-Main(单线程)可以入队任务是
Tasks_Queue每个工作线程都可以将自己排入
Available_Queue
所以我们混合了一个生产者、一个管理者和几个消费者。
- 当应用首次启动时,每个工作线程都会启动并立即将自己排入
Available_Queue中,释放Available信号量并阻止获取它的个人Blocked信号量。 - 每当 App-Main 排队一个新任务时,它就会发布
TaskSemaphore - 每当 Manager 希望执行新任务时,它必须首先获取
Tasks和Available信号量。
我的问题:
在应用程序运行期间,函数 dequeue_worker() 返回一个 null 工作线程,即使在已知没有可用工作线程时放置一个信号量来保护对队列的访问。
如果它绘制一个空线程,我已经通过递归调用dequeue_worker()“解决”了这个问题,但是这样做是假设获取的信号量许可证永远丢失。然而,当我将工人数量限制为 1 时,工人不会永远被阻止。
1) 我的原始设计的断点是什么?
2)为什么我的“解决方案”没有进一步破坏设计?!
代码sn-ps:
// only gets called by Worker threads: enqueue_worker(this);
private void enqueue_worker(Worker worker) {
available_queue.add(worker);
available.release();
}
// only gets called by App-Main (a single thread)
public void enqueue_task(Query query) {
tasks_queue.add(query);
tasks.release();
}
// only gets called by Manager(a single Thread)
private Worker dequeue_worker() {
Worker worker = null;
try {
available.acquire();
worker = available_queue.poll();
} catch (InterruptedException e) {
// shouldn't happen
} // **** the solution: ****
if (worker==null) worker = dequeue_worker(); // TODO: find out why
return worker;
}
// only gets called by Manager(a single Thread)
private Query dequeue_task() {
Query query = null;
try {
tasks.acquire();
query = tasks_queue.poll();
} catch (InterruptedException e) {
// shouldn't happen
}
return query;
}
// gets called by Manager (a single thread)
private void execute() { // check if task is available and executes it
Worker worker = dequeue_worker(); // available.down()
Query query = dequeue_task(); //task.down()
worker.setData(query);
worker.blocked.release();
}
最后是 Worker 的 Run() 方法:
while (true) { // main infinite loop
enqueue_worker(this);
acquire(); // blocked.acquire();
<C.S>
available.release();
}
【问题讨论】:
标签: java multithreading threadpool semaphore producer-consumer