【问题标题】:Working with Flux in a Scheduled Task在计划任务中使用 Flux
【发布时间】:2017-10-16 02:45:54
【问题描述】:

我正在开发一个 Spring Webflux 项目,在计划任务中尝试发布和使用 Flux 时遇到了问题。

@Scheduled(fixedRate = 20*1000)
fun updateNews() {
    try {
        logger.info("Automatic Update at: ${LocalDateTime.now()}")
        articleRepository.saveAll(
                sourceRepository.findAll().publishOn(Schedulers.parallel())
                        .map { source -> source.generate() }
                        .flatMap { it?.read() ?: Flux.empty() })
                        .timeout(Duration.ofSeconds(20)
        ).subscribeOn(Schedulers.parallel())
    } catch(e: Throwable) {
        logger.log(Level.SEVERE, "Error in Scheduler", e)
    }
}

我配置的调度器:

ConcurrentTaskScheduler(Executors.newScheduledThreadPool(3))

除非我在最后故意阻止,否则这个任务永远不会完成:

.then().block()

我最初并没有为直接引用发布/订阅调度程序而烦恼,并且我尝试了所有看似合理但无效的选项。

我的日志事件发生了,但似乎当调度程序执行此任务的线程死亡时,通量也是垃圾;即使我指定了 publishOn 和 subscribeOn 行为后它们应该在自己的线程池中?

我想让这个动作完全反应,任何建议都将不胜感激。

【问题讨论】:

    标签: project-reactor spring-webflux


    【解决方案1】:

    @Scheduled 没有与Flux 集成,因此如果您返回它,它不会知道如何处理Flux。结合这一事实,在 Reactor(以及一般的 Reactive Streams)中,在您 subscribe() 之前通常什么都不会发生,您可以开始查看问题所在。

    block() 实际上是subscribe() 的一种形式,这就是为什么一旦将其添加到代码中它就可以工作的原因。它实际上可能是这里最好的选择,因为您需要将一段反应性代码(来自ReactiveRepository)桥接到命令式阻塞世界(您的@Scheduled fun)。

    【讨论】:

    • 一般来说有没有更好的方法来完成这样的事情?概括问题:拥有非反应性的“发布者”,并且需要以尽可能连续的方式将它们拉入您的数据中。
    • 我现在看到我是在错误的假设下操作的,即saveAll() 作为消费者会导致通量链的触发。我不认为我真的需要block(),因为我根本不在乎saveAll() 电话的响应,所以subscribe() 可能是我最好的选择。
    • 啊,如果你想一劳永逸。也就是说,如果出现错误,向subscribe(...)提供错误处理程序可能会有所帮助
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2011-04-22
    • 2016-12-25
    • 2017-04-15
    • 2014-01-11
    • 1970-01-01
    • 2014-12-28
    • 1970-01-01
    相关资源
    最近更新 更多