【问题标题】:Both sequential and parallel processing顺序处理和并行处理
【发布时间】:2023-03-31 09:46:01
【问题描述】:

我有一个生产者和许多消费者。

  • 生产者速度很快,产生了很多结果
  • 具有相同值的令牌需要按顺序处理
  • 必须并行处理具有不同值的令牌
  • 创建新的 Runnable 会非常昂贵,而且生产代码可以使用 100k 的 Token(为了创建 Runnable,我必须将一些复杂的构建对象传递给构造函数)

我可以使用更简单的算法获得相同的结果吗?用可重入锁嵌套同步块似乎有点不自然。 您可能会注意到任何竞争条件吗?

更新:我发现的第二个解决方案是使用 3 个集合。一是缓存生产者结果,二是阻塞队列,三是使用列表跟踪正在进行的任务。又有点复杂。

我的代码版本

import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.locks.ReentrantLock;

public class Main1 {
    static class Token {
        private int order;
        private String value;
        Token() {

        }
        Token(int o, String v) {
            order = o;
            value = v;
        }

        int getOrder() {
            return order;
        }

        String getValue() {
            return value;
        }
    }

    private final static BlockingQueue<Token> queue = new ArrayBlockingQueue<Token>(10);
    private final static ConcurrentMap<String, Object> locks = new ConcurrentHashMap<String, Object>();
    private final static ReentrantLock reentrantLock = new ReentrantLock();
    private final static Token STOP_TOKEN = new Token();
    private final static List<String> lockList = Collections.synchronizedList(new ArrayList<String>());

    public static void main(String[] args) {
        ExecutorService producerExecutor = Executors.newSingleThreadExecutor();
        producerExecutor.submit(new Runnable() {
            public void run() {
                Random random = new Random();
                    try {
                        for (int i = 1; i <= 100; i++) {
                            Token token = new Token(i, String.valueOf(random.nextInt(1)));

                            queue.put(token);
                        }

                        queue.put(STOP_TOKEN);
                    }catch(InterruptedException e){
                        e.printStackTrace();
                    }
                }
        });

        ExecutorService consumerExecutor = Executors.newFixedThreadPool(10);
        for(int i=1; i<=10;i++) {

            // creating to many runnable would be inefficient because of this complex not thread safe object
            final Object dependecy = new Object(); //new ComplexDependecy()
            consumerExecutor.submit(new Runnable() {
                public void run() {
                    while(true) {
                        try {
                            //not in order


                            Token token = queue.take();
                            if (token == STOP_TOKEN) {
                                queue.add(STOP_TOKEN);
                                return;
                            }


                            System.out.println("Task start" + Thread.currentThread().getId() + " order "  + token.getOrder());

                            Random random = new Random();
                            Thread.sleep(random.nextInt(200)); //doLongRunningTask(dependecy)
                            lockList.remove(token.getValue());

                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
            }});

    }
}}

【问题讨论】:

  • ...必须并行处理... 很难强制任何两件或多件事情同时发生。 允许不同的线程并行做事,但Java中没有任何东西保证事情会并行发生。

标签: java multithreading synchronization locking


【解决方案1】:

您可以预先创建一组Runnables,它将挑选传入的任务(令牌)并根据它们的顺序值将它们放入队列中。

正如 cmets 中所指出的,保证具有不同值的令牌将始终并行执行(总而言之,您至少受到盒子中 nr 个物理核心的限制) .但是保证顺序相同的token会按照到达的顺序执行。

示例代码:

/**
 * Executor which ensures incoming tasks are executed in queues according to provided key (see {@link Task#getOrder()}).
 */
public class TasksOrderingExecutor {

    public interface Task extends Runnable {
        /**
         * @return ordering value which will be used to sequence tasks with the same value.<br>
         * Tasks with different ordering values <i>may</i> be executed in parallel, but not guaranteed to.
         */
        String getOrder();
    }

    private static class Worker implements Runnable {

        private final LinkedBlockingQueue<Task> tasks = new LinkedBlockingQueue<>();

        private volatile boolean stopped;

        void schedule(Task task) {
            tasks.add(task);
        }

        void stop() {
            stopped = true;
        }

        @Override
        public void run() {
            while (!stopped) {
                try {
                    Task task = tasks.take();
                    task.run();
                } catch (InterruptedException ie) {
                    // perhaps, handle somehow
                }
            }
        }
    }

    private final Worker[] workers;
    private final ExecutorService executorService;

    /**
     * @param queuesNr nr of concurrent task queues
     */
    public TasksOrderingExecutor(int queuesNr) {
        Preconditions.checkArgument(queuesNr >= 1, "queuesNr >= 1");
        executorService = new ThreadPoolExecutor(queuesNr, queuesNr, 0, TimeUnit.SECONDS, new SynchronousQueue<>());
        workers = new Worker[queuesNr];
        for (int i = 0; i < queuesNr; i++) {
            Worker worker = new Worker();
            executorService.submit(worker);
            workers[i] = worker;
        }
    }

    public void submit(Task task) {
        Worker worker = getWorker(task);
        worker.schedule(task);
    }

    public void stop() {
        for (Worker w : workers) w.stop();
        executorService.shutdown();
    }

    private Worker getWorker(Task task) {
        return workers[task.getOrder().hashCode() % workers.length];
    }
}

【讨论】:

  • 生产者比消费者快,你会在内存中有很多任务。此外,负载可能不会在工作人员之间均匀分布。
  • @danip 关于速度的持续差异——那么您需要切换到有界阻塞队列并接受由于阻塞导致的一些延迟。为了提高吞吐量,您需要在几个盒子之间分配负载。
  • 我相信使用多个盒子会使问题复杂化。我可以使用您提出的概念按顺序分配任务,但我必须使其统一。
【解决方案2】:

根据您的代码的性质,唯一能保证带有 相同的值串行处理就是等待STOP_TOKEN到达。

您需要单一生产者-单一消费者设置,以及消费者收集和分类 令牌的价值(比如说进入 Multimap)。

只有这样你才能知道哪些令牌可以串行处理,哪些可以并行处理。

无论如何,我建议你看看LMAX Disruptor,它提供了非常有效的线程间数据共享方式。

它不会像 Executors 那样受到同步开销的影响,因为它是无锁的(这可能会给您带来不错的性能优势,具体取决于您处理数据的方式)。

使用两个 Disruptor 的解决方案

// single thread for processing as there will be only on consumer
Disruptor<InEvent> inboundDisruptor = new Disruptor<>(InEvent::new, 32, Executors.newSingleThreadExecutor());

// outbound disruptor that uses 3 threads for event processing
Disruptor<OutEvent> outboundDisruptor = new Disruptor<>(OutEvent::new, 32, Executors.newFixedThreadPool(3));

inboundDisruptor.handleEventsWith(new InEventHandler(outboundDisruptor));

// setup 3 event handlers, doing round robin consuming, effectively processing OutEvents in 3 threads
outboundDisruptor.handleEventsWith(new OutEventHandler(0, 3, new Object()));
outboundDisruptor.handleEventsWith(new OutEventHandler(1, 3, new Object()));
outboundDisruptor.handleEventsWith(new OutEventHandler(2, 3, new Object()));

inboundDisruptor.start();
outboundDisruptor.start();

// publisher code
for (int i = 0; i < 10; i++) {
    inboundDisruptor.publishEvent(InEventTranslator.INSTANCE, new Token());
}

入站中断器上的事件处理程序仅收集传入令牌。当收到 STOP 令牌时,它会将一系列令牌发布给出站干扰器以进行进一步处理:

public class InEventHandler implements EventHandler<InEvent> {

    private ListMultimap<String, Token> tokensByValue = ArrayListMultimap.create();
    private Disruptor<OutEvent> outboundDisruptor;

    public InEventHandler(Disruptor<OutEvent> outboundDisruptor) {
        this.outboundDisruptor = outboundDisruptor;
    }

    @Override
    public void onEvent(InEvent event, long sequence, boolean endOfBatch) throws Exception {
        if (event.token == STOP_TOKEN) {
            // publish indexed tokens to outbound disruptor for parallel processing
            tokensByValue.asMap().entrySet().stream().forEach(entry -> outboundDisruptor.publishEvent(OutEventTranslator.INSTANCE, entry.getValue()));
        } else {
            tokensByValue.put(event.token.value, event.token);
        }
    }
}

出站事件处理程序按顺序处理相同值的令牌:

public class OutEventHandler implements EventHandler<OutEvent> {

    private final long order;
    private final long allHandlersCount;
    private Object yourComplexDependency;

    public OutEventHandler(long order, long allHandlersCount, Object yourComplexDependency) {
        this.order = order;
        this.allHandlersCount = allHandlersCount;
        this.yourComplexDependency = yourComplexDependency;
    }

    @Override
    public void onEvent(OutEvent event, long sequence, boolean endOfBatch) throws Exception {
        if (sequence % allHandlersCount != order ) {
            // round robin, do not consume every event to allow parallel processing
            return;
        }

        for (Token token : event.tokensToProcessSerially) {
            // do procesing of the token using your complex class
        }

    }
}

所需基础架构的其余部分(目的在 Disruptor 文档中描述):

public class InEventTranslator implements EventTranslatorOneArg<InEvent, Token> {

    public static final InEventTranslator INSTANCE = new InEventTranslator();

    @Override
    public void translateTo(InEvent event, long sequence, Token arg0) {
        event.token = arg0;
    }

}

public class OutEventTranslator implements EventTranslatorOneArg<OutEvent, Collection<Token>> {

    public static final OutEventTranslator INSTANCE = new OutEventTranslator();

    @Override
    public void translateTo(OutEvent event, long sequence, Collection<Token> tokens) {
        event.tokensToProcessSerially = tokens;
    }
}


public class InEvent {

    // Note that no synchronization is used here,
    // even though the field is used among multiple threads.
    // Memory barrier used by Disruptor guarantee changes are visible.
    public Token token;
}

public class OutEvent {
    // ... again, no locks.
    public Collection<Token> tokensToProcessSerially;

}

public class Token {
    String value;

}

【讨论】:

  • 我不想使用外部库。
【解决方案3】:

如果您有很多不同的令牌,那么最简单的解决方案是创建一些单线程执行器(大约是您的核心数量的 2 倍),然后将每个任务分配给由其令牌哈希确定的执行器。

这样所有具有相同token的任务都会转到同一个executor并按顺序执行,因为每个executor只有一个线程。

如果您对调度公平性有一些未说明的要求,那么很容易避免任何严重的不平衡,方法是让生产者线程在分发请求之前将其排队(或阻塞),直到请求少于 10 个每个执行人都未完成。

【讨论】:

  • 这是我会考虑的解决方案。生产者分配系统可能会使解决方案比我的示例更复杂,但也许我可以忽略这一点。
  • 我认为您的回答最接近重点。该算法会将代币放在不同的消费者中并跟踪它们。对于每个新令牌,将使用轮换的下一个消费者。如果一个令牌已经被处理过,同一个消费者将不得不处理它。
  • 在大多数情况下,您可以只使用 (token.hashCode()&0x7FFFFFFF)%number_of_consumers 来决定每个人的放置位置,而无需跟踪任何内容
  • 是不是有些消费者什么都得不到?
  • 如果你有许多不同的令牌,那是极不可能的。这也没关系,因为你的消费者比处理器内核多,所以你所有的 CPU 仍然很忙。
【解决方案4】:

下面的解决方案将只使用一个 Map,生产者和消费者使用一个 Map 来按顺序处理每个订单号的订单,同时并行处理不同的订单号。代码如下:

public class Main {

    private static final int NUMBER_OF_CONSUMER_THREADS = 10;
    private static volatile int sync = 0;

    public static void main(String[] args) {
        final ConcurrentHashMap<String,Controller> queues = new ConcurrentHashMap<String, Controller>();
        final CountDownLatch latch = new CountDownLatch(NUMBER_OF_CONSUMER_THREADS);
        final AtomicBoolean done = new AtomicBoolean(false);

        // Create a Producer
        new Thread() {
            {
                this.setDaemon(true);
                this.setName("Producer");
                this.start();
            }

            public void run() {
                Random rand = new Random();

                for(int i =0 ; i < 1000 ; i++) {
                    int order = rand.nextInt(20);
                    String key = String.valueOf(order);
                    String value = String.valueOf(rand.nextInt());
                    Controller controller = queues.get(key);
                    if (controller == null) {
                        controller = new Controller();
                        queues.put(key, controller);
                    }
                    controller.add(new Token(order, value));
                    Main.sync++;
                }

                done.set(true);
            }
        };

        while (queues.size() < 10) {
            try {
                // Allow the producer to generate several entries that need to
                // be processed.
                Thread.sleep(5000);
            } catch (InterruptedException e1) {
                // TODO Auto-generated catch block
                e1.printStackTrace();
            }
        }

        // System.out.println(queues);

        // Create the Consumers
        ExecutorService consumers = Executors.newFixedThreadPool(NUMBER_OF_CONSUMER_THREADS);
        for(int i = 0 ; i < NUMBER_OF_CONSUMER_THREADS ; i++) {
            consumers.submit(new Runnable() {
                private Random rand = new Random();

                public void run() {
                    String name = Thread.currentThread().getName();
                    try {
                        boolean one_last_time = false;
                        while (true) {
                            for (Map.Entry<String, Controller> entry : queues.entrySet()) {
                                Controller controller = entry.getValue();
                                if (controller.lock(this)) {
                                    ConcurrentLinkedQueue<Token> list = controller.getList();
                                    Token token;
                                    while ((token = list.poll()) != null) {
                                        try {
                                            System.out.println(name + " processing order: " + token.getOrder()
                                                    + " value: " + token.getValue());
                                            Thread.sleep(rand.nextInt(200));
                                        } catch (InterruptedException e) {
                                        }
                                    }
                                    int last = Main.sync;
                                    queues.remove(entry.getKey());
                                    while(done.get() == false && last == Main.sync) {
                                        // yield until the producer has added at least another entry
                                        Thread.yield();
                                    }
                                    // Purge any new entries added
                                    while ((token = list.poll()) != null) {
                                        try {
                                            System.out.println(name + " processing order: " + token.getOrder()
                                                    + " value: " + token.getValue());
                                            Thread.sleep(200);
                                        } catch (InterruptedException e) {
                                        }
                                    }
                                    controller.unlock(this);
                                }
                            }
                            if (one_last_time) {
                                return;
                            }
                            if (done.get()) {
                                one_last_time = true;
                            }
                        }
                    } finally {
                        latch.countDown();
                    }
                }
            });
        }
        try {
            latch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        consumers.shutdown();
        System.out.println("Exiting.. remaining number of entries: " + queues.size());
    }

}

请注意,Main 类包含一个队列实例,它是一个 Map。映射键是您希望消费者按顺序处理的订单 ID。该值是一个 Controller 类,它将包含与该订单 ID 关联的所有订单。

生产者将生成订单并将订单(令牌)添加到其关联的控制器。消费者将遍历队列映射值并调用控制器锁定方法以确定它是否可以处理该特定订单 ID 的订单。如果锁返回 false,它将检查下一个 Controller 实例。如果锁返回true,它将处理所有订单,然后检查下一个Controller。

更新添加了同步整数,用于保证当控制器实例从队列映射中删除时。它的所有条目都将被消耗。很快就会调用解锁方法的消费者代码中存在逻辑错误。

Token 类与您在此处发布的类似。

class Token {
    private int order;
    private String value;

    Token(int order, String value) {
        this.order = order;
        this.value = value;
    }

    int getOrder() {
        return order;
    }

    String getValue() {
        return value;
    }

    @Override
    public String toString() {
        return "Token [order=" + order + ", value=" + value + "]\n";
    }
}

随后的 Controller 类用于确保线程池中只有一个线程将处理订单。锁定/解锁方法用于确定允许哪些线程处理订单。

class Controller {

    private ConcurrentLinkedQueue<Token> tokens = new ConcurrentLinkedQueue<Token>();
    private ReentrantLock lock = new ReentrantLock();
    private Runnable current = null;

    void add(Token token) {
        tokens.add(token);
    }

    public ConcurrentLinkedQueue<Token> getList() {
        return tokens;
    }

    public void unlock(Runnable runnable) {
        lock.lock();
        try {
            if (current == runnable) {
                current = null;
            }
        } finally {
            lock.unlock();
        }
    }

    public boolean lock(Runnable runnable) {
        lock.lock();
        try {
            if (current == null) {
                current = runnable;
            }
        } finally {
            lock.unlock();
        }
        return current == runnable;
    }

    @Override
    public String toString() {
        return "Controller [tokens=" + tokens + "]";
    }

}

有关实施的其他信息。它使用 CountDownLatch 来确保在流程退出之前处理所有生成的订单。 done 变量就像您的 STOP_TOKEN 变量一样。

该实施确实包含您需要解决的问题。存在一个问题,即在处理完所有订单后,它不会清除控制器的订单 ID。这将导致线程池中的线程被分配给不包含订单的控制器的实例。这将浪费可用于执行其他任务的 CPU 周期。

【讨论】:

  • 您将所有生成的结果保存在内存中。这是我需要避免的。
  • poll 方法将移除而不仅仅是从链接队列中获取条目。结果没有保存在内存中。
  • 生产者比所有消费者快得多,因此队列中的结果会快速增加。
【解决方案5】:

您是否只需要确保不会同时处理具有相同值的令牌?您的代码太杂乱,无法理解您的意思(它无法编译,并且有许多未使用的变量、锁和映射,它们已创建但从未使用过)。看来您对此想得太多了。您只需要一个队列和一张地图。 我想像这样的事情:

   class Consumer implements Runnable {
     ConcurrentHashMap<String, Token> inProcess;
     BlockingQueue<Token> queue;

     public void run() {
        Token token = null;
        while ((token = queue.take()) != null) {
           if(inProcess.putIfAbsent(token.getValue(), token) != null) {
              queue.put(token);
              continue;
           }
           processToken(token);
           inProcess.remove(token.getValue());
        }
     }
   }

【讨论】:

  • 代码在 Java 8 上编译得很好。我不确定你有什么编译问题。未使用的变量只是为了解释问题的复杂性。
  • "具有相同值的token需要按顺序处理"所以您不能将token放在队列的末尾,因为不会保留顺序。如果您运行我的示例,您将看到正确的输出。
  • @danip 当然,您创建的未使用变量越多,问题看起来就越复杂。你不应该创建变量“来展示复杂性”,每个变量都应该有自己明确的目的。试试看,你会发现问题并没有你想象的那么复杂。
  • @danip 我不确定您在返回队列时看到了什么问题,但是如果“按顺序”表示“按顺序”,那么您的整个设计需要重新设计,这不是正确的做法:让每个消费者查看自己的队列,并让生产者通过值的哈希码或一些类似的函数将令牌分发到队列。对于未来,请记住:“顺序”意味着“不并行”,不一定按顺序。
  • 顺序通常遵循数字或字母顺序:)
【解决方案6】:

具有相同值的令牌需要顺序处理

确保任何两件事按顺序发生的方法是在同一个线程中执行它们。

我有一个集合,不管有多少工作线程,我都会有一个 Map。每当我得到一个从未见过的令牌时,我都会随机挑选一个线程,并将令牌和线程输入到地图中。从那时起,我将使用同一个线程来执行与该令牌相关的任务。

创建新的 Runnable 会非常昂贵

Runnable 是一个接口。创建 实现 Runnable 的新对象不会比创建任何其他类型的对象更昂贵。

【讨论】:

  • 代码是一个抽象的例子。为了创建实现 Runnable 的特定对象,我需要将一些复杂对象传递给构造函数(也许您熟悉 JAXB 中的 Marshaller)。在这种情况下我应该采用什么策略?
  • 第二个“我会随机选择一个线程”,有什么办法可以使用执行器来完成,或者我必须直接管理线程?
  • @danip 我不知道你的第一个问题,但至于执行人,是的。我在回答中提到“线程”的所有地方,您都可以将其更改为“ExecutorService”,然后您需要通过调用 Executors.newSingleThreadExecutor() 来创建每个线程。
【解决方案7】:

也许我误解了什么。但似乎最初将具有相同值的令牌与具有不同值的令牌过滤到两个不同的队列中会更容易。

然后将 Stream 与 map 或 foreach 一起用于顺序。其余部分只需使用并行流版本。

如果您在生产环境中的 Token 是延迟生成的,而您一次只能得到一个,您只需制作某种过滤器,将它们分配到两个不同的队列中。

如果您可以使用 Streams 来实现它,我愿意这样做,因为它们简单、易用且快速!

https://docs.oracle.com/javase/8/docs/api/java/util/stream/Stream.html

我做了一个简短的例子来说明我的意思。在这种情况下,数字代币是人工构造的,但这不是重点。此外,流都在主线程上启动,这可能也不理想。

public static void main(String args[]) {
    ArrayList<Token> sameValues = new ArrayList<Token>();
    ArrayList<Token> distinctValues = new ArrayList<Token>();
    Random random = new Random();
    for (int i = 0; i < 100; i++) {
        int next = random.nextInt(100);
        Token n = new Token(i, String.valueOf(next));
        if (next == i) {
            sameValues.add(n);
        } else {
            distinctValues.add(n);
        }
    }       
    distinctValues.stream().parallel().forEach(token -> System.out.println("Distinct: " + token.value));
    sameValues.stream().forEach(token -> System.out.println("Same: " + token.value));       
}

【讨论】:

    【解决方案8】:

    我不完全确定我是否理解了这个问题,但我会试一试算法。

    演员是:

    • 一个queue的任务
    • 一个免费的poolexecutors
    • setin-process 令牌当前正在处理中
    • 一个controller

    那么,

    • 最初所有executors 都可用,set 为空

    • controller 选择一个可用的executor 并通过queue 寻找带有不在in-process set 中的令牌的task 以及何时找到它

      • 将令牌添加到in-process 集合中
      • 分配executor处理task
      • 回到队列的开头
    • executor 在完成处理后从set 中删除令牌并将其自身添加回池中

    【讨论】:

    • 唯一的问题是我需要一个阻塞队列,因为生产者非常快。我能找到的唯一实现是 ArrayBlockingQueue,但它是 FIFO,所以“通过队列”部分不容易实现。
    • 嗯,它不完全是一个队列,因为您需要删除乱序,同时在尾部添加。链表可能是更合适的数据结构。至于阻塞,对于这种复杂的项目,包装现有集合类之一应该是在范围内。
    【解决方案9】:

    这样做的一种方法是让一个执行器用于序列处理,一个用于并行处理。我们还需要一个单线程管理器服务来决定需要提交哪个服务令牌进行处理。 // 两个线程共享的队列。包含生产者生成的令牌。
    BlockingQueue tokenList = new ArrayBlockingQueue(10);

        private void startProcess() {
        ExecutorService producer = Executors.newSingleThreadExecutor();
        final ExecutorService consumerForSequence = Executors
                .newSingleThreadExecutor();
        final ExecutorService consumerForParallel = Executors.newFixedThreadPool(10);
        ExecutorService manager = Executors.newSingleThreadExecutor();
    
        producer.submit(new Producer(tokenList));
    
        manager.submit(new Runnable() {
    
            public void run() {
                try {
                    while (true) {
                        Token t = tokenList.take();
                        System.out.println("consumed- " + t.orderid
                                + " element");
    
                        if (t.orderid % 7 == 0) { // any condition to check for sequence processing
    
                            consumerForSequence.submit(new ConsumerForSequenceProcess(t));
    
                        } else {
    
                            ConsumerForParallel.submit(new ConsumerForParallelProcess(t));
    
                        }
                    }
                }
    
                catch (InterruptedException e) { // TODO Auto-generated catch
                    // block
                    e.printStackTrace();
                }
    
            }
        });
    }
    

    【讨论】:

    • 您的代码将序列处理限制为单个消费者。多个消费者应该能够按顺序处理令牌。
    • 是的,但这可以处理。我们需要使用阻塞队列来保存需要按顺序处理的令牌,然后我们可以使用一组消费者而不是该队列中的一个来获取令牌进行处理。纠正我的任何建议。
    【解决方案10】:

    我认为这个任务背后隐藏着一个更基本的设计问题,但没关系。如果您想要按顺序执行,或者您只是希望对单个令牌描述的任务的操作是原子/事务性的,我无法从您的问题描述中弄清楚。我在下面提出的建议更像是对这个问题的“快速解决”,而不是真正的解决方案。

    对于真正的“有序执行”案例,我提出了一个基于队列代理的解决方案,该队列代理对输出进行排序:

    1. 定义 Queue 的实现,它提供了一个工厂方法生成代理队列,这些代理队列由这个单个队列对象表示给生产者端;工厂方法还应该注册这些代理队列对象。如果元素与输出队列之一中的元素之一匹配,则将元素添加到输入队列应将其直接添加到输出队列之一。否则将其添加到任何(最短的)输出队列。 (有效地对此进行检查)。或者(稍微好一点):在添加元素时不要这样做,但是当任何输出队列运行为空时。

    2. 为每个可运行的消费者提供一个存储单独队列接口的字段(而不是访问单个对象)。通过上面定义的工厂方法初始化此字段。

    对于事务案例,我认为跨越更多线程比拥有内核更容易(使用统计数据来计算),并在较低(对象)级别实现阻塞机制。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2021-11-08
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2015-11-28
      • 1970-01-01
      相关资源
      最近更新 更多