【问题标题】:Does Spring integeration kafka support dynamic topic creationSpring集成kafka是否支持动态创建主题
【发布时间】:2015-05-15 08:14:59
【问题描述】:

我是弹簧整合 kafka 的新手,我了解 kafka-oubound-channel 适配器。但是有没有一种方法可以让我无需在上下文 xml 中设置就可以程序化地创建主题?

ie:根据我给转换器的消息,我想将消息发布到为此消息类型创建的 kafka 主题。

更新:

下面是我最终做的事情。将欢迎任何更好的解决方案。

    <int:channel id="inputForSolrPublish"></int:channel>

<int:service-activator input-channel="inputForSolrPublish"
    ref="solrMasterListRouter" >

-->

 private void postMessageToMasterSpecifcTopics(final List<String> topicNames,
                                              final String brokerList,
                                              final Message<?> message) throws Exception {

    for (String topicName : topicNames) {
        createProducerContext(topicName,
                              brokerList).send(topicName,
                                               message.getHeaders()
                                                      .get(KafkaHeaders.MESSAGE_KEY),
                                               message);

    }

}

private KafkaProducerContext<String, String> createProducerContext(final String topicName,
                                                                   final String brokerList) throws Exception {
    KafkaProducerContext<String, String> kafkaProducerContext = new KafkaProducerContext<String, String>();
    AvroReflectDatumBackedKafkaEncoder<String> kafkaReflectionEncoder = new AvroReflectDatumBackedKafkaEncoder<>(String.class);
    AvroSpecificDatumBackedKafkaEncoder<String> kafkaSpecificEncoder = new AvroSpecificDatumBackedKafkaEncoder<>(String.class);
    // Encoder<String> encoder = new
    // org.springframework.integration.kafka.serializer.common.StringEncoder<String>();

    ProducerMetadata<String, String> producerMetadata = new ProducerMetadata<String, String>(topicName);
    producerMetadata.setValueClassType(String.class);
    producerMetadata.setKeyClassType(String.class);
    producerMetadata.setValueEncoder(kafkaSpecificEncoder);
    producerMetadata.setKeyEncoder(kafkaReflectionEncoder);
    producerMetadata.setAsync(true);

    Properties props = buildProducerConfigProperties();
    ProducerFactoryBean<String, String> producer = new ProducerFactoryBean<String, String>(producerMetadata,
                                                                                           brokerList,
                                                                                           props);
    ProducerConfiguration<String, String> config = new ProducerConfiguration<String, String>(producerMetadata,
                                                                                             producer.getObject());
    kafkaProducerContext.setProducerConfigurations(Collections.singletonMap(topicName,
                                                                            config));
    return kafkaProducerContext;
}

private Properties buildProducerConfigProperties() {
    Properties props = new Properties();
    props.put("topic.metadata.refresh.interval.ms",
              "3600000");
    props.put("message.send.max.retries",
              "5");
    props.put("tsend.buffer.bytes",
              "5242880");
    return props;

}

【问题讨论】:

  • 嗨@Harshjgs,请告诉我KafkaHeaders.MESSAGE_KEY 字段的用法。我找不到它的用途。

标签: spring spring-integration apache-kafka kafka-consumer-api


【解决方案1】:

是的,您可以在运行时执行此操作。见TopicUtils.ensureTopicCreated

您可以像&lt;service-activator&gt; 一样将其添加为&lt;publish-subscribe-channel&gt; 的另一个订阅者(第一个订阅者)以发送消息。像这样的:

<publish-subscribe-channel id="sendMessageToKafkaChannel"/>

<service-activator input-channel="sendMessageToKafkaChannel" output-channel="nullChannel" order="1"
   ref="creatTopicService" method="creatTopic"/>

<int-kafka:outbound-channel-adapter channel="sendMessageToKafkaChannel" order="2"/>

接受creatTopic 整个消息并从消息中或在注入阶段提取所有必需的参数,例如注入ZookeeperConnect 以提取getZkConnect() 作为第一个zkAddress ensureTopicCreated 参数。

但是你应该明白,如果没有 Kafka 上的现有主题,你就不能拥有 &lt;int-kafka:message-driven-channel-adapter&gt;。因此,我不确定您将来将如何处理那些动态创建的主题中的消息。虽然&lt;int-kafka:inbound-channel-adapter&gt; 可能适用于这种情况......

【讨论】:

  • 感谢 Aetem。我结束了与您的建议非常相似的事情。让我知道你的想法。
  • 拥有&lt;int-kafka:producer-context&gt; 配置,您可以使用getProducerConfigurations() 将新的ProducerConfiguration 添加到现有的getProducerConfigurations() 从另一端,您可以使用单个&lt;int-kafka:producer-configuration&gt; 并始终向&lt;int-kafka:outbound-channel-adapter&gt; 发送消息。这是因为您在代码中使用了相同的 brokerList,以及所有其他 ProducerConfiguration 选项。
  • 谢谢。有意义会做那些基于春季的清理工作。并且会做更多的组件重用。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2015-12-22
  • 2015-08-19
  • 2018-12-21
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多