【问题标题】:Reactor 3 Emitter/Subscriber paralllelReactor 3 发射器/订阅者并行
【发布时间】:2019-02-21 08:30:40
【问题描述】:

我是响应式编程的新手,有很多问题。 我认为这不是缺少示例或文档,而是我的理解是错误的。

我正在尝试模仿慢速订阅者;

这是代码示例

Flux.create(sink -> {
    int i = 0;
    while (true) {
        try {
            System.out.println("Sleep for " + MILLIS);
            Thread.sleep(MILLIS);
            int it = i++;
            System.out.println("Back to work, iterator " + it);
            sink.next(it);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}).subscribeOn(Schedulers.elastic())
.subscribe(x -> {
    try {
        System.out.println("Value: " + x + ", Thread: " + Thread.currentThread().toString());
        Thread.sleep(MILLIS + 4000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
});

系统退出是

Sleep for 1000
Back to work, iterator 0
Value: 0, Thread: Thread[elastic-2,5,main]
Sleep for 1000
Back to work, iterator 1
Value: 1, Thread: Thread[elastic-2,5,main]
Sleep for 1000
Back to work, iterator 2
Value: 2, Thread: Thread[elastic-2,5,main]

我想如果订阅者很慢,由于Schedulers.elastic(),我应该会看到更多线程

我也尝试制作publishOn(),看起来我让它异步,但仍然无法处理多个线程的结果。

感谢 cmets 和答案。

【问题讨论】:

  • 如果你想模拟慢速订阅者,你应该做相反的事情——而不是延迟发出一个项目,你应该创建很多没有任何延迟的项目
  • @pixel 这很公平,但是如果我正在使用阻塞代码,例如从队列中读取呢?
  • 那么这意味着您的发布者很慢(发出很少的项目)并且订阅者只是等待事件。因此,您不需要很多订阅者。
  • @pixel 但是我想我明白了。但是如果订阅者比发射者慢怎么办?您可以从代码中看到它,它作为具有 1 个工作线程的阻塞代码工作。
  • 当订阅者比发射者慢时,你可以利用背压。有一篇很好的文章:e4developer.com/2018/04/28/… 但是 - 你的代码没有反映这种情况:)

标签: java reactive-programming project-reactor


【解决方案1】:

如果你想让它在不同的线程中运行,你需要像这样使用 .parallel() 并且发射将在不同的线程中进行

Flux.create(sink -> {
        int i = 0;
        while (true) {
            try {
                System.out.println("Sleep for " + MILLIS);
                Thread.sleep(100);
                int it = i++;
                System.out.println("Back to work, iterator " + it);
                sink.next("a");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    })

            .parallel()
            .runOn(Schedulers.elastic())

            .subscribe(x -> {
                try {
                    System.out.println("Value: " + x + ", Thread: " + Thread.currentThread().toString());
                    Thread.sleep(100 + 4000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            })
    ;
}

【讨论】:

  • 这很有趣,但我完全不明白发生了什么 :) 在我的脑海中应该看到像我们在一些 Mills 中发出的东西,并在并行线程中延迟消耗它,所以我们应该先发出几个值,然后消费。
  • @VIQ 您正在弹性订阅,但您应该知道您在流中仅订阅一次,并且您的流将在弹性池中找到一个空闲线程,因此所有流将在单个弹性线程中运行,所以你在等待延迟。在我的情况下,我将在弹性池中获得新线程的新并行流中的每个元素分开
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2017-02-20
  • 2018-03-21
相关资源
最近更新 更多