【问题标题】:Spring-Integration-Kafka outbound-channel-adapter Send messageSpring-Integration-Kafka outbound-channel-adapter 发送消息
【发布时间】:2017-04-28 07:25:26
【问题描述】:

使用 Spring-Integration-Kafka 和 outbound-channel-adapter 我正在尝试将消息发送到名为“test”的主题

通过命令行终端,我启动了zookeeper、kafka并创建了名为"test"

的主题

Spring XML 配置

<int:publish-subscribe-channel id="inputToKafka" />

<int-kafka:outbound-channel-adapter id="kafkaOutboundChannelAdapter"
                                    auto-startup="false"
                                    channel="inputToKafka"
                                    kafka-template="template"
                                    sync="true"
                                    topic="test">
</int-kafka:outbound-channel-adapter>

<bean id="template" class="org.springframework.kafka.core.KafkaTemplate">
    <constructor-arg>
        <bean class="org.springframework.kafka.core.DefaultKafkaProducerFactory">
            <constructor-arg>
                <map>
                    <entry key="bootstrap.servers" value="localhost:9092" />
                </map>
            </constructor-arg>
        </bean>
    </constructor-arg>
</bean>

JUnit 测试代码

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = {
        "classpath:kafka-outbound-context.xml"
        })
public class ProducerTest{

    @Autowired
    @Qualifier("inputToKafka")
    MessageChannel channel;

    @Test
    public void test_send_message() {

        channel.send(MessageBuilder.withPayload("Test Message")
                .setHeader(KafkaHeaders.TOPIC, "test").build());

    }

}

测试用例成功,调试时我发现 channel.send() 返回 true

我通过命令行使用以下命令检查主题,但在测试主题中没有看到任何消息。

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning

谁能告诉我为什么在我的 test 主题上看不到任何消息?

【问题讨论】:

    标签: java spring apache-kafka spring-integration


    【解决方案1】:

    您查看过日志吗?你需要配置key和value序列化器,否则你会得到

    Caused by: org.apache.kafka.common.config.ConfigException: Missing required configuration "key.serializer" which has no default value.
    

    使用java时:

        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    

    映射键是key.serializervalue.serializer

    【讨论】:

    • 谢谢@Gary Russell。添加键和值序列化程序后它工作了。
    • Gary,对于复杂的 Java 对象,Serializer 类应该是什么? ByteSerializer 会做吗?
    • 还有任何文档描述要设置的强制属性,如 bootstrap.servers、key.serializer、value.serializer 等。同样适用于消费者。
    • 您必须参考kafka documentation 以了解有关属性的详细信息以及哪些没有默认值。 Kafka 没有为复杂对象提供开箱即用的序列化程序。您必须转换为byte[]。 Spring-Kafka 提供了几种使用 JSON 的机制 - 请参阅 my spring one talk demo apps for how to use them - 请参阅 app5 和 app6。
    猜你喜欢
    • 2015-07-08
    • 2013-09-28
    • 1970-01-01
    • 2015-10-18
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多