【发布时间】:2015-05-15 08:14:59
【问题描述】:
我是弹簧整合 kafka 的新手,我了解 kafka-oubound-channel 适配器。但是有没有一种方法可以让我无需在上下文 xml 中设置就可以程序化地创建主题?
ie:根据我给转换器的消息,我想将消息发布到为此消息类型创建的 kafka 主题。
更新:
下面是我最终做的事情。将欢迎任何更好的解决方案。
<int:channel id="inputForSolrPublish"></int:channel>
<int:service-activator input-channel="inputForSolrPublish"
ref="solrMasterListRouter" >
-->
private void postMessageToMasterSpecifcTopics(final List<String> topicNames,
final String brokerList,
final Message<?> message) throws Exception {
for (String topicName : topicNames) {
createProducerContext(topicName,
brokerList).send(topicName,
message.getHeaders()
.get(KafkaHeaders.MESSAGE_KEY),
message);
}
}
private KafkaProducerContext<String, String> createProducerContext(final String topicName,
final String brokerList) throws Exception {
KafkaProducerContext<String, String> kafkaProducerContext = new KafkaProducerContext<String, String>();
AvroReflectDatumBackedKafkaEncoder<String> kafkaReflectionEncoder = new AvroReflectDatumBackedKafkaEncoder<>(String.class);
AvroSpecificDatumBackedKafkaEncoder<String> kafkaSpecificEncoder = new AvroSpecificDatumBackedKafkaEncoder<>(String.class);
// Encoder<String> encoder = new
// org.springframework.integration.kafka.serializer.common.StringEncoder<String>();
ProducerMetadata<String, String> producerMetadata = new ProducerMetadata<String, String>(topicName);
producerMetadata.setValueClassType(String.class);
producerMetadata.setKeyClassType(String.class);
producerMetadata.setValueEncoder(kafkaSpecificEncoder);
producerMetadata.setKeyEncoder(kafkaReflectionEncoder);
producerMetadata.setAsync(true);
Properties props = buildProducerConfigProperties();
ProducerFactoryBean<String, String> producer = new ProducerFactoryBean<String, String>(producerMetadata,
brokerList,
props);
ProducerConfiguration<String, String> config = new ProducerConfiguration<String, String>(producerMetadata,
producer.getObject());
kafkaProducerContext.setProducerConfigurations(Collections.singletonMap(topicName,
config));
return kafkaProducerContext;
}
private Properties buildProducerConfigProperties() {
Properties props = new Properties();
props.put("topic.metadata.refresh.interval.ms",
"3600000");
props.put("message.send.max.retries",
"5");
props.put("tsend.buffer.bytes",
"5242880");
return props;
}
【问题讨论】:
-
嗨@Harshjgs,请告诉我
KafkaHeaders.MESSAGE_KEY字段的用法。我找不到它的用途。
标签: spring spring-integration apache-kafka kafka-consumer-api