【问题标题】:spring kafka integration creating consumers dynamicallyspring kafka集成动态创建消费者
【发布时间】:2015-08-06 05:39:12
【问题描述】:

我正在使用 Spring-Integration-Kafka,以下是动态创建消费者以在控制台中接收和打印消息的示例。 消费类:

public class Consumer1 {
private static final String CONFIG = "kafkaInboundMDCAdapterParserTests-context.xml";
static ClassPathXmlApplicationContext ctx;

public static void main(final String args[]) {
    ctx = new ClassPathXmlApplicationContext(CONFIG, Consumer1.class);
    ctx.start();
    addConsumer("test19", "default8");

    ctx = new ClassPathXmlApplicationContext(CONFIG, Consumer1.class);
    ctx.start();
    addConsumer("test19", "default10");

}

public static void addConsumer(String topicId, String groupId) {

    MessageChannel inputChannel = ctx.getBean("inputFromKafka", MessageChannel.class);

    ServiceActivatingHandler serviceActivator = new ServiceActivatingHandler(new MessageReceiver(), "processMessage");
    ((SubscribableChannel) inputChannel).subscribe(serviceActivator);

    KafkaConsumerContext<String, String> kafkaConsumerContext = ctx.getBean("consumerContext", KafkaConsumerContext.class);
    try {
        TopicFilterConfiguration topicFilterConfiguration = new TopicFilterConfiguration(topicId, 1, false);

        ConsumerMetadata<String,String> consumerMetadata = new ConsumerMetadata<String, String>();
        consumerMetadata.setGroupId(groupId);
        consumerMetadata.setTopicFilterConfiguration(topicFilterConfiguration);
        consumerMetadata.setConsumerTimeout("1000");
        consumerMetadata.setKeyDecoder(new AvroReflectDatumBackedKafkaDecoder<String>(java.lang.String.class));
        consumerMetadata.setValueDecoder(new AvroReflectDatumBackedKafkaDecoder<String>(java.lang.String.class));


        ZookeeperConnect zkConnect = ctx.getBean("zookeeperConnect", ZookeeperConnect.class);

        ConsumerConfigFactoryBean<String, String> consumer = new ConsumerConfigFactoryBean<String, String>(consumerMetadata,
                zkConnect);

        ConsumerConnectionProvider consumerConnectionProvider = new ConsumerConnectionProvider(consumer.getObject());
        MessageLeftOverTracker<String,String> messageLeftOverTracker = new MessageLeftOverTracker<String, String>();
        ConsumerConfiguration<String, String> consumerConfiguration = new ConsumerConfiguration<String, String>(consumerMetadata, consumerConnectionProvider, messageLeftOverTracker);

        kafkaConsumerContext.getConsumerConfigurations().put(groupId, consumerConfiguration);
    } catch (Exception exp) {
        exp.printStackTrace();
    }
}

}

入站配置文件:

<int:channel id="inputFromKafka"/>

<int-kafka:zookeeper-connect id="zookeeperConnect" zk-connect="localhost:2181"
        zk-connection-timeout="6000"
        zk-session-timeout="6000"
        zk-sync-time="2000"/>

<int-kafka:inbound-channel-adapter id="kafkaInboundChannelAdapter"
        kafka-consumer-context-ref="consumerContext"
        auto-startup="false"
        channel="inputFromKafka">
    <int:poller fixed-delay="1" time-unit="MILLISECONDS"/>
</int-kafka:inbound-channel-adapter>

<bean id="kafkaReflectionDecoder" class="org.springframework.integration.kafka.serializer.avro.AvroReflectDatumBackedKafkaDecoder">
    <constructor-arg type="java.lang.Class" value="java.lang.String"/>
</bean>

<int-kafka:consumer-context id="consumerContext"
        consumer-timeout="1000"
        zookeeper-connect="zookeeperConnect">
    <int-kafka:consumer-configurations>
        <int-kafka:consumer-configuration group-id="default1"
                value-decoder="kafkaReflectionDecoder"
                key-decoder="kafkaReflectionDecoder"
                max-messages="5000">
            <int-kafka:topic id="mdc1" streams="1"/>
        </int-kafka:consumer-configuration>
    </int-kafka:consumer-configurations>
</int-kafka:consumer-context>

当我向主题“test19”发送任何消息时,配置的 ServiceActivator“processMessage”方法显示两条消息作为配置的两个客户,但这里的问题是我需要在添加到消费者上下文之前为每个客户加载入站配置文件.. 否则我在控制台中只收到一条消息。这是正确的方式还是我需要在这里更改任何内容?

谢谢。

【问题讨论】:

    标签: spring-integration apache-kafka


    【解决方案1】:

    完全不清楚你要做什么,但你确实有问题。

    通过在订阅消费者之前启动上下文,您可能会遇到问题(在开始和订阅之间的短时间内,Dispatcher 在inputFromKafka 上没有订阅者)。

    为什么您以编程方式创建服务激活器而不是在上下文中声明它?

    最好在上下文中配置所有内容(您可以通过环境中的属性并使用属性占位符配置器将groupId 等属性传递给上下文。

    【讨论】:

    • 你好,Gary,对我来说,作为要求,我正在尝试减少上下文中的配置,我想要的只是如何动态创建消费者并将服务激活器消息分发给所有消费者..
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2016-11-11
    • 2023-03-20
    • 2022-08-06
    • 1970-01-01
    • 2015-11-13
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多