【Question title】:Spring Cloud Stream Kafka binder fails to publish to DLQ with a key
【Posted】:2019-08-13 21:42:26
【Question】:

When @StreamListener processing fails and the Spring Cloud Stream Kafka binder tries to re-route the message to the DLQ, I get the following exception. I am using Spring Cloud Edgware.SR5.

org.springframework.messaging.MessageDeliveryException: failed to send Message to channel 'my.message.destination.my.message.group.errors'; nested exception is java.lang.ClassCastException: java.lang.String cannot be cast to [B
        at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:451) ~[spring-integration-core-4.3.17.RELEASE.jar:4.3.17.RELEASE]
        at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:375) ~[spring-integration-core-4.3.17.RELEASE.jar:4.3.17.RELEASE]
        at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:115) ~[spring-messaging-4.3.19.RELEASE.jar:4.3.19.RELEASE]
        at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:45) ~[spring-messaging-4.3.19.RELEASE.jar:4.3.19.RELEASE]
        at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:105) ~[spring-messaging-4.3.19.RELEASE.jar:4.3.19.RELEASE]
        at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:95) ~[spring-messaging-4.3.19.RELEASE.jar:4.3.19.RELEASE]
        at org.springframework.integration.support.ErrorMessagePublisher.publish(ErrorMessagePublisher.java:155) ~[spring-integration-core-4.3.17.RELEASE.jar:4.3.17.RELEASE]
        at org.springframework.integration.handler.advice.ErrorMessageSendingRecoverer.recover(ErrorMessageSendingRecoverer.java:83) ~[spring-integration-core-4.3.17.RELEASE.jar:4.3.17.RELEASE]
        at org.springframework.retry.support.RetryTemplate.handleRetryExhausted(RetryTemplate.java:512) ~[spring-retry-1.2.2.RELEASE.jar:?]
        at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:351) ~[spring-retry-1.2.2.RELEASE.jar:?]
        at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:180) ~[spring-retry-1.2.2.RELEASE.jar:?]
        at org.springframework.kafka.listener.adapter.RetryingAcknowledgingMessageListenerAdapter.onMessage(RetryingAcknowledgingMessageListenerAdapter.java:73) ~[spring-kafka-1.1.8.RELEASE.jar:?]
        at org.springframework.kafka.listener.adapter.RetryingAcknowledgingMessageListenerAdapter.onMessage(RetryingAcknowledgingMessageListenerAdapter.java:39) ~[spring-kafka-1.1.8.RELEASE.jar:?]
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:792) [spring-kafka-1.1.8.RELEASE.jar:?]
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:736) [spring-kafka-1.1.8.RELEASE.jar:?]
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.access$2100(KafkaMessageListenerContainer.java:246) [spring-kafka-1.1.8.RELEASE.jar:?]
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer$ListenerInvoker.run(KafkaMessageListenerContainer.java:1025) [spring-kafka-1.1.8.RELEASE.jar:?]
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_192]
        at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_192]
        at java.lang.Thread.run(Thread.java:748) [?:1.8.0_192]
Caused by: java.lang.ClassCastException: java.lang.String cannot be cast to [B
        at org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder$4.handleMessage(KafkaMessageChannelBinder.java:360) ~[spring-cloud-stream-binder-kafka-1.3.3.RELEASE.jar:1.3.3.RELEASE]
        at org.springframework.integration.dispatcher.BroadcastingDispatcher.invokeHandler(BroadcastingDispatcher.java:236) ~[spring-integration-core-4.3.17.RELEASE.jar:4.3.17.RELEASE]
        at org.springframework.integration.dispatcher.BroadcastingDispatcher.dispatch(BroadcastingDispatcher.java:185) ~[spring-integration-core-4.3.17.RELEASE.jar:4.3.17.RELEASE]
        at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:89) ~[spring-integration-core-4.3.17.RELEASE.jar:4.3.17.RELEASE]
        at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:425) ~[spring-integration-core-4.3.17.RELEASE.jar:4.3.17.RELEASE]
        ... 19 more

I tried producing messages with kafka-console-producer and found that this happens only when the message has a Kafka key.

Here are the relevant code snippets:

MyMessageConsumer.java:

  @StreamListener(MyMessageSink.MY_MESSAGE_INPUT)
  @Transactional
  public void consumeMyMessage(@Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String myMessageId, @Payload MyMessage myMessage) {
    if (true) {
      throw new RuntimeException("MockRuntimeException");
    }
  }

application.properties (for the consumer):

spring.kafka.producer.keySerializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.consumer.keyDeserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.cloud.stream.bindings.my-message-in.destination=my.message.destination
spring.cloud.stream.bindings.my-message-in.group=my.message.group
spring.cloud.stream.bindings.my-message-in.content-type=application/json
spring.cloud.stream.bindings.my-message-in.consumer.headerMode=raw
spring.cloud.stream.bindings.my-message-in.consumer.partitioned=true
spring.cloud.stream.kafka.bindings.my-message-in.consumer.enableDlq=true
spring.cloud.stream.kafka.bindings.my-message-in.consumer.dlqName=my.message.destination.dlq
spring.cloud.stream.kafka.bindings.my-message-in.consumer.dlqProducerProperties.configuration.key.serializer=org.apache.kafka.common.serialization.StringSerializer
spring.cloud.stream.kafka.bindings.my-message-in.consumer.dlqProducerProperties.configuration.value.serializer=org.apache.kafka.common.serialization.StringSerializer
spring.cloud.stream.kafka.bindings.my-message-in.consumer.maxAttempts=3

application.properties (for the producer):

spring.kafka.producer.keySerializer=org.apache.kafka.common.serialization.StringSerializer
spring.cloud.stream.bindings.my-message-out.destination=my.message.destination
spring.cloud.stream.bindings.my-message-out.content-type=application/json
spring.cloud.stream.bindings.my-message-out.producer.headerMode=raw
spring.cloud.stream.bindings.my-message-out.producer.partitionKeyExtractorClass=com.example.message.TransactionKeyExtractor
spring.cloud.stream.bindings.my-message-out.producer.partitionCount=80

Is there a way to make DLQ re-routing work with the message key?

【Discussion】:

    Tags: apache-kafka spring-cloud-stream


    【Answer 1】:

    Dead-lettering in 1.3.x supports only Spring Cloud Stream's default key/value types (byte[]/byte[]).

    Try upgrading to a newer version.
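
To illustrate the failure mode behind the `ClassCastException` in the stack trace, here is a minimal sketch in plain Java. It is not the binder's source: `DlqKeyCastDemo`, `keyForDlq`, and `canRepublish` are hypothetical names, and the sketch only assumes that the 1.3.x DLQ handler casts the received record key to `byte[]`, as the answer describes.

```java
import java.nio.charset.StandardCharsets;

public class DlqKeyCastDemo {

    // Hypothetical stand-in for a DLQ handler that assumes byte[] keys.
    static byte[] keyForDlq(Object receivedKey) {
        // Fails with ClassCastException when the key was deserialized as a String.
        return (byte[]) receivedKey;
    }

    // Returns true if the key survives the byte[] cast, false otherwise.
    static boolean canRepublish(Object receivedKey) {
        try {
            keyForDlq(receivedKey);
            return true;
        } catch (ClassCastException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        byte[] rawKey = "id-1".getBytes(StandardCharsets.UTF_8);
        System.out.println(canRepublish(rawKey)); // byte[] key: prints true
        System.out.println(canRepublish("id-1")); // String key: prints false
        System.out.println(canRepublish(null));   // no key: prints true
    }
}
```

The `null` case passes the cast, which is consistent with the observation in the question that the error appears only when the message carries a key.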

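    【Comments】:

    • It is actually a version issue; this is a limitation of 1.3.x.
    • Is there any workaround for Edgware.SR5? I tried upgrading to Finchley.SR2 but ran into compatibility issues.
    • No; sorry, the 1.3.x binder DLQ code is hard-coded to byte[].
    • OK. Thanks for the clarification anyway!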