【发布时间】:2019-02-21 08:30:40
【问题描述】:
我是响应式编程的新手,有很多问题。 我认为这不是缺少示例或文档,而是我的理解是错误的。
我正在尝试模仿慢速订阅者;
这是代码示例
Flux.create(sink -> {
int i = 0;
while (true) {
try {
System.out.println("Sleep for " + MILLIS);
Thread.sleep(MILLIS);
int it = i++;
System.out.println("Back to work, iterator " + it);
sink.next(it);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).subscribeOn(Schedulers.elastic())
.subscribe(x -> {
try {
System.out.println("Value: " + x + ", Thread: " + Thread.currentThread().toString());
Thread.sleep(MILLIS + 4000);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
系统退出是
Sleep for 1000
Back to work, iterator 0
Value: 0, Thread: Thread[elastic-2,5,main]
Sleep for 1000
Back to work, iterator 1
Value: 1, Thread: Thread[elastic-2,5,main]
Sleep for 1000
Back to work, iterator 2
Value: 2, Thread: Thread[elastic-2,5,main]
我想如果订阅者很慢,由于Schedulers.elastic(),我应该会看到更多线程
我也尝试制作publishOn(),看起来我让它异步,但仍然无法处理多个线程的结果。
感谢 cmets 和答案。
【问题讨论】:
-
如果你想模拟慢速订阅者,你应该做相反的事情——而不是延迟发出一个项目,你应该创建很多没有任何延迟的项目
-
@pixel 这很公平,但是如果我正在使用阻塞代码,例如从队列中读取呢?
-
那么这意味着您的发布者很慢(发出很少的项目)并且订阅者只是等待事件。因此,您不需要很多订阅者。
-
@pixel 但是我想我明白了。但是如果订阅者比发射者慢怎么办?您可以从代码中看到它,它作为具有 1 个工作线程的阻塞代码工作。
-
当订阅者比发射者慢时,你可以利用背压。有一篇很好的文章:e4developer.com/2018/04/28/… 但是 - 你的代码没有反映这种情况:)
标签: java reactive-programming project-reactor