【问题标题】:Semaphore-implemented Producer-Consumer oriented thread pool信号量实现的面向生产者-消费者的线程池
【发布时间】:2016-07-27 06:56:04
【问题描述】:

我目前正在从事一项教育任务,其中我必须实现一个仅信号量的线程安全线程池。

我在作业期间不得使用:Synchronizewaitnotifysleep 或任何线程安全的 API。

首先不要过多地研究我拥有的代码:

  • 实现了线程安全队列(没有两个线程可以同时排队\出队)(我已经用ConcurrentLinkedQueue 测试过这个问题,问题仍然存在)

设计本身:

共享:

  • Tasks 信号量 = 0

  • Available 信号量 = 0

  • Tasks_Queue队列

  • Available_Queue队列

工作线程:

  • Blocked 信号量 = 0

一般信息:

  • 只有经理(单线程)可以出队Tasks_QueueAvailable_Queue

  • 只有App-Main(单线程)可以入队任务是Tasks_Queue

  • 每个工作线程都可以将自己排入Available_Queue

所以我们混合了一个生产者、一个管理者和几个消费者。

  • 当应用首次启动时,每个工作线程都会启动并立即将自己排入Available_Queue 中,释放Available 信号量并阻止获取它的个人Blocked 信号量。
  • 每当 App-Main 排队一个新任务时,它就会发布 Task Semaphore
  • 每当 Manager 希望执行新任务时,它必须首先获取 TasksAvailable 信号量。

我的问题:

在应用程序运行期间,函数 dequeue_worker() 返回一个 null 工作线程,即使在已知没有可用工作线程时放置一个信号量来保护对队列的访问。

如果它绘制一个空线程,我已经通过递归调用dequeue_worker()“解决”了这个问题,但是这样做是假设获取的信号量许可证永远丢失。然而,当我将工人数量限制为 1 时,工人不会永远被阻止。

1) 我的原始设计的断点是什么?

2)为什么我的“解决方案”没有进一步破坏设计?!

代码sn-ps:

// only gets called by Worker threads: enqueue_worker(this);
    private void enqueue_worker(Worker worker) {
       available_queue.add(worker);
       available.release();
    }

// only gets called by App-Main (a single thread)
    public void enqueue_task(Query query) {
        tasks_queue.add(query);
        tasks.release();
    }

// only gets called by Manager(a single Thread) 
    private Worker dequeue_worker() {
        Worker worker = null;
        try {
            available.acquire();
            worker = available_queue.poll();
        } catch (InterruptedException e) {
            // shouldn't happen
        } // **** the solution: ****
        if (worker==null) worker = dequeue_worker(); // TODO: find out why
        return worker;
    }

// only gets called by Manager(a single Thread) 
    private Query dequeue_task() {
        Query query = null;
        try {
            tasks.acquire();
            query = tasks_queue.poll();
        } catch (InterruptedException e) {
            // shouldn't happen
        } 
        return query;
    }

// gets called by Manager (a single thread)
    private void execute() { // check if task is available and executes it
        Worker worker = dequeue_worker(); // available.down()
        Query query = dequeue_task(); //task.down()
        worker.setData(query);
        worker.blocked.release();
    }

最后是 Worker 的 Run() 方法:

while (true) { // main infinite loop

                enqueue_worker(this);
                acquire(); // blocked.acquire();
                <C.S>
                available.release();
            }

【问题讨论】:

    标签: java multithreading threadpool semaphore producer-consumer


    【解决方案1】:

    您调用了两次available.release(),一次在enqueue_worker,第二次在主循环中。

    【讨论】:

    • 它的线程安全没有阻塞。我的阻塞机制是通过AvailableTasks 信号量实现的
    • 你的递归迭代做同样的事情,但效率很低
    • 我想了解available.acquire() 调用是如何允许空对象的.poll() 的。
    • 你调用了两次available.release(),一次在enqueue_worker,第二次在主循环中
    • 我到底是怎么错过的?!太感谢了!请编辑您的答案以反映此问题,我会选择作为首选答案。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2020-07-13
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多