【问题标题】:Does Java provide an ExecutorService which allows a worker to execute on the same thread?Java 是否提供 ExecutorService 允许工作人员在同一个线程上执行?
【发布时间】:2013-02-09 07:27:16
【问题描述】:

我正在寻找ExecutorService 的实现,它将提供以下语义。每个线程都由一个“工作人员”占用,该工作人员根据输入执行某些任务。每个工作线程都保证只在一个线程中执行,因此,应该允许它在一个任务之间保持状态,而没有同步的开销,因为它会在一个线程中与自己同步。

假设我有 100 个输入和 10 个工人,我希望能够编写如下内容:

for (Input input: inputs) {
    // The following code would pass t to all 10 workers,
    // each bound to their own thread,
    // and wait for them to complete.
    executor.invokeAll(input);
}

请注意,每个 Worker 对任何给定的输入都会做不同的事情。输入不是可运行的代码块,它只是工人的一个参数。每个工人决定如何处理输入。不过,为了简单起见,worker 实现了一个接口,允许以多态方式调用它,接收输入。

我使用Map<Worker, WorkerExecutor> 破解了一些可行的方法,其中WorkerExecutor 是我对Executors.newSingleThreadPool 的薄包装,并且每个线程池中只有一个Worker 实例将运行。我更愿意找到知道自己在做什么的人写的东西:-)


我可以接受的潜在低效率

我意识到这种语义会导致效率低下,但是,我试图在开发时间方面获得最大的收益,并且重新设计 Worker 的每个实现以实现线程安全并非易事。我的意思是效率低下是执行可能/将看起来像这样(在此示例中模拟最多 2 个活动线程):

         | Task 1    | Task 2    | Task 3    | Task 4    |
Worker 1 | =@        | =@        | =@        | =@        |
Worker 2 | ==@       | ==@       | ==@       | ==@       |
Worker 3 |   ==@     |   ==@     |   ==@     |   ==@     |
Worker 4 |    =====@ |    =====@ |    =====@ |    =====@ |

问题是 Worker 3 完成后,没有任务要做,直到 Worker 4 完成后才能完成任何工作。这可以是任意长的 CPU 空闲时间。


这样的ExecutorService 存在吗?

【问题讨论】:

  • 你试过Executors.newSingleThreadExecutor() 吗?
  • @VishalK 这不是我想要的。我确实希望事情并行运行,可能跨越 50 个线程。我描述的那个瘦包装器使用newSingleThreadExecutor 将一个worker+task 提交到它的线程。问题是我必须写它,毫无疑问我弄错了:)
  • 是否可以为每个线程实例状态使用ThreadLocal 变量来解决该问题?
  • @Barend 这是一个有趣的建议。同样,这是我必须自己管理的事情,但这可能比我现在拥有的更有用。

标签: java multithreading concurrency threadpool executorservice


【解决方案1】:

听起来你真正想要的是actors。简而言之,actor 是一个在单个线程中运行的对象,并且有一个任务“邮箱”,它负责按顺序处理这些任务。 Akka 似乎是当前在 JVM 上提供参与者的领先库/框架。看看那边。

【讨论】:

    【解决方案2】:

    类似的东西:

    import java.util.LinkedHashSet;
    import java.util.Set;
    import java.util.concurrent.Executor;
    import java.util.concurrent.Executors;
    
    // you implement this for each of your non-parallelisable jobbies
    interface Worker<T> {
        public void process(T input);
    }
    
    // implementation detail
    class Clerk<T> {
        private final Executor executor = Executors.newSingleThreadExecutor();
        private final Worker<T> worker;
    
        public Clerk(Worker<T> worker) {
            this.worker = worker;
        }
    
        public void process(final T input) {
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    worker.process(input);
                }
            });
        }
    }
    
    // make one of these, and give it all your workers, then give it input
    class Workshop<T> {
        private final Set<Clerk<T>> clerks = new LinkedHashSet<Clerk<T>>();
    
        public void addWorker(Worker<T> worker) {
            // mutable; you love it
            clerks.add(new Clerk<T>(worker));
        }
    
        public void process(T input) {
            for (Clerk<T> clerk : clerks) {
                clerk.process(input);
            }
        }
    
        public void processAll(Iterable<T> inputs) {
            for (T input : inputs) {
                process(input);
            }
        }
    }
    

    也许?

    【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2016-01-27
    • 2013-01-10
    • 2023-03-23
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多