【问题标题】:Spring Cloud Stream > SendTo does not send to Kafka but directly via direct channelSpring Cloud Stream > SendTo 不发送到 Kafka,而是直接通过直接通道发送
【发布时间】:2019-03-22 23:35:21
【问题描述】:

我的应用程序中有两个通道绑定到两个 Kafka 主题:

  1. 输入
  2. error.input.my-group

配置输入是为了在出错的情况下向 dlq (error.input.my-group) 发送消息。

我在“error.input.my-group”上有一个 StreamListener,它被配置为将消息发送回原始频道。

@StreamListener(Channels.DLQ)
@SendTo(Channels.INPUT)
public Message<?> reRoute(Message<?> failed){
    messageDeliveryService.waitUntilCanBeDelivered(failed);
    processed.incrementAndGet();
    Integer retries = failed.getHeaders().get(X_RETRIES_HEADER, Integer.class);
    retries = retries == null ? 1 : retries+1;
     if (retries < MAX_RETRIES) {
        logger.info("Retry (count={}) for {}", retries, failed);
        return buildRetryMessage(failed, retries);
    }
    else {
        logger.error("Retries exhausted (-> sent to parking lot) for {}", failed);
        Channels.parkingLot().send(MessageBuilder.fromMessage(failed)
                .setHeader(BinderHeaders.PARTITION_OVERRIDE,
                        failed.getHeaders().get(KafkaHeaders.RECEIVED_PARTITION_ID))
                .build());
    }
    return null;
}

private Message<?> buildRetryMessage(Message<?> failed, int retries) {
    return MessageBuilder.fromMessage(failed)
            .setHeader(X_RETRIES_HEADER, retries)
            .setHeader(BinderHeaders.PARTITION_OVERRIDE,
                    failed.getHeaders().get(KafkaHeaders.RECEIVED_PARTITION_ID))
            .build();
}

这是我的频道类

        @Component
    public interface Channels {

        String INPUT = "INPUT";
        //Default name use by SCS (error.<input-topic-name>.<group-name>)
        String DLQ = "error.input.my-group";
        String PARKING_LOT = "parkingLot.input.my-group";

        @Input(INPUT)
        SubscribableChannel input();

        @Input(DLQ)
        SubscribableChannel dlq();

        @Output(PARKING_LOT)
        MessageChannel parkingLot();


}

这是我的配置

spring:
  cloud:
    stream:
      default:
        group: my-group
      binder:
        headerMode: headers      kafka:
        binder:
          # Necessary in order to commit the message to all the Kafka brokers handling the partition -> maximum durability
          # -1 = all
          requiredAcks: -1
          brokers: bootstrap.kafka.svc.cluster.local:9092,bootstrap.kafka.svc.cluster.local:9093,bootstrap.kafka.svc.cluster.local:9094,bootstrap.kafka.svc.cluster.local:9095,bootstrap.kafka.svc.cluster.local:9096,bootstrap.kafka.svc.cluster.local:9097
        bindings:
          input:
            consumer:
              partitioned: true
              enableDlq: true
              dlqProducerProperties:
                configuration:
                  key.serializer: "org.apache.kafka.common.serialization.ByteArraySerializer"
          "[error.input.my-group]":
            consumer:
              # We cannot loose any message and we don't have any DLQ for the DLQ, therefore we only commit in case of success
              autoCommitOnError: false
              ackEachRecord: true
              partitioned: true
              enableDlq: false
      bindings:
        input:
          contentType: application/xml
          destination: input
        "[error.input.my-group]":
          contentType: application/xml
          destination: error.input.my-group
        "[parkingLot.input.my-group]":
          contentType: application/xml
          destination: parkingLot.input.my-group

问题是我的消息永远不会再次推送到 Kafka,而是直接传递到我的输入通道。是不是我误解了什么?

【问题讨论】:

  • 我不确定我是否理解这个问题。消息被发送到输出通道(在您的情况下为输入),但随后活页夹将其发送到主题。虽然您可以肯定地错误配置了某些内容,但您的问题中没有足够的信息让我们弄清楚它是什么。您能否将您的项目发布到 GitHib 或其他地方,以便我们查看?
  • 像奥列格一样,我误解了你的问题;我编辑了我的答案。

标签: apache-kafka spring-cloud-stream


【解决方案1】:

为了@SendTo kafka 目的地而不是直接,您需要一个输出绑定。

【讨论】:

  • 我只是遵循 Spring Cloud Stream 文档 (docs.spring.io/spring-cloud-stream/docs/current/reference/…) 中提出的模式。除了没有单独的 JVM 之外,我在同一个 JVM 中运行死信队列消耗。为什么我需要一个单独的输入绑定来使用该主题?
  • 我想我误解了你的问题;奥列格是对的。为了让@SendTo 去 Kafka,你需要一个 OUTPUT 目的地,而不是直接发送到输入通道。
  • 我刚刚更新了我的问题以提供更多详细信息。我不明白的是,我遵循了似乎表明消息将使用 @SendTo 重新发布到 Kafka 主题的文档。
  • 我想我明白了。我需要定义这个:@Output("input") SubscribableChannel input(); @Output("input") MessageChannel 输出();够用还是必须使用其他渠道?
  • 它需要是不同的通道,并且您还需要将输出绑定添加到 YML。在文档中,@SendTo 指的是 OUTPUT 绑定;您直接将其连接到您的主输入通道。
猜你喜欢
  • 2016-06-28
  • 2021-06-12
  • 2018-01-19
  • 1970-01-01
  • 1970-01-01
  • 2021-04-07
  • 1970-01-01
  • 1970-01-01
  • 2021-02-17
相关资源
最近更新 更多