【问题标题】:Getting ClassCastException when sending error message to kafka dlq向kafka dlq发送错误消息时获取ClassCastException
【发布时间】:2020-07-14 23:14:53
【问题描述】:

我正在使用带有弹簧云流的 kafka。我的监听器代码如下所示

    @Bean
    public Consumer<List<Map<String, Object>>> receive() throws MyException {
        return message -> {
            try {
                messageService.processMessage(message);
            } catch (MyException e) {
                throw new MyException(e.getMessage());
            }
        };
    }

我最近将其更改为以批处理模式使用消息。如果失败,我需要将消息转到 dlq 主题。现在它进入了简单的错误通道

    @StreamListener("errorChannel")
    public void handleError(ErrorMessage errorMessage) {
        log.error("exception occurred. errorMessage = {}", errorMessage);
    }

但我得到一个带有错误跟踪的ClassCastException

2020-04-02 15:34:21.841 ERROR 22222 --- [container-0-C-1] o.s.k.listener.BatchLoggingErrorHandler  : Error while processing:
ConsumerRecord(topic = test, partition = 0, leaderEpoch = 0, offset = 36, CreateTime = 1585856056860, serialized key size = -1, serialized value size = 22, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = [B@1701c9b1)

org.springframework.kafka.listener.ListenerExecutionFailedException: Listener failed; nested exception is org.springframework.messaging.MessageDeliveryException: failed to send Message to channel 'test.group-1.errors'; nested exception is java.lang.ClassCastException: class java.util.LinkedList cannot be cast to class org.apache.kafka.clients.consumer.ConsumerRecord (java.util.LinkedList is in module java.base of loader 'bootstrap'; org.apache.kafka.clients.consumer.ConsumerRecord is in unnamed module of loader 'app'), failedMessage=ErrorMessage [payload=org.springframework.messaging.MessagingException: Exception thrown while invoking MessageListener#receive[1 args]; nested exception is com.corelogic.idap.idappipelinegenericelasticsink.exception.ElasticClientException: forced, failedMessage=GenericMessage [payload=[[B@1701c9b1], headers={kafka_offset=[36], kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@ce0748, kafka_timestampType=[CREATE_TIME], kafka_receivedMessageKey=[null], kafka_receivedPartitionId=[0], kafka_batchConvertedHeaders=[{}], kafka_receivedTopic=[test], kafka_receivedTimestamp=[1585856056860], contentType=application/json, kafka_groupId=group-1}], headers={kafka_data=[ConsumerRecord(topic = test, partition = 0, leaderEpoch = 0, offset = 36, CreateTime = 1585856056860, serialized key size = -1, serialized value size = 22, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = [B@1701c9b1)], id=0cb01a1e-fd22-f409-bcbc-1f86880efeec, sourceData=[ConsumerRecord(topic = test, partition = 0, leaderEpoch = 0, offset = 36, CreateTime = 1585856056860, serialized key size = -1, serialized value size = 22, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = [B@1701c9b1)], timestamp=1585856060836}] for original GenericMessage [payload=[[B@1701c9b1], headers={kafka_offset=[36], kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@ce0748, kafka_timestampType=[CREATE_TIME], kafka_receivedMessageKey=[null], kafka_receivedPartitionId=[0], kafka_batchConvertedHeaders=[{}], kafka_receivedTopic=[test], kafka_receivedTimestamp=[1585856056860], contentType=application/json, kafka_groupId=group-1}]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.decorateException(KafkaMessageListenerContainer.java:1777) ~[spring-kafka-2.3.6.RELEASE.jar:2.3.6.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeBatchErrorHandler(KafkaMessageListenerContainer.java:1501) ~[spring-kafka-2.3.6.RELEASE.jar:2.3.6.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeBatchListener(KafkaMessageListenerContainer.java:1383) ~[spring-kafka-2.3.6.RELEASE.jar:2.3.6.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeBatchListener(KafkaMessageListenerContainer.java:1271) ~[spring-kafka-2.3.6.RELEASE.jar:2.3.6.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:1254) ~[spring-kafka-2.3.6.RELEASE.jar:2.3.6.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1007) ~[spring-kafka-2.3.6.RELEASE.jar:2.3.6.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:927) ~[spring-kafka-2.3.6.RELEASE.jar:2.3.6.RELEASE]
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[na:na]
    at java.base/java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:264) ~[na:na]
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java) ~[na:na]
    at java.base/java.lang.Thread.run(Thread.java:834) ~[na:na]
Caused by: org.springframework.messaging.MessageDeliveryException: failed to send Message to channel 'test.group-1.errors'; nested exception is java.lang.ClassCastException: class java.util.LinkedList cannot be cast to class org.apache.kafka.clients.consumer.ConsumerRecord (java.util.LinkedList is in module java.base of loader 'bootstrap'; org.apache.kafka.clients.consumer.ConsumerRecord is in unnamed module of loader 'app')
    at org.springframework.integration.support.utils.IntegrationUtils.wrapInDeliveryExceptionIfNecessary(IntegrationUtils.java:166) ~[spring-integration-core-5.2.4.RELEASE.jar:5.2.4.RELEASE]
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:483) ~[spring-integration-core-5.2.4.RELEASE.jar:5.2.4.RELEASE]
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:403) ~[spring-integration-core-5.2.4.RELEASE.jar:5.2.4.RELEASE]
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187) ~[spring-messaging-5.2.4.RELEASE.jar:5.2.4.RELEASE]
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166) ~[spring-messaging-5.2.4.RELEASE.jar:5.2.4.RELEASE]
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47) ~[spring-messaging-5.2.4.RELEASE.jar:5.2.4.RELEASE]
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109) ~[spring-messaging-5.2.4.RELEASE.jar:5.2.4.RELEASE]
    at org.springframework.integration.endpoint.MessageProducerSupport.sendErrorMessageIfNecessary(MessageProducerSupport.java:217) ~[spring-integration-core-5.2.4.RELEASE.jar:5.2.4.RELEASE]
    at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:201) ~[spring-integration-core-5.2.4.RELEASE.jar:5.2.4.RELEASE]
    at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter.sendMessageIfAny(KafkaMessageDrivenChannelAdapter.java:384) ~[spring-integration-kafka-3.2.1.RELEASE.jar:3.2.1.RELEASE]
    at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter.access$300(KafkaMessageDrivenChannelAdapter.java:75) ~[spring-integration-kafka-3.2.1.RELEASE.jar:3.2.1.RELEASE]
    at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationBatchMessageListener.onMessage(KafkaMessageDrivenChannelAdapter.java:527) ~[spring-integration-kafka-3.2.1.RELEASE.jar:3.2.1.RELEASE]
    at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationBatchMessageListener.onMessage(KafkaMessageDrivenChannelAdapter.java:497) ~[spring-integration-kafka-3.2.1.RELEASE.jar:3.2.1.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeBatchOnMessage(KafkaMessageListenerContainer.java:1475) ~[spring-kafka-2.3.6.RELEASE.jar:2.3.6.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeBatchOnMessage(KafkaMessageListenerContainer.java:1438) ~[spring-kafka-2.3.6.RELEASE.jar:2.3.6.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeBatchListener(KafkaMessageListenerContainer.java:1370) ~[spring-kafka-2.3.6.RELEASE.jar:2.3.6.RELEASE]
    ... 8 common frames omitted
Caused by: java.lang.ClassCastException: class java.util.LinkedList cannot be cast to class org.apache.kafka.clients.consumer.ConsumerRecord (java.util.LinkedList is in module java.base of loader 'bootstrap'; org.apache.kafka.clients.consumer.ConsumerRecord is in unnamed module of loader 'app')
    at org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder.lambda$getErrorMessageHandler$8(KafkaMessageChannelBinder.java:1057) ~[spring-cloud-stream-binder-kafka-3.0.3.RELEASE.jar:3.0.3.RELEASE]
    at org.springframework.integration.dispatcher.BroadcastingDispatcher.invokeHandler(BroadcastingDispatcher.java:224) ~[spring-integration-core-5.2.4.RELEASE.jar:5.2.4.RELEASE]
    at org.springframework.integration.dispatcher.BroadcastingDispatcher.dispatch(BroadcastingDispatcher.java:180) ~[spring-integration-core-5.2.4.RELEASE.jar:5.2.4.RELEASE]
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:73) ~[spring-integration-core-5.2.4.RELEASE.jar:5.2.4.RELEASE]
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:453) ~[spring-integration-core-5.2.4.RELEASE.jar:5.2.4.RELEASE]

我的配置是这样的

spring.cloud.stream.bindings:
  input:
    destination: test
    group: ${application.group}
    consumer:
      max-attempts: 5
      batch-mode: true
  output:
    contentType: application/json

spring.cloud.stream.kafka.bindings:
  input:
    consumer:
      enableDlq: true
      dlqName: dead-topic
      autoCommitOnError: true
      autoCommitOffset: true
      republish-to-dlq: true
      dlqProducerProperties:
        configuration:
          key.serializer: org.apache.kafka.common.serialization.ByteArrayDeserializer
          value.serializer: org.apache.kafka.common.serialization.ByteArrayDeserializer

Kafka Consumer- ClassCastException java 这里有一个类似的问题,但没有多大帮助。

我需要像在这个问题中那样写自己的ListSerde 吗? Issue with ArrayList Serde in Kafka Streams API 还是等标准的ListSerde PR进去?

【问题讨论】:

    标签: java apache-kafka kafka-consumer-api spring-cloud-stream


    【解决方案1】:

    批处理侦听器当前不支持 DLQ;请在 GitHub 上针对 spring-cloud-stream-binder-kafka 打开一个问题。

    但请记住,框架不知道批次中的哪条记录失败,因此批次中的所有记录都会到那里,即使有些记录已成功处理。

    通常最好在批处理模式的侦听器中处理错误。

    【讨论】:

    • 是的。我一直想知道批处理将如何处理失败和成功记录的混合。那是我想研究的下一件事。但无论如何,在批处理模式的侦听器中处理错误是有意义的。我将用 spring-cloud-stream-binder-kafka 提出一个问题。谢谢!
    • @afzal,您能否提供此问题的链接(如果有)?似乎我面临同样的错误 - 批量消费者和 dlq。谢谢。
    • @daoway 我并没有真正为此创建问题。我最终手动重试并手动将失败的消息放入 dlq。
    猜你喜欢
    • 1970-01-01
    • 2019-02-27
    • 1970-01-01
    • 2021-01-27
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2021-04-13
    • 2019-01-16
    相关资源
    最近更新 更多