【问题标题】:How to route messages from kafka-sink to multiple topics如何将消息从 kafka-sink 路由到多个主题
【发布时间】:2017-03-28 15:53:00
【问题描述】:

我有一个带有 http-outbound-gateway 的 spring-xd http-processor 模块,它有一个 errorChannel 和 outputChannel。任何带有 HTTP 200 的消息都进入 outputChannel,其余消息进入 failureChannel。

现在,http-processor 模块使用带有 TopicX 的 kafka-outbound-adapter 连接到 Kafka-Sink。 TopicX 仅接收 HTTP 200 消息以进行进一步处理。现在,我们需要将 failureChannel 中的消息路由到 TopicY。

如何在 kafka-sink 中发送多个 kafka 主题的消息。我在消息头中有 httpStatusCode。我项目中使用的Kafka版本是0.8.2,java版本是1.7

<!-- http-processor-config -->
<int-http:outbound-gateway
        request-channel="input"
        url-expression="'myUrlLink'"
        http-method="POST"
        expected-response-type="java.lang.String"
        charset="UTF-8"
        reply-timeout="10"
        reply-channel="output">

        <int-http:request-handler-advice-chain>
                    <bean class="org.springframework.integration.handler.advice.RequestHandlerRetryAdvice">
                        <property name="recoveryCallback">
                            <bean class="org.springframework.integration.handler.advice.ErrorMessageSendingRecoverer">
                                <constructor-arg ref="errorChannel" />
                            </bean>
                        </property>
                        <property name="retryTemplate" ref="retryTemplate" />
                    </bean>
        </int-http:request-handler-advice-chain>

</int-http:outbound-gateway>


<!-- Handle failed messages and route to failureChannel for specific http codes-->
<int:service-activator input-channel="errorChannel" ref="customErrorHandler" method="handleFailedRequest" output-channel="failureChannel"/>

在 Kafka Sink 上,我有以下生产者上下文:

    <int-kafka:producer-context id="kafkaProducerContext">
    <int-kafka:producer-configurations>
        <int-kafka:producer-configuration broker-list="localhost:9092"
                                          topic="${topicX}"
                                          key-class-type="java.lang.String"
                                          key-serializer="serializer"
                                          value-class-type="[B"
                                          value-serializer="valueSerializer"/>
    </int-kafka:producer-configurations>
</int-kafka:producer-context>

【问题讨论】:

    标签: apache-kafka spring-integration spring-xd kafka-producer-api


    【解决方案1】:

    这是真的,它不受支持,也不会支持。 Spring XD 今年已经停产。鼓励大家迁移到Spring Cloud Data Flow

    对于您的用例,您可以编辑 Kafka Sink 模块配置。为另一个主题再添加一个&lt;int-kafka:outbound-channel-adapter&gt;。要决定将传入消息发送到哪个主题,您可以将&lt;router&gt; 添加到此配置中。

    或者只是考虑使用Router Sink。并且每个消息类型有两个单独的流,因此每个主题都有。

    【讨论】:

      【解决方案2】:

      我终于让它工作了。现在我找到了一个 0.8.x 版本的解决方法,通过在 http-processor 模块中添加一个拆分器,并在消息头中添加一个 kafka_topic 变量。基于 Http 状态码,我只是设置了不同的主题。

      在 Kafka-sink 上,我添加了另一个生产者配置,其中包含通过 XD 参数设置的新主题名称变量。我想不出任何其他解决方案,因为我在多个流中重用 kafka-source 和 kafka-sink 模块。

      这个特定的 kafka-sink 将输入发送到另一个 XD 流。因此,添加了一个 header-filter 以在下一个流开始时删除 kafka-source 模块中的 kafka_topic。

      阅读更多: http://docs.spring.io/autorepo/docs/spring-kafka-dist/1.0.2.RELEASE/reference/html/_spring_integration.html

      查找设置目标 kafka 主题的行。这才是关键。

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 1970-01-01
        • 2019-10-09
        • 2020-05-09
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2019-12-15
        • 2021-09-08
        相关资源
        最近更新 更多