【问题标题】:Unable to read exception header from DLQ spring cloud stream kafka无法从 DLQ spring cloud stream kafka 读取异常标头
【发布时间】:2020-01-26 02:10:33
【问题描述】:

使用 Spring Cloud Stream Kafka 侦听器从 kafka 主题读取消息,并在异常情况下将其发送到配置属性的死信队列

spring.cloud.stream.kafka.bindings.input.consumer.enable-dlq=true
spring.cloud.stream.kafka.bindings.input.consumer.dlq-name=book_error

我可以向 DLQ 发送消息。但是,当我尝试从 DLQ 读取以便我可以根据 Error Message 采取行动时。我无法读取嵌入在标题中的异常。

@StreamListener("dlqChannel")
public void error(Message<?> message) {
    System.out.println("Handling ERROR: READING FROM DLQ");
    logger.info("header :" +message.getHeaders());
    logger.info("payload : " +message.getPayload());
    //return message;
}

标头有效负载似乎具有我无法破译的对象 ID。如何根据异常消息解析错误和处理。 以下是我尝试打印时得到的标题

header :{x-original-offset=[B@f82eb25, x-original-partition=[B@7a3b83c, deliveryAttempt=1, kafka_timestampType=CREATE_TIME, kafka_receivedMessageKey=null, kafka_receivedTopic=book_errors, kafka_offset=0, x-exception-message=[B@6dcc9872, x-exception-fqcn=[B@68079694, scst_nativeHeadersPresent=true, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@70449372, x-original-topic=[B@6a3ca71e, x-original-timestamp-type=[B@63baad23, kafka_receivedPartitionId=0, contentType=application/json, x-original-timestamp=[B@37dd34f6, kafka_receivedTimestamp=1579990310188, kafka_groupId=bkerrgrp, x-exception-stacktrace=[B@6356ee7c}

【问题讨论】:

    标签: spring-cloud-stream


    【解决方案1】:

    异常标头是 byte[] - 这是 Kafka 支持的唯一类型。各种字符串值信息存储为String.getBytes(StandardCharsets.UTF_8))

    使用

    String exceptionMessage = new String(message.getHeaders().get("x-exception-message", byte[].class), StandardCharsets.UTF_8);
    

    使用适当的类型(int、long)存储数值。

    使用ByteBuffer.wrap(header).getInt() 等。

    这是存储标头的代码...

    kafkaHeaders.add(
            new RecordHeader(X_ORIGINAL_TOPIC, record.topic().getBytes(StandardCharsets.UTF_8)));
    kafkaHeaders.add(new RecordHeader(X_ORIGINAL_PARTITION,
            ByteBuffer.allocate(Integer.BYTES).putInt(record.partition()).array()));
    kafkaHeaders.add(new RecordHeader(X_ORIGINAL_OFFSET,
            ByteBuffer.allocate(Long.BYTES).putLong(record.offset()).array()));
    kafkaHeaders.add(new RecordHeader(X_ORIGINAL_TIMESTAMP,
            ByteBuffer.allocate(Long.BYTES).putLong(record.timestamp()).array()));
    kafkaHeaders.add(new RecordHeader(X_ORIGINAL_TIMESTAMP_TYPE,
            record.timestampType().toString().getBytes(StandardCharsets.UTF_8)));
    kafkaHeaders.add(new RecordHeader(X_EXCEPTION_FQCN,
            throwable.getClass().getName().getBytes(StandardCharsets.UTF_8)));
    kafkaHeaders.add(new RecordHeader(X_EXCEPTION_MESSAGE,
            throwable.getMessage().getBytes(StandardCharsets.UTF_8)));
    kafkaHeaders.add(new RecordHeader(X_EXCEPTION_STACKTRACE,
            getStackTraceAsString(throwable).getBytes(StandardCharsets.UTF_8)));
    

    【讨论】:

    • 我总是收到org.springframework.messaging.MessagingException 换成X_EXCEPTION_FQCN。如何在此标头中向 DLQ 发送根本原因?
    • 查看X_EXCEPTION_STACKTRACE 标头中的Caused by
    猜你喜欢
    • 2019-02-15
    • 2018-12-29
    • 1970-01-01
    • 2019-08-13
    • 2018-12-17
    • 2019-04-17
    • 1970-01-01
    • 2022-12-20
    • 2020-08-22
    相关资源
    最近更新 更多