【发布时间】:2018-06-27 09:53:25
【问题描述】:
我有一个基于 DSL 的流程,它使用 split 遍历对象列表并发送 Kafka 消息:
.transform(...)
.split()
.channel(KAFKA_OUT_CHANNEL)
发送完所有消息后,我需要调用服务,还需要记录处理了多少消息。
我知道一种方法是使用publishSubscribeChannel,其中第一个subscribe 执行实际的Kafka 发送,然后aggregate 执行服务调用:
.transform(...)
.split().
.publishSubscribeChannel(pubSub -> pubSub
.subscribe(f -> f.channel(KAFKA_OUT_CHANNEL)))
我在弄清楚如何使用 DSL 在 pubSubChannel 中实际执行 .aggregate 部分时遇到问题。到目前为止,我已经尝试过:
.subscribe(f -> f.channel(KAFKA_OUT_CHANNEL)
.subscribe(f -> f.aggregate(c -> c.processor( ?? ))))
任何指针?
【问题讨论】:
标签: spring-integration spring-integration-dsl