【问题标题】:spring integration kafka - pause/seek consumer from my listener for a small timespring integration kafka - 暂停/从我的听众那里寻找消费者一小段时间
【发布时间】:2018-08-12 17:47:42
【问题描述】:

我的 Spring 集成图如下所示。

在我的服务激活器中,是否可以根据我的消息获取侦听器,然后将其暂停一段时间,如果更进一步,我的处理速度很慢?我需要这种方式来处理一些溢出机制。

我知道我可以实现一个新的 consumerSeekCallback,但在集成设置中,据我了解,我无法访问 messageDrivenChannelAdapter。

我正在使用链接到消息驱动通道适配器的 ConcurrentMessageListenerContainer。

    <int-kafka:message-driven-channel-adapter
    id="kafkaListenertest" listener-container="containertest" auto-startup="true"
    phase="100" send-timeout="5000" channel="kafkaMessage" error-channel="overflow"  />

<bean id="containertest"
    class="org.springframework.kafka.listener.ConcurrentMessageListenerContainer">
    <constructor-arg ref="kafkaConsumerFactory"/>
    <constructor-arg ref="consumerContainerPropertiestest" />
    <property name="concurrency" value="4"/>
</bean>

<bean id="consumerContainerPropertiestest"
    class="org.springframework.kafka.listener.config.ContainerProperties">
    <constructor-arg name="topics" value="events.test" />
    <property name="ackMode" value="MANUAL_IMMEDIATE"></property>
</bean>

<int:service-activator input-channel="kafkaMessage"
    ref="MyListener" method="handleIncomingKafkaEvent" ></int:service-activator>

<int:channel id="kafkaMessage"></int:channel>

【问题讨论】:

    标签: apache-kafka spring-integration spring-kafka


    【解决方案1】:

    启动版本2.1.3,Spring Kafka在MessageListenerContainer上提供了这个API:

    /**
     * Pause this container before the next poll().
     * @since 2.1.3
     */
    default void pause() {
        throw new UnsupportedOperationException("This container doesn't support pause");
    }
    
    /**
     * Resume this container, if paused, after the next poll().
     * @since 2.1.3
     */
    default void resume() {
        throw new UnsupportedOperationException("This container doesn't support resume");
    }
    
    /**
     * Return true if {@link #pause()} has been called; the container might not have actually
     * paused yet.
     * @return true if pause has been requested.
     * @since 2.1.5
     */
    default boolean isPauseRequested() {
        throw new UnsupportedOperationException("This container doesn't support pause/resume");
    }
    
    /**
     * Return true if {@link #pause()} has been called; and all consumers in this container
     * have actually paused.
     * @return true if the container is paused.
     * @since 2.1.5
     */
    default boolean isContainerPaused() {
        throw new UnsupportedOperationException("This container doesn't support pause/resume");
    }
    

    因此,您确实可以从应用程序的任何位置暂停和恢复您的侦听器容器,将 containertest 注入到适当的服务中。

    KafkaMessageDrivenChannelAdapter 还公开了 pause()resume() 挂钩。

    另外,在KafkaMessageDrivenChannelAdapter 中,MessagingMessageConverter 将这些标头填充到消息中以供下游处理:

        rawHeaders.put(KafkaHeaders.RECEIVED_MESSAGE_KEY, record.key());
        rawHeaders.put(KafkaHeaders.RECEIVED_TOPIC, record.topic());
        rawHeaders.put(KafkaHeaders.RECEIVED_PARTITION_ID, record.partition());
        rawHeaders.put(KafkaHeaders.OFFSET, record.offset());
        rawHeaders.put(KafkaHeaders.TIMESTAMP_TYPE, record.timestampType().name());
        rawHeaders.put(KafkaHeaders.RECEIVED_TIMESTAMP, record.timestamp());
    
        if (acknowledgment != null) {
            rawHeaders.put(KafkaHeaders.ACKNOWLEDGMENT, acknowledgment);
        }
        if (consumer != null) {
            rawHeaders.put(KafkaHeaders.CONSUMER, consumer);
        }
    

    因此,您可以获取KafkaHeaders.CONSUMER 标头并使用来自KafkaConsumer 的本机pause()/resume()

    【讨论】:

    • 我刚刚注意到我必须使用 spring kafka1.3.4 和 spring integration kafka 2.3.0。无法立即升级。我们可以在这些版本中做这样的事情吗?
    • 不,抱歉:我们不能。
    • 另一方面,在大多数情况下,我们不需要担心暂停/恢复,因为即使您的侦听器被阻止,consumer 也会有后台心跳任务。
    • 基本上,如果我的听众忙/不能接受更多,我不希望消费者轮询。想在几秒钟后侦听器空闲时轮询更多消息。
    • 好吧,如果它很忙,它肯定不会去投票。您可以在进程结束时添加一些Thread.sleep(),同时您无法升级到最新的Spring Kafka
    猜你喜欢
    • 1970-01-01
    • 2018-10-03
    • 1970-01-01
    • 2016-09-01
    • 2016-08-19
    • 1970-01-01
    • 1970-01-01
    • 2020-05-02
    • 1970-01-01
    相关资源
    最近更新 更多