【发布时间】:2020-08-17 21:25:29
【问题描述】:
这是一个简单的spring boot应用:
@SpringBootApplication
@RestController
public class ReactiveApplication {
static Flux<String> fluxString;
static volatile Queue<String> queue = new ConcurrentLinkedQueueProxy();
private static class ConcurrentLinkedQueueProxy extends ConcurrentLinkedQueue<String> {
private static final long serialVersionUID = 1L;
@Override
public boolean add(String e) {
synchronized (this) {
notify();
}
return super.add(e);
}
@Override
public String poll() {
synchronized (this) {
if(isEmpty()) {
try {
wait();
} catch (InterruptedException ex) {}
}
}
return super.peek() == null ? "" : super.poll();
}
}
static Consumer<String> consumer = str -> queue.add(str);
public static void main(String[] args) throws InterruptedException {
SpringApplication.run(ReactiveApplication.class, args);
}
static {
for(int i = 0; i < 10; i++)
queue.add("testData " + i + " ");
}
@GetMapping(value = "/", produces = MediaType.APPLICATION_STREAM_JSON_VALUE)
public Flux<String> home() {
Scheduler sch = Schedulers.newParallel("parallel-sch", 1);
List<String> list = new ArrayList<>(queue);
queue.removeAll(queue);
fluxString = Flux.<String>create(sink -> {
sink.onRequest(n -> {
for(int i = 0; i < n; i++) {
sink.next(queue.poll());
}
}).onCancel(() -> sch.dispose());
}).log().subscribeOn(sch).mergeWith(Flux.<String>fromIterable(list));
return fluxString;
}
@GetMapping("/add")
public String add( @RequestParam String s) {
consumer.accept(s);
return s;
}
}
所以基本上这个应用程序创建了一个字符串流。访问/ 将获取所有存在的字符串队列,然后合并从/add 资源添加的任何内容(忽略“安全方法必须是幂等的”事情)。
我觉得奇怪的是,当我将 public static void main(...) 移动到第 1 行时,应用程序开始出现异常,向 /add 添加新值没有任何效果。我认为一定有一些有趣的事情正在发生使应用程序行为不端。有什么解释吗?
【问题讨论】:
-
我从未见过有人试图在一个类中编写应用程序。为什么不只是遵循指南和建议,并将您的控制器放在单独的类中。因为很明显,如果您不遵循一般准则,您会得到奇怪的行为。
-
@ThomasAndolf 我只是在玩这些东西。原来“静态变量由序列初始化”是答案,它与混合的东西无关。无论如何,如果你能帮助我,我有一个新问题。如果两个客户端订阅了
/,则使用/add添加的字符串以循环方式分发。现在,如果客户端断开连接,仍然需要对/add进行 2 次调用才能出现在订阅者/端点上。这个问题线性增加。看起来sch.dispose()什么都不做。有什么想法吗?我也通过分离关注点对其进行了测试。 -
onCancel仅在来自调用客户端的取消信号时触发。断开连接不是取消信号,projectreactor.io/docs/core/release/reference/… -
@ThomasAndolf 在阅读参考资料后我也有同样的疑问,但
onCancel中的快速系统输出清除了它。取消事件实际上是生成的。即使在验证之后,我也尝试了.subscribeOn(sch).doFinally(signal -> {...),但没有任何好处。查看日志后我发现Schedulers.newParallel创建的线程即使在scr.dispose()之后也没有被终止。但我不知道如何获取该线程的句柄或操作它。我已经阅读了文档、来源和参考,但我迷路了。 -
是否有任何理由说明您为什么需要在其单独的调度程序上使用它?将执行置于调度上的一个常见原因是阻塞调用期间的并行性。 forkjoinpool 中有一定数量的线程,我们不想占用这些线程来阻塞调用。但是,一旦您将某些内容放在调度程序上,我就会相信它“独立”
标签: java multithreading spring-boot spring-webflux project-reactor