【问题标题】:Spring Integration - publishSubscribeChannel aggregate after splitSpring Integration - 拆分后的 publishSubscribeChannel 聚合
【发布时间】:2018-06-27 09:53:25
【问题描述】:

我有一个基于 DSL 的流程,它使用 split 遍历对象列表并发送 Kafka 消息:

.transform(...)
.split()
.channel(KAFKA_OUT_CHANNEL)

发送完所有消息后,我需要调用服务,还需要记录处理了多少消息。 我知道一种方法是使用publishSubscribeChannel,其中第一个subscribe 执行实际的Kafka 发送,然后aggregate 执行服务调用:

.transform(...)
.split().
.publishSubscribeChannel(pubSub -> pubSub
        .subscribe(f -> f.channel(KAFKA_OUT_CHANNEL)))

我在弄清楚如何使用 DSL 在 pubSubChannel 中实际执行 .aggregate 部分时遇到问题。到目前为止,我已经尝试过:

.subscribe(f ->  f.channel(KAFKA_OUT_CHANNEL)
.subscribe(f -> f.aggregate(c -> c.processor( ?? ))))

任何指针?

【问题讨论】:

    标签: spring-integration spring-integration-dsl


    【解决方案1】:

    AbstractMessageSplitter 默认有一个applySequence = true

    /**
     * Set the applySequence flag to the specified value. Defaults to true.
     * @param applySequence true to apply sequence information.
     */
    public void setApplySequence(boolean applySequence) {
    

    我们在消息中有这些标题:

    if (this.applySequence) {
        builder.pushSequenceDetails(correlationId, sequenceNumber, sequenceSize);
    }
    

    聚合器的默认关联策略实际上是基于IntegrationMessageHeaderAccessor.CORRELATION_ID 标头。这样,它将具有相同correlationKey 的消息收集到相同的MessageGroup 中。默认ReleaseStrategy 基于MessageGroupsequenceSize 标头比较。最后,默认的MessageGroupProcessor 只是将组中的所有消息收集到一条消息中,Collection 作为有效负载。换句话说,聚合器的默认行为与拆分器完全相反。

    我不知道你要从聚合器中输出什么,但你不需要任何其他逻辑来在那里配置 - 关联和释放逻辑应该基于默认状态。

    你可以在Reference Manual找到足够的信息。

    【讨论】:

      【解决方案2】:

      这取决于你在聚合之后想要什么 - 如果你只是想要一个有效负载列表,只需使用 aggregate()...

      @SpringBootApplication
      public class So51059703Application {
      
          public static void main(String[] args) {
              SpringApplication.run(So51059703Application.class, args);
          }
      
          @Bean
          public ApplicationRunner runner(ApplicationContext context) {
              return args -> {
                  context.getBean("flow.input", MessageChannel.class).send(new GenericMessage<>(
                          Arrays.asList("a", "b", "c")));
              };
          }
      
          @Bean
          public IntegrationFlow flow() {
              return f -> f
                      .split()
                      .publishSubscribeChannel(p -> p
                              .subscribe(f1 -> f1.handle(System.out::println))
                              .subscribe(f2 -> f2
                                      .aggregate()
                                      .handle(System.out::println)));
          }
      
      }
      

      如果你只想要计数:

      @SpringBootApplication
      public class So51059703Application {
      
          public static void main(String[] args) {
              SpringApplication.run(So51059703Application.class, args);
          }
      
          @Bean
          public ApplicationRunner runner(ApplicationContext context) {
              return args -> {
                  context.getBean("flow.input", MessageChannel.class).send(new GenericMessage<>(
                          Arrays.asList("a", "b", "c")));
              };
          }
      
          @Bean
          public IntegrationFlow flow() {
              return f -> f
                      .split()
                      .publishSubscribeChannel(p -> p
                              .subscribe(f1 -> f1.handle(System.out::println))
                              .subscribe(f2 -> f2
                                      .aggregate(c -> c
                                              .processor(processor(), "reduce"))
                                      .handle(System.out::println)));
          }
      
          @Bean
          public Object processor() {
              return new Object() {
      
                  public int reduce(List<Message<?>> messages) {
                      return messages.size();
                  }
      
              };
          }
      
      }
      

      【讨论】: