【问题标题】:Spring-Kafka: How to insert an application.yml topic in Producer KafkaSpring-Kafka:如何在 Producer Kafka 中插入 application.yml 主题
【发布时间】:2021-03-04 11:40:09
【问题描述】:

我有一个 spring-kafka 微服务,我最近添加了一个死信,以便能够发送各种错误消息

//some code..
@Component
public class KafkaProducer {
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    public void sendDeadLetter(String message) {
        kafkaTemplate.send("myDeadLetter", message);
    }
}

我想将死信的主题kafka称为“messageTopic”+“_deadLetter”,我的主要主题是“messageTopic”。在我的消费者中,主题名称为他提供了 application.yml,如下所示:

@KafkaListener(topics = "${spring.kafka.topic.name}")

如何通过可能从 application.yml 插入“+ deadLetter”来设置相同的 kafka 主题?我试过这样的事情:

@Component
@KafkaListener(topics = "${spring.kafka.topic.name}"+"_deadLetter")
public class KafkaProducer {
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    public void sendDeadLetter(String message) {
        kafkaTemplate.send("messageTopic_deadLetter", message);
    }
}

但它为我创建了两个同名的不同主题。我正在等待一些建议,感谢您的帮助!

【问题讨论】:

    标签: spring apache-kafka spring-kafka


    【解决方案1】:

    Kafka Listener 接受 Topic 名称的常量,我们不能在这里修改 TOPIC 名称。

    最好为实际主题和死信主题使用单独的方法(Kafka 侦听器),在 YAML 中定义两个不同的属性来保存两个主题名称。

    @KafkaListener(topics = "${spring.kafka.topic.name}")
    public void listen(......){
    
    }
    
    @KafkaListener(topics = "${spring.kafka.deadletter.topic.name}")
    public void listenDlt(......){
    
    }
    

    从 yml 或属性文件中引用 send(...) 中的主题名称

    @Component
    @KafkaListener(topics = "${spring.kafka.deadletter.topic.name}")
    public class KafkaProducer {
    
        @Value("${spring.kafka.deadletter.topic.name}")
        private String DLT_TOPIC_NAME;
    
        @Autowired
        private KafkaTemplate<String, String> kafkaTemplate;
    
        public void sendDeadLetter(String message) {
            kafkaTemplate.send(DLT_TOPIC_NAME, message);
        }
    }
    

    【讨论】:

    • 我的问题是这个时候如何在函数内部调用主题`"kafkaTemplate.send("messageTopic_deadLetter", message)"而不用使用String
    • Ok.. 用@Value 定义一个新的占位符并在kafakTemplate.send() 中使用相同的占位符.. 在上面添加了代码sn-p
    • 如果我想创建一个具有自定义变量名称的通用 KafkaProducer 怎么办?例如。我想使用该通用生产者创建一个 Kafka 服务,有人可以实现并创建 1-n 个生产者,其主题可以在应用程序 yaml 中配置。谢谢!
    【解决方案2】:

    您可以使用 SpEL 构造主题名称:

    @KafkaListener(topics = "#{'${spring.kafka.topic.name}' + '_deadLetter'"})
    

    注意属性占位符和文字周围的单引号。

    【讨论】:

      【解决方案3】:

      此示例可能与您的用例无关,但如果对某人有帮助,请分享。

      如果您正在构建 Kafka Stream 应用程序,可以通过以下方式实现可变接收器主题名称:

      1. 向接收器主题生成时,传递一个以上下文作为参数的 lambda,以及将处理名称定义的方法。
              ... /* precedent stream operations */
              // terminal operation 'to'. 
              .to(
                  (k, v, ctx) -> sinkTopicNameGenerator(ctx),
                  Produced.with(Serdes, Serdes)
              );
      
      1. 实现生成接收器主题名称的方法:
        protected static String sinkTopicNameGenerator(RecordContext ctx) {
          return ctx.topic().concat("_deadLetter");
        }
      

      上面的例子很简单,可以简化为(k, v, ctx) -&gt; ctx.topic().concat("_deadLetter"),但我想在需要进一步转换的情况下保留单独的方法方法,即当部分主题名称将被一些常量或定义的正则表达式替换时在配置文件中。

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 2019-04-23
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2021-10-16
        • 2018-10-27
        相关资源
        最近更新 更多