【发布时间】:2018-08-12 17:47:42
【问题描述】:
在我的服务激活器中,是否可以根据我的消息获取侦听器,然后将其暂停一段时间,如果更进一步,我的处理速度很慢?我需要这种方式来处理一些溢出机制。
我知道我可以实现一个新的 consumerSeekCallback,但在集成设置中,据我了解,我无法访问 messageDrivenChannelAdapter。
我正在使用链接到消息驱动通道适配器的 ConcurrentMessageListenerContainer。
<int-kafka:message-driven-channel-adapter
id="kafkaListenertest" listener-container="containertest" auto-startup="true"
phase="100" send-timeout="5000" channel="kafkaMessage" error-channel="overflow" />
<bean id="containertest"
class="org.springframework.kafka.listener.ConcurrentMessageListenerContainer">
<constructor-arg ref="kafkaConsumerFactory"/>
<constructor-arg ref="consumerContainerPropertiestest" />
<property name="concurrency" value="4"/>
</bean>
<bean id="consumerContainerPropertiestest"
class="org.springframework.kafka.listener.config.ContainerProperties">
<constructor-arg name="topics" value="events.test" />
<property name="ackMode" value="MANUAL_IMMEDIATE"></property>
</bean>
<int:service-activator input-channel="kafkaMessage"
ref="MyListener" method="handleIncomingKafkaEvent" ></int:service-activator>
<int:channel id="kafkaMessage"></int:channel>
【问题讨论】:
标签: apache-kafka spring-integration spring-kafka