【发布时间】:2015-12-26 11:28:54
【问题描述】:
我正在发送有关主题的消息,并且可以使用 kafka 控制台和使用 main 方法的 Java 使用者代码读取相同的消息。但是没有从消费者配置 xml 中读取消息。请帮助解决此问题。我正在使用消息 int-kafka:message-driven-channel-adapter。 spring-kafka 版本 1.3.0.RELEASE
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:int="http://www.springframework.org/schema/integration"
xmlns:stream="http://www.springframework.org/schema/integration/stream"
xmlns:int-kafka="http://www.springframework.org/schema/integration/kafka"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:task="http://www.springframework.org/schema/task"
xsi:schemaLocation="http://www.springframework.org/schema/integration/stream http://www.springframework.org/schema/integration/stream/spring-integration-stream.xsd
http://www.springframework.org/schema/integration/kafka http://www.springframework.org/schema/integration/kafka/spring-integration-kafka-1.0.xsd
http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration-4.0.xsd
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task.xsd">
<int:channel id="inputFromKafka">
<int:queue />
</int:channel>
<bean id="zkConfiguration" class="org.springframework.integration.kafka.core.ZookeeperConfiguration">
<constructor-arg ref="zookeeperConnect" />
</bean>
<bean id="kafkaConnectionFactory" class="org.springframework.integration.kafka.core.DefaultConnectionFactory" >
<constructor-arg ref="zkConfiguration" />
</bean>
<bean id="decoder"
class="org.springframework.integration.kafka.serializer.common.StringDecoder" />
<int-kafka:message-driven-channel-adapter id="messageProcessor"
channel="inputFromKafka"
connection-factory="kafkaConnectionFactory"
queue-size="${queue.size:1024}"
concurrency="${concurrency:50}"
auto-startup="true"
payload-decoder="decoder"
topics="${topics:nishant}" key-decoder="decoder" />
<int-kafka:zookeeper-connect id="zookeeperConnect"
zk-connect="localhost:2181" zk-connection-timeout="6000"
zk-session-timeout="6000" zk-sync-time="2000" />
</beans>
消费者 Java 代码
@Component(value = "messageProcessor")
public class MessageConsumer {
public void reader(Map<String, Map<Integer, String>> payload) {
System.out.println("TEST");
}
}
【问题讨论】:
标签: spring-integration apache-kafka