【问题标题】:Unable to send messages to dead letter topic from consumer in Kafka无法从 Kafka 中的消费者向死信主题发送消息
【发布时间】:2020-11-27 18:58:31
【问题描述】:

'我正在尝试将消息路由到 Kafka 中的死信主题,以防在处理相应消息时出现任何故障。我已经为此功能设置了 SeektoCurrentErrorHandler 和 DeadLetterPublishingRecoverer。

消费者在执行此操作时会引发以下异常:

2020-08-07 12:09:38.841 ERROR 1 --- [ntainer#2-0-C-1] o.s.k.support.LoggingProducerListener    : Exception thrown when sending a message with key='a6558a22-470d-4708-b297-814996a42045' and payload='{123, 34, 101, 118, 101, 110, 116, 78, 97, 109, 101, 34, 58, 34, 116, 101, 115, 116, 95, 101, 120, 1...' to topic test_execution.DLT and partition 2:

org.apache.kafka.common.errors.TimeoutException: Topic test_execution.DLT not present in metadata after 60000 ms.

2020-08-07 12:09:38.846 ERROR 1 --- [ntainer#2-0-C-1] o.s.k.l.DeadLetterPublishingRecoverer    : Dead-letter publication failed for: ProducerRecord(topic=test_execution.DLT, partition=2, headers=RecordHeaders(headers = [RecordHeader(key = kafka_dlt-original-topic, value = [116, 101, 115, 116, 95, 101, 120, 101, 99, 117, 116, 105, 111, 110]), RecordHeader(key = kafka_dlt-original-partition, value = [0, 0, 0, 2]), RecordHeader(key = kafka_dlt-original-offset, value = [0, 0, 0, 0, 0, 23, 15, -72]), RecordHeader(key = kafka_dlt-original-timestamp, value = [0, 0, 1, 115, -57, 103, -70, -126]), RecordHeader(key = kafka_dlt-original-timestamp-type, value = [67, 114, 101, 97, 116, 101, 84, 105, 109, 101]), RecordHeader(key = kafka_dlt-exception-fqcn, value = [111, 114, 103, 46, 115, 112, 114, 105, 110, 103, 102, 114, 97, 109, 101, 119, 111, 114, 107, 46, 107, 97, 102, 107, 97, 46, 108, 105, 115, 116, 101, 110, 101, 114, 46, 76, 105, 115, 116, 101, 110, 101, 114, 69, 120, 101, 99, 117, 116, 105, 111, 110, 70, 97, 105, 108, 101, 100, 69, 120, 99, 101, 112, 116, 105, 111, 110]), RecordHeader(key = kafka_dlt-exception-message, value = [    
org.springframework.kafka.KafkaException: Send failed; nested exception is org.apache.kafka.common.errors.TimeoutException: Topic test_execution.DLT not present in metadata after 60000 ms.
        at org.springframework.kafka.core.KafkaTemplate.doSend(KafkaTemplate.java:570) ~[spring-kafka-2.5.2.RELEASE.jar:2.5.2.RELEASE]
        at org.springframework.kafka.core.KafkaTemplate.send(KafkaTemplate.java:385) ~[spring-kafka-2.5.2.RELEASE.jar:2.5.2.RELEASE]
        at org.springframework.kafka.listener.DeadLetterPublishingRecoverer.publish(DeadLetterPublishingRecoverer.java:278) ~[spring-kafka-2.5.2.RELEASE.jar:2.5.2.RELEASE]
        at org.springframework.kafka.listener.DeadLetterPublishingRecoverer.accept(DeadLetterPublishingRecoverer.java:214) ~[spring-kafka-2.5.2.RELEASE.jar:2.5.2.RELEASE]
        at org.springframework.kafka.listener.DeadLetterPublishingRecoverer.accept(DeadLetterPublishingRecoverer.java:54) ~[spring-kafka-2.5.2.RELEASE.jar:2.5.2.RELEASE]
        at org.springframework.kafka.listener.FailedRecordProcessor.getSkipPredicate(FailedRecordProcessor.java:167) ~[spring-kafka-2.5.2.RELEASE.jar:2.5.2.RELEASE]
        at org.springframework.kafka.listener.SeekToCurrentErrorHandler.handle(SeekToCurrentErrorHandler.java:104) ~[spring-kafka-2.5.2.RELEASE.jar:2.5.2.RELEASE]
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeErrorHandler(KafkaMessageListenerContainer.java:1887) ~[spring-kafka-2.5.2.RELEASE.jar:2.5.2.RELEASE]
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:1792) ~[spring-kafka-2.5.2.RELEASE.jar:2.5.2.RELEASE]
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:1719) ~[spring-kafka-2.5.2.RELEASE.jar:2.5.2.RELEASE]
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:1617) ~[spring-kafka-2.5.2.RELEASE.jar:2.5.2.RELEASE]
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:1348) ~[spring-kafka-2.5.2.RELEASE.jar:2.5.2.RELEASE]
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1064) ~[spring-kafka-2.5.2.RELEASE.jar:2.5.2.RELEASE]
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:972) ~[spring-kafka-2.5.2.RELEASE.jar:2.5.2.RELEASE]
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[na:na]
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[na:na]
        at java.base/java.lang.Thread.run(Thread.java:834) ~[na:na]
Caused by: org.apache.kafka.common.errors.TimeoutException: Topic test_execution.DLT not present in metadata after 60000 ms.

我已经在 kafka 集群中创建了 test_execution.DLT 主题,并且我显然能够从控制台生产者向该主题生成消息。

消费者在 docker 容器中运行,而 kafka 集群是一个 3 VM 设置。 这些是 kafka 消费者使用的配置:

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
        props.put("spring.kafka.consumer.properties.spring.deserializer.key.delegate.class", StringDeserializer.class);
        props.put("spring.kafka.consumer.properties.spring.deserializer.value.delegate.class", JsonDeserializer.class);
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 10);
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 60000);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(JsonDeserializer.TRUSTED_PACKAGES, "*");
        return props;
    }
    @Bean
    public ConsumerFactory consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs(),  new ErrorHandlingDeserializer<>(new StringDeserializer()),
                new ErrorHandlingDeserializer<>(new JsonDeserializer<>(AutomationEvent.class,false)));
    }

    @Bean
    public SeekToCurrentErrorHandler errorHandler(KafkaOperations kafkaOperations) {
        return new SeekToCurrentErrorHandler(new DeadLetterPublishingRecoverer(kafkaOperations), new FixedBackOff(10000, 3));
    }

我在这里遗漏了什么吗?我是否需要修改任何服务器配置才能更新?

【问题讨论】:

    标签: java apache-kafka kafka-consumer-api spring-kafka kafka-producer-api


    【解决方案1】:

    test_execution.DLT 不存在

    框架不会自动为你创建死信主题;它必须已经存在。

    您可以通过添加NewTopic @Bean 来指示框架创建主题。

    有关示例,请参阅 this answer

    编辑

    默认情况下,我们会将记录发送到同一分区,因此 DLT 必须至少具有与原始主题一样多的分区,除非您提供目标解析器。

    the documentation

    默认情况下,死信记录被发送到名为 .DLT 的主题(原始主题名称以 .DLT 为后缀)和原始记录所在的分区。因此,当您使用默认解析器时,死信主题必须至少具有与原始主题一样多的分区。如果返回的 TopicPartition 有负分区,则 ProducerRecord 中没有设置该分区,所以该分区被 Kafka 选中。

    【讨论】:

    • 我已经明确地在消费者之外创建了死信主题。还从控制台生产者向主题发送了一些测试消息。这个 NewTopic Bean 代表什么?这是否意味着我们必须为每个 DLT 主题显式提供 NewTopic bean,即使它已经在 kafka 集群中创建?
    • 您可能在 DLT 中没有足够的分区 - 请参阅编辑。 NewTopic bean 可以设置或增加分区,也可以使用命令行工具增加分区。
    • 我已经将所有 DLT 主题的分区和复制因子计数与它们的主要对应项进行了交叉检查。分区和复制因子计数完全相同(每个 3)。我不想在消费者中为 DLT 主题设置 NewTopic bean,因为我已经在集群中创建了它们。这些信息/元数据是否没有传递给消费者?
    • 如果主题已经存在并且有足够的分区,你就不需要NewTopic bean。它与运行时无关,它只是用于在启动期间提供主题。如果它有足够的主题并且您仍然收到此错误,那对我来说毫无意义。
    • 我自己测试了一下;向 3 个分区主题的分区 2 发送了一条记录,其中 .DLT 只有一个分区,我得到了与您完全相同的结果(如我所料)。然后我将 DLT 分区增加到 3 个,它工作得很好。
    【解决方案2】:

    如果您想将消息从消费者发送到主题,请确保您还指定了生产者配置。这个属性是spring.kafka.producer.bootstrap-servers

    此属性是必需的,否则生产者组件会默认尝试连接到 locahost,这会导致找不到主题。

    【讨论】:

    【解决方案3】:

    消费者不必向DLT发送任何信息,由框架处理,只是主题必须在之前存在

    关注此讨论 -> DeadLetterPublishingRecoverer - Dead-letter publication failed with InvalidTopicException for name topic at TopicPartition ends with _ERR

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2019-05-20
      • 1970-01-01
      • 2017-12-15
      • 1970-01-01
      • 1970-01-01
      • 2019-10-13
      • 2018-07-27
      • 1970-01-01
      相关资源
      最近更新 更多