【发布时间】:2021-11-16 02:39:53
【问题描述】:
我有一个反应流,它获取一些数据,循环数据,处理数据,最后将数据写入 Kafka
public Flux<M> sendData(){
Flux.fromIterable(o.getC()).publishOn(Schedulers.boundedElastic())
.flatMap(id->
Flux.fromIterable(getM(id)).publishOn(Schedulers.boundedElastic())
.flatMap( n -> {
return Flux.fromIterable(o.getD()).publishOn(Schedulers.boundedElastic())
.flatMap(d -> return Flux.just(sendToKafka));
})
)
.doOnError(throwable -> {
log.debug("Error while reading data : {} ", throwable.getMessage());
return;
})
.subscribe();
}
public void run(String... args){
sendData();
}
我希望此工作流程每分钟运行一次。有人可以帮助我了解如何在流中安排这一点吗?
【问题讨论】:
标签: apache-kafka spring-webflux spring-kafka project-reactor reactive-kafka