【问题标题】:Kafka-spring consumer is not reading messageKafka-spring 消费者没有阅读消息
【发布时间】:2015-12-26 11:28:54
【问题描述】:

我正在发送有关主题的消息,并且可以使用 kafka 控制台和使用 main 方法的 Java 使用者代码读取相同的消息。但是没有从消费者配置 xml 中读取消息。请帮助解决此问题。我正在使用消息 int-kafka:message-driven-channel-adapter。 spring-kafka 版本 1.3.0.RELEASE

         <?xml version="1.0" encoding="UTF-8"?>
        <beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:int="http://www.springframework.org/schema/integration"
xmlns:stream="http://www.springframework.org/schema/integration/stream"
xmlns:int-kafka="http://www.springframework.org/schema/integration/kafka"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:task="http://www.springframework.org/schema/task"
xsi:schemaLocation="http://www.springframework.org/schema/integration/stream http://www.springframework.org/schema/integration/stream/spring-integration-stream.xsd
    http://www.springframework.org/schema/integration/kafka http://www.springframework.org/schema/integration/kafka/spring-integration-kafka-1.0.xsd
    http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration-4.0.xsd
    http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
    http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
    http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task.xsd">


           <int:channel id="inputFromKafka">
            <int:queue />
           </int:channel>


           <bean id="zkConfiguration"  class="org.springframework.integration.kafka.core.ZookeeperConfiguration">
    <constructor-arg ref="zookeeperConnect" />
</bean>

<bean id="kafkaConnectionFactory" class="org.springframework.integration.kafka.core.DefaultConnectionFactory" >
    <constructor-arg ref="zkConfiguration" />
</bean>

  <bean id="decoder"
class="org.springframework.integration.kafka.serializer.common.StringDecoder" />
<int-kafka:message-driven-channel-adapter id="messageProcessor" 
    channel="inputFromKafka"
    connection-factory="kafkaConnectionFactory"
    queue-size="${queue.size:1024}"
        concurrency="${concurrency:50}"
    auto-startup="true"
    payload-decoder="decoder"
    topics="${topics:nishant}" key-decoder="decoder" />
     <int-kafka:zookeeper-connect id="zookeeperConnect" 
  zk-connect="localhost:2181" zk-connection-timeout="6000"
    zk-session-timeout="6000" zk-sync-time="2000" />


         </beans>

消费者 Java 代码

              @Component(value = "messageProcessor")
              public class MessageConsumer {
            public void reader(Map<String, Map<Integer, String>> payload) {
            System.out.println("TEST");

             }
        }

【问题讨论】:

    标签: spring-integration apache-kafka


    【解决方案1】:

    您的配置有几个问题。

    首先,您的“消费者”和通道适配器具有相同的 bean 名称 (id) messageProcessor。在 Spring 中,最后一个获胜,因此一个定义将覆盖另一个。

    其次,配置所做的只是将消息转储到队列通道inputFromKafka - 与您的“消费者”没有任何连接。更改 bean 名称并添加服务激活器(并从通道中删除 &lt;int:queue/&gt; 元素)。

    <int:channel id="inputFromKafka" />
    
    <int:service-activator ref="myConsumer" input-channel="inputFromKafka" />
    

    将消息从适配器路由到您的消费者 bean 方法(假设有效负载可以转换为您所需的类型)。

    【讨论】:

    • 谢谢加里....现在一切就绪...我的错误我覆盖了 bean...并评论了服务激活器。只有一个问题我需要处理 600000 个请求我正在创建 10 个分区,并且消费者方面我将添加并发 = 10 和执行线程 pool-size="50" 现在我只有一台服务器..
    • 嗨,加里。请你帮帮我。一旦我从我的主题中读取了所有消息并重新启动服务器,它就不应该从头开始读取消息......我无法找到如何在消息驱动配置中实现这一点。谢谢
    • 你真的应该为此问另一个问题,而不是捎带这个问题。您需要提供offset-manager - 例如,请参阅kafka-sample;它使用 Java 配置,但同样适用于 XML 配置。如果您多次运行示例,您会看到它每次都不是从头开始。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2018-09-12
    • 1970-01-01
    • 1970-01-01
    • 2017-11-09
    • 2020-04-13
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多