【问题标题】:What is the latest (2019) preferred way to integrate spring batch with kafka?将spring batch与kafka集成的最新(2019)首选方式是什么?
【发布时间】:2019-03-20 22:18:47
【问题描述】:

这是I need current easy to follow instructions for configuring spring integration kafka from XML的后续问题

Spring-integration-kafka 在过去的几次迭代中发展了很多,许多旧示例不再起作用。

特别是,这个从 spring-batch 世界桥接到 spring 集成世界的 bean 不会实例化,因为 KafkaTemplate 类没有实现 MessagingTemplate 。目前推荐的完成这种集成的方法是什么?

<bean id="partitionHandler" class="org.springframework.batch.integration.partition.MessageChannelPartitionHandler">
    <property name="stepName" value="fm-step0002.messager"/>
    <property name="gridSize" value="3"/> 
    <property name="messagingOperations" ref="kafkaTemplate"/>
</bean>

这是我的 POM 中的一个片段,显示了我正在使用的库的版本:

        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-core</artifactId>
            <version>5.1.3.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-kafka</artifactId>
            <version>3.1.1.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-file</artifactId>
            <version>5.1.2.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.batch</groupId>
            <artifactId>spring-batch-core</artifactId>
            <version>4.1.1.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.batch</groupId>
            <artifactId>spring-batch-infrastructure</artifactId>
            <version>4.1.1.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.batch</groupId>
            <artifactId>spring-batch-integration</artifactId>
            <version>4.1.1.RELEASE</version>
        </dependency>

【问题讨论】:

    标签: spring spring-integration spring-batch


    【解决方案1】:

    MessagingTemplate 是 Spring Integration 的核心组件;它与您使用的代理(RabbitMQ、Kafka、JMS 等)无关。

    您使用默认通道(这是 kafka 出站端点的输入通道)配置模板。

    查看batch documentation(单击文档顶部的 XML 按钮将示例从 java 更改为 XML)。

    此处的示例适用于 JMS,但配置与 Kafka 类似。

    <bean id="partitionHandler"
       class="org.springframework.batch.integration.partition.MessageChannelPartitionHandler">
      <property name="stepName" value="step1"/>
      <property name="gridSize" value="3"/>
      <property name="replyChannel" ref="outbound-replies"/>
      <property name="messagingOperations">
        <bean class="org.springframework.integration.core.MessagingTemplate">
          <property name="defaultChannel" ref="outbound-requests"/>
          <property name="receiveTimeout" value="100000"/>
        </bean>
      </property>
    </bean>
    
    <int:channel id="outbound-requests"/>
    <int-jms:outbound-channel-adapter destination="requestsQueue"
        channel="outbound-requests"/>
    

    【讨论】:

    • 是的,但是 bean MessageChannelPartitionHandler 不会接受消息传递操作属性中的 KafkaTemplate。我克隆了 MessageChannelPartitionHandler 来制作一个 KafkaMessageChannelPartitionHandler - KafkaTemplate 似乎有一个方法receive 缺少/与接口 MessagingTemplate 不同的签名
    • 正如我所说,处理程序没有得到与 Kafka 有任何关系的模板,它得到一个 MessagingTemplate;该模板向通道发送消息;通道的订阅者是出站通道适配器,它反过来调用KafkaTemplate(或JmsTemplate,或RabbitTemplate)将消息实际发送到代理。请参阅我粘贴到答案中的配置(在这种情况下为 JMS)。只需将 jms 适配器替换为 kafka 适配器即可。
    • 知道了... MessageChannelPartitionHandler -> MessagingTemplate -> 出站通道适配器 -> KafkaTemplate(或其他模板)。
    猜你喜欢
    • 2012-10-20
    • 2012-10-21
    • 1970-01-01
    • 2018-07-02
    • 2023-03-31
    • 2015-02-08
    • 1970-01-01
    • 2015-11-03
    • 1970-01-01
    相关资源
    最近更新 更多