【发布时间】:2017-04-28 07:25:26
【问题描述】:
使用 Spring-Integration-Kafka 和 outbound-channel-adapter 我正在尝试将消息发送到名为“test”的主题
通过命令行终端,我启动了zookeeper、kafka并创建了名为"test"
的主题Spring XML 配置
<int:publish-subscribe-channel id="inputToKafka" />
<int-kafka:outbound-channel-adapter id="kafkaOutboundChannelAdapter"
auto-startup="false"
channel="inputToKafka"
kafka-template="template"
sync="true"
topic="test">
</int-kafka:outbound-channel-adapter>
<bean id="template" class="org.springframework.kafka.core.KafkaTemplate">
<constructor-arg>
<bean class="org.springframework.kafka.core.DefaultKafkaProducerFactory">
<constructor-arg>
<map>
<entry key="bootstrap.servers" value="localhost:9092" />
</map>
</constructor-arg>
</bean>
</constructor-arg>
</bean>
JUnit 测试代码
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = {
"classpath:kafka-outbound-context.xml"
})
public class ProducerTest{
@Autowired
@Qualifier("inputToKafka")
MessageChannel channel;
@Test
public void test_send_message() {
channel.send(MessageBuilder.withPayload("Test Message")
.setHeader(KafkaHeaders.TOPIC, "test").build());
}
}
测试用例成功,调试时我发现 channel.send() 返回 true
我通过命令行使用以下命令检查主题,但在测试主题中没有看到任何消息。
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
谁能告诉我为什么在我的 test 主题上看不到任何消息?
【问题讨论】:
标签: java spring apache-kafka spring-integration