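【Posted】: 2015-08-06 05:39:12
【Question】:
I am using Spring-Integration-Kafka. Below is an example that dynamically creates consumers to receive messages and print them to the console.
Consumer class: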
public class Consumer1 {

    private static final String CONFIG = "kafkaInboundMDCAdapterParserTests-context.xml";

    // Always points at the most recently loaded application context.
    static ClassPathXmlApplicationContext ctx;

    public static void main(final String[] args) {
        // A fresh context is loaded from the inbound configuration file before each consumer is added.
        ctx = new ClassPathXmlApplicationContext(CONFIG, Consumer1.class);
        ctx.start();
        addConsumer("test19", "default8");

        ctx = new ClassPathXmlApplicationContext(CONFIG, Consumer1.class);
        ctx.start();
        addConsumer("test19", "default10");
    }

    public static void addConsumer(String topicId, String groupId) {
        // Subscribe a service activator that calls MessageReceiver.processMessage for each message.
        MessageChannel inputChannel = ctx.getBean("inputFromKafka", MessageChannel.class);
        ServiceActivatingHandler serviceActivator =
                new ServiceActivatingHandler(new MessageReceiver(), "processMessage");
        ((SubscribableChannel) inputChannel).subscribe(serviceActivator);

        KafkaConsumerContext<String, String> kafkaConsumerContext =
                ctx.getBean("consumerContext", KafkaConsumerContext.class);
        try {
            // Describe the consumer: one stream on the given topic, in the given group.
            TopicFilterConfiguration topicFilterConfiguration = new TopicFilterConfiguration(topicId, 1, false);
            ConsumerMetadata<String, String> consumerMetadata = new ConsumerMetadata<String, String>();
            consumerMetadata.setGroupId(groupId);
            consumerMetadata.setTopicFilterConfiguration(topicFilterConfiguration);
            consumerMetadata.setConsumerTimeout("1000");
            consumerMetadata.setKeyDecoder(new AvroReflectDatumBackedKafkaDecoder<String>(java.lang.String.class));
            consumerMetadata.setValueDecoder(new AvroReflectDatumBackedKafkaDecoder<String>(java.lang.String.class));

            ZookeeperConnect zkConnect = ctx.getBean("zookeeperConnect", ZookeeperConnect.class);
            ConsumerConfigFactoryBean<String, String> consumer =
                    new ConsumerConfigFactoryBean<String, String>(consumerMetadata, zkConnect);
            ConsumerConnectionProvider consumerConnectionProvider =
                    new ConsumerConnectionProvider(consumer.getObject());
            MessageLeftOverTracker<String, String> messageLeftOverTracker =
                    new MessageLeftOverTracker<String, String>();
            ConsumerConfiguration<String, String> consumerConfiguration =
                    new ConsumerConfiguration<String, String>(
                            consumerMetadata, consumerConnectionProvider, messageLeftOverTracker);

            // Register the new consumer configuration in the context under its group id.
            kafkaConsumerContext.getConsumerConfigurations().put(groupId, consumerConfiguration);
        } catch (Exception exp) {
            exp.printStackTrace();
        }
    }
}
Inbound configuration file:
<int:channel id="inputFromKafka"/>

<int-kafka:zookeeper-connect id="zookeeperConnect" zk-connect="localhost:2181"
                             zk-connection-timeout="6000"
                             zk-session-timeout="6000"
                             zk-sync-time="2000"/>

<int-kafka:inbound-channel-adapter id="kafkaInboundChannelAdapter"
                                   kafka-consumer-context-ref="consumerContext"
                                   auto-startup="false"
                                   channel="inputFromKafka">
    <int:poller fixed-delay="1" time-unit="MILLISECONDS"/>
</int-kafka:inbound-channel-adapter>

<bean id="kafkaReflectionDecoder" class="org.springframework.integration.kafka.serializer.avro.AvroReflectDatumBackedKafkaDecoder">
    <constructor-arg type="java.lang.Class" value="java.lang.String"/>
</bean>

<int-kafka:consumer-context id="consumerContext"
                            consumer-timeout="1000"
                            zookeeper-connect="zookeeperConnect">
    <int-kafka:consumer-configurations>
        <int-kafka:consumer-configuration group-id="default1"
                                          value-decoder="kafkaReflectionDecoder"
                                          key-decoder="kafkaReflectionDecoder"
                                          max-messages="5000">
            <int-kafka:topic id="mdc1" streams="1"/>
        </int-kafka:consumer-configuration>
    </int-kafka:consumer-configurations>
</int-kafka:consumer-context>
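When I send any message to the topic "test19", the "processMessage" method of the configured ServiceActivator prints two messages, one for each of the two configured consumers. The problem is that I have to load the inbound configuration file again for each consumer before adding it to the consumer context; otherwise I receive only one message in the console. Is this the right approach, or do I need to change something here?
Thanks.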
【Discussion】:
Tags: spring-integration apache-kafka