【问题标题】:Project Reactor TopicProcessor multi threading(Project Reactor TopicProcessor 多线程)
【发布时间】:2020-11-30 15:50:47
【问题描述】:

以下多线程示例从两个不同的线程写入TopicProcessor,并在两个不同的线程中从TopicProcessor 读取。但是,某处存在竞争条件,因此并非所有事件都传递给订阅者,导致应用程序在processed.await() 中永远挂起。有人知道为什么吗?

import reactor.core.publisher.Flux;
import reactor.extra.processor.TopicProcessor;
import reactor.extra.processor.WaitStrategy;

import java.time.Duration;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import static java.util.Arrays.asList;

/**
 * Reproducer: two producer tasks feed one shared {@link TopicProcessor} while two
 * subscribers read from it. Each subscriber is expected to see every element of both
 * lists; according to the accompanying description the run intermittently hangs in
 * {@code processed.await()} because some elements never reach the subscribers.
 */
public class ReactorTopicProcessorSample {

  /**
   * Task that subscribes the shared {@link TopicProcessor} to a {@link Flux} built from
   * {@code data}. Both tasks rendezvous on {@code producerCount} first so that they are
   * running at the same time before either one subscribes.
   */
  public static class Producer implements Callable<Void> {

    // Label used only in the "Submitted ..." log line.
    final String name;
    // Elements this producer pushes into the processor.
    final List<String> data;
    // Rendezvous latch shared by all producers.
    final CountDownLatch producerCount;
    // Processor shared by all producers and subscribers.
    final TopicProcessor<String> topicProcessor;

    /**
     * @param name           label printed when the producer has subscribed
     * @param data           elements to emit
     * @param submitted      latch counted down and awaited by every producer before emitting
     * @param topicProcessor processor that is subscribed to this producer's Flux
     */
    public Producer(String name, List<String> data, CountDownLatch submitted, TopicProcessor<String> topicProcessor) {
      this.name = name;
      this.data = data;
      this.producerCount = submitted;
      this.topicProcessor = topicProcessor;
    }

    /**
     * Waits for the other producer, then subscribes the processor to this producer's Flux.
     *
     * @return always {@code null}
     */
    @Override
    public Void call() throws Exception {
      producerCount.countDown();
      producerCount.await(); // wait until the other producer is submitted to be sure that they run in different threads
      // NOTE(review): the same processor instance is subscribed to two different sources
      // (once per Producer). The accompanying answer attributes the lost events to this
      // usage; presumably the processor honours only one upstream subscription and/or is
      // terminated by the first source's completion - confirm against the Reactor docs.
      Flux.fromIterable(data)
          // Tags each element with the name of the thread on which map() runs.
          .map(s -> "\"" + s + "\"" + " from thread " + Thread.currentThread().getName())
          .delayElements(Duration.ofMillis(10))
          .subscribe(topicProcessor);
      // Printed as soon as subscribe(...) returns, not after all elements were delivered.
      System.out.println("Submitted " + name + " producer in thread " + Thread.currentThread().getName() + ".");
      return null;
    }
  }

  /** Runs the scenario up to 100 times because a single run does not always hang. */
  public static void main(String[] args) throws InterruptedException {
    for (int i = 0; i < 100; i++) { // this sample doesn't hang every time. repeat a few times to make it reproducible
      realMain(args);
      System.out.println("\n--- the previous run was successful. running again ---\n");
    }
  }

  /**
   * One run of the scenario: registers two subscribers, starts two producers, then blocks
   * until every subscriber has counted down once per element of both lists.
   *
   * @param args unused
   * @throws InterruptedException if the calling thread is interrupted while waiting on a latch
   */
  public static void realMain(String[] args) throws InterruptedException {

    List<String> numbers = asList("1", "2", "3", "4", "5", "6", "7", "8");
    List<String> characters = asList("a", "b", "c", "d", "e", "f", "g", "h");

    CountDownLatch producerCount = new CountDownLatch(2);
    CountDownLatch subscriberCount = new CountDownLatch(2);
    // Expected deliveries: 2 subscribers x (8 numbers + 8 characters) = 32.
    CountDownLatch processed = new CountDownLatch(
        (int) subscriberCount.getCount() * (numbers.size() + characters.size()));

    // One pool thread per producer; the producers block on producerCount until both have started.
    ExecutorService exec = Executors.newFixedThreadPool((int) producerCount.getCount());

    TopicProcessor<String> topicProcessor = TopicProcessor.<String>builder()
        .share(true)
        .name("topic-processor")
        .bufferSize(16)
        .waitStrategy(WaitStrategy.liteBlocking())
        .build();

    // Every subscription to this view counts subscriberCount down once.
    Flux<String> flux = Flux.from(topicProcessor)
        .doOnSubscribe(s -> subscriberCount.countDown());

    flux.subscribe(out -> {
      System.out.println("Subscriber in thread " + Thread.currentThread().getName() + " received " + out);
      processed.countDown();
    });

    flux.subscribe(out -> {
      System.out.println("Subscriber in thread " + Thread.currentThread().getName() + " received " + out);
      processed.countDown();
    });

    // Do not start producing before both subscribers have been registered.
    subscriberCount.await();

    exec.submit(new Producer("number", numbers, producerCount, topicProcessor));
    exec.submit(new Producer("character", characters, producerCount, topicProcessor));

    // Blocks until 32 deliveries were counted; this is where the reported hang occurs
    // when some elements are never delivered.
    processed.await();
    exec.shutdown();
    topicProcessor.shutdown();
  }
}

依赖关系

<dependency>
  <groupId>io.projectreactor</groupId>
  <artifactId>reactor-core</artifactId>
  <version>3.3.2.RELEASE</version>
</dependency>
<dependency>
  <groupId>io.projectreactor.addons</groupId>
  <artifactId>reactor-extra</artifactId>
  <version>3.3.2.RELEASE</version>
</dependency>

示例行为:订阅者只接收字符或只接收数字,导致程序在processed.await() 中永远等待。这并非每次都会发生,有时会按预期工作。

【问题讨论】:

    标签: java project-reactor


    【解决方案1】:

    如果我没有理解错,您希望有两个并行生产数据的生产者,以及两个并行消费数据的消费者。

    首先,您需要了解 Reactor 或 RxJava 的工作原理,尤其是冷发布者(cold publisher,即只有在订阅者订阅之后才开始发布数据的发布者)。

    现在回到您的代码,如果您查看 TopicProcessor 的大理石图(marble diagram),您会发现该类用于将来自单个生产者的数据并行地流式传输给多个消费者。您的竞态条件是由于 TopicProcessor 使用不当造成的。

    要解决此问题,您必须先把两个生产者合并成一个 Flux,再让 TopicProcessor 订阅这个合并后的 Flux。示例如下:

    import reactor.core.publisher.Flux;
    import reactor.core.scheduler.Schedulers;
    import reactor.extra.processor.TopicProcessor;
    
    import java.util.List;
    import java.util.concurrent.CountDownLatch;
    
    import static java.util.Arrays.asList;
    
    /**
     * Variant of the sample in which both data sources are merged into a single
     * {@link Flux} and the {@link TopicProcessor} is subscribed to that merged Flux
     * only once. Two subscribers read from the processor and each one is expected to
     * receive every element of both lists.
     */
    public class ReactorTopicProcessorSampleFixed {

        /** Repeats the scenario 100 times, since a single pass might not expose a hang. */
        public static void main(String[] args) throws InterruptedException {
            int run = 0;
            while (run < 100) {
                realMain(args);
                System.out.println("\n--- the previous run was successful. running again ---\n");
                run++;
            }
        }

        /**
         * One pass of the scenario: builds the merged source, registers two subscribers,
         * connects the processor to the source and waits until every subscriber has
         * counted down once per element.
         *
         * @param args unused
         * @throws InterruptedException if the calling thread is interrupted while waiting on a latch
         */
        public static void realMain(String[] args) throws InterruptedException {

            final List<String> numbers = asList("1", "2", "3", "4", "5", "6", "7", "8");
            final List<String> characters = asList("a", "b", "c", "d", "e", "f", "g", "h");

            final CountDownLatch subscriberCount = new CountDownLatch(2);
            final int elementsPerSubscriber = numbers.size() + characters.size();
            // Every subscriber counts down once per element: 2 x 16 = 32.
            final CountDownLatch processed =
                    new CountDownLatch((int) subscriberCount.getCount() * elementsPerSubscriber);

            // Nothing is emitted here yet; emission starts when the processor subscribes below.
            final Flux<String> numberSource = taggedSource(numbers);
            final Flux<String> characterSource = taggedSource(characters);
            final Flux<String> combinedSource = numberSource.mergeWith(characterSource);

            final TopicProcessor<String> topicProcessor = TopicProcessor.share("topic-processor", 16);

            // Each subscription to this view counts subscriberCount down once.
            final Flux<String> sharedView =
                    Flux.from(topicProcessor).doOnSubscribe(s -> subscriberCount.countDown());

            sharedView.subscribe(out -> report(out, processed));
            sharedView.subscribe(out -> report(out, processed));

            // Both subscribers must be registered before the source is connected.
            subscriberCount.await();

            combinedSource.subscribe(topicProcessor);

            processed.await();
            topicProcessor.shutdown();
        }

        /**
         * Builds a Flux over {@code items} whose elements are quoted and tagged with the
         * name of the thread on which the mapping runs; the subscription is moved onto
         * the bounded-elastic scheduler.
         */
        private static Flux<String> taggedSource(List<String> items) {
            return Flux.fromIterable(items)
                    .map(s -> "\"" + s + "\"" + " from thread " + Thread.currentThread().getName())
                    .subscribeOn(Schedulers.boundedElastic());
        }

        /** Logs one received element together with the receiving thread and counts it as processed. */
        private static void report(String out, CountDownLatch processed) {
            System.out.println("Subscriber in thread " + Thread.currentThread().getName() + " received " + out);
            processed.countDown();
        }
    }
    

    执行结果如下:

        Subscriber in thread topic-processor-200 received "1" from thread boundedElastic-2
        Subscriber in thread topic-processor-200 received "2" from thread boundedElastic-2
        Subscriber in thread topic-processor-199 received "1" from thread boundedElastic-2
        Subscriber in thread topic-processor-199 received "2" from thread boundedElastic-2
        Subscriber in thread topic-processor-200 received "3" from thread boundedElastic-2
        Subscriber in thread topic-processor-200 received "4" from thread boundedElastic-2
        Subscriber in thread topic-processor-200 received "5" from thread boundedElastic-2
        Subscriber in thread topic-processor-200 received "6" from thread boundedElastic-2
        Subscriber in thread topic-processor-200 received "7" from thread boundedElastic-2
        Subscriber in thread topic-processor-199 received "3" from thread boundedElastic-2
        Subscriber in thread topic-processor-199 received "4" from thread boundedElastic-2
        Subscriber in thread topic-processor-199 received "5" from thread boundedElastic-2
        Subscriber in thread topic-processor-200 received "8" from thread boundedElastic-2
        Subscriber in thread topic-processor-200 received "a" from thread boundedElastic-1
        Subscriber in thread topic-processor-200 received "b" from thread boundedElastic-1
        Subscriber in thread topic-processor-199 received "6" from thread boundedElastic-2
        Subscriber in thread topic-processor-199 received "7" from thread boundedElastic-2
        Subscriber in thread topic-processor-199 received "8" from thread boundedElastic-2
        Subscriber in thread topic-processor-199 received "a" from thread boundedElastic-1
        Subscriber in thread topic-processor-199 received "b" from thread boundedElastic-1
        Subscriber in thread topic-processor-199 received "c" from thread boundedElastic-1
        Subscriber in thread topic-processor-199 received "d" from thread boundedElastic-1
        Subscriber in thread topic-processor-199 received "e" from thread boundedElastic-1
        Subscriber in thread topic-processor-199 received "f" from thread boundedElastic-1
        Subscriber in thread topic-processor-200 received "c" from thread boundedElastic-1
        Subscriber in thread topic-processor-200 received "d" from thread boundedElastic-1
        Subscriber in thread topic-processor-200 received "e" from thread boundedElastic-1
        Subscriber in thread topic-processor-200 received "f" from thread boundedElastic-1
        Subscriber in thread topic-processor-200 received "g" from thread boundedElastic-1
        Subscriber in thread topic-processor-200 received "h" from thread boundedElastic-1
        Subscriber in thread topic-processor-199 received "g" from thread boundedElastic-1
        Subscriber in thread topic-processor-199 received "h" from thread boundedElastic-1
    

    如果这是您要找的,请告诉我。

    【讨论】:

      猜你喜欢
      • 2018-09-04
      • 2019-12-18
      • 1970-01-01
      • 2018-09-26
      • 2017-05-18
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多