【问题标题】:How to apply retention time configuration for Dead Letter Queue in Spring Cloud Stream Kafka Binder?如何在 Spring Cloud Stream Kafka Binder 中为死信队列应用保留时间配置?
【发布时间】:2021-03-12 15:26:06
【问题描述】:

我有一个使用 Spring Cloud Stream Kafka 的应用程序。 对于用户定义的主题,我可以通过下面提到的配置从指定的主题中删除记录。但此配置不适用于 DLQ 主题。

例如在下面的配置中,我在 binder 级别配置了保留时间。所以我的绑定级别下定义的生产者主题(学生主题)配置正确,我可以检查当主题日志超过指定保留字节(300000000)时记录是否被删除。

但是活页夹级别的保留时间不起作用 DLQ 主题(person-topic-error-dlq)。除了保留时间之外,还有什么不同的配置可以从 DLQ 主题中清除记录。

我该怎么做?

spring:
  cloud:
    stream:
      kafka:
        bindings:
          person-topic-in:
            consumer:
              enableDlq: true
              dlqName: person-topic-error-dlq
      binders:
        defaultKafka:
          type: kafka
          environment:
            spring:
              cloud:
                stream:
                  kafka:
                    default:
                      producer:
                        topic:
                          properties:
                            retention.bytes: 300000000
                            segment.bytes: 300000000
                    binder:
                      brokers: localhost:19092
      bindings:
        person-topic-in:
          binder: defaultKafka
          destination: person-topic
          contentType: application/json
          group: person-topic-group
        student-topic-out:
          binder: defaultKafka
          destination: student-topic
          contentType: application/json

【问题讨论】:

    标签: apache-kafka spring-cloud-stream dead-letter spring-cloud-stream-binder-kafka retention


    【解决方案1】:

    您只是为生产者绑定设置(默认)属性。

    也就是说,这仍然对我不起作用:

          binders:
            defaultKafka:
              type: kafka
              environment:
                spring:
                  cloud:
                    stream:
                      kafka:
                        default:
                          producer:
                            topic:
                              properties:
                                retention.bytes: 300000000
                                segment.bytes: 300000000
                          consumer:
                            topic:
                              properties:
                                retention.bytes: 300000000
                                segment.bytes: 300000000
    

    (这些属性甚至不适用于主要主题)。

    看起来默认的 kafka 消费者绑定属性有问题。

    这对我有用;这些属性适用于主要主题和死信主题:

    spring:
      cloud:
        stream:
          kafka:
            bindings:
              person-topic-in:
                consumer:
                  enableDlq: true
                  dlqName: person-topic-error-dlq
                  topic:
                    properties:
                      retention.bytes: 300000000
                      segment.bytes: 300000000
    

    【讨论】:

    • 那么有没有办法将此配置应用于所有主题?例如;我们可以通过从 binder 级别提供配置来将此配置传递给所有主题吗?我不想配置单个主题。我希望能够将它应用于单个级别的所有主题。
    • 我不知道这是设计使然还是错误(我建议后者),但看起来只有在绑定没有具体属性时才应用默认值 - 如果你移动dlq 属性也设置为默认值(删除对特定于绑定的 kafka 绑定属性的所有引用),它也可以正常工作(但这无济于事,除非您没有需要 DLQ 的绑定)。代码在spring-cloud-stream - 我建议你在那里打开一个GitHub问题并参考这个问题。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2018-12-16
    • 2021-02-17
    • 1970-01-01
    相关资源
    最近更新 更多