【问题标题】:Spring Integration - Apache ActiveMQ to KafkaSpring 集成 - Apache ActiveMQ 到 Kafka
【发布时间】:2017-12-12 21:45:37
【问题描述】:

我正在使用以下配置将 activemq 与 kafka 集成。我收到来自 activemq 的消息并将其转发给 kafka。但是,我注意到消息正在从 JMS 队列中出列,但消息不会发送到 kafka。

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:jms="http://www.springframework.org/schema/integration/jms"
    xmlns:integration="http://www.springframework.org/schema/integration"
    xmlns:int-kafka="http://www.springframework.org/schema/integration/kafka"
    xmlns:task="http://www.springframework.org/schema/task"
    xsi:schemaLocation="http://www.springframework.org/schema/beans 
    http://www.springframework.org/schema/beans/spring-beans.xsd 
    http://www.springframework.org/schema/integration/jms 
    http://www.springframework.org/schema/integration/jms/spring-integration-jms.xsd
    http://www.springframework.org/schema/integration/kafka 
    http://www.springframework.org/schema/integration/kafka/spring-integration-kafka.xsd">

    <jms:message-driven-channel-adapter
        id="helloJMSAdapater" destination="helloJMSQueue" connection-factory="jmsConnectionfactory"
        channel="helloChannel" extract-payload="true" />

    <integration:channel id="helloChannel" />

    <integration:service-activator id="sayHelloServiceActivator"
        input-channel="helloChannel" ref="sayHelloService" method="sayHello" />

    <int-kafka:outbound-channel-adapter
        id="kafkaOutboundChannelAdapter" kafka-template="template"
        auto-startup="false" sync="true" channel="helloChannel" topic="test1234" 
        >
    </int-kafka:outbound-channel-adapter>

    <bean id="template" class="org.springframework.kafka.core.KafkaTemplate">
        <constructor-arg>
            <bean class="org.springframework.kafka.core.DefaultKafkaProducerFactory">
                <constructor-arg>
                    <map>
                        <entry key="bootstrap.servers" value="localhost:9092" />
                        <!--entry key="retries" value="5" /> <entry key="batch.size" value="16384" 
                            /> <entry key="linger.ms" value="1" /> <entry key="buffer.memory" value="33554432" 
                            /> < entry key="key.serializer" value="org.apache.kafka.common.serialization.StringSerializer" 
                            /> <entry key="value.serializer" value="org.apache.kafka.common.serialization.StringSerializer" 
                            / -->
                    </map>
                </constructor-arg>
            </bean>
        </constructor-arg>
    </bean>



</beans>

此外,如果 Kafka 出现任何问题,它甚至不会报告任何异常堆栈跟踪。

我错过了什么吗?

【问题讨论】:

  • 我不太了解 Spring,但在 Kafka 中,您需要配置 acks=all 以在代理存储您的消息时获得确认。如果 acks=0,您将不会收到提交消息成功或失败的确认。

标签: apache-kafka spring-integration activemq spring-transactions


【解决方案1】:

正如@Hassen Bennour 所说,如果你想向两个消费者发送消息,你需要一个发布/订阅通道。

也就是说,你在 kafka 适配器上有auto-startup="false",所以它甚至不会订阅频道。

如果它已启动,您当前的配置消息将轮流交替发送到服务激活器和适配器。

【讨论】:

  • 感谢您的评论。我注意到我的 serviceActivator 是 void 类型,因此它不会继续。我现在可以从 jms 向 kafka 发送消息。感谢您的意见。
【解决方案2】:

您的消息由 sayHelloServiceActivator 使用。

所以将你的 helloChannel 频道类型更改为

<publish-subscribe-channel id="helloChannel"/>

默认为 DirectChannel

DirectChannel 具有点对点语义,但除此之外更多 类似于 PublishSubscribeChannel 比任何基于队列的 上面描述的通道实现。它实现了 SubscribableChannel 接口而不是 PollableChannel 接口,因此它直接将消息分派给订阅者。作为一个 点对点通道,但是,它不同于 PublishSubscribeChannel,因为它只会将每条消息发送到 单个订阅的 MessageHandler。

【讨论】:

  • 感谢您的评论。我注意到我的 serviceActivator 是 void 类型,因此它不会继续。我现在可以从 jms 向 kafka 发送消息。感谢您的投入。