【问题标题】:Spring cloud stream - notification when Kafka binder is initializedSpring Cloud Stream - 初始化Kafka binder时的通知
【发布时间】:2020-03-23 08:51:52
【问题描述】:

我的 Spring Cloud Stream 应用程序中有一个简单的 Kafka 生产者。当我的 Spring 应用程序启动时,我有一个 @PostConstruct 方法,它执行一些协调并尝试将事件发送到 Kafka 生产者。

问题是,当和解开始将 enets 发送到其中时,我的 Kafka Producer 尚未准备好,导致以下情况:

org.springframework.messaging.MessageDeliveryException: Dispatcher 没有频道“orderbook-service-1.orderbook”的订阅者。;嵌套异常是 org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers, failedMessage=GenericMessage .. 在 org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:77) 在 org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:445)

有没有办法在我的应用程序启动期间收到通知,告知 Kafka 通道已初始化,这样我就只启动了记录作业。

这是我的代码 sn-ps:

public interface OrderEventChannel {
    String TOPIC_BINDING = "orderbook";
    @Output(TOPIC_BINDING)
    SubscribableChannel outboundEvent();
}

@Configuration
@EnableBinding({OrderEventChannel.class})
@ConditionalOnExpression("${aix.core.stream.outgoing.kafka.enabled:false}")
public class OutgoingKafkaConfiguration {
}

@Service
public class OutgoingOrderKafkaProducer {

    @Autowired
    private OrderEventChannel orderEventChannel;

   public void onOrderEvent( ClientEvent clientEvent ) {

        try {
            Message<KafkaEvent> kafkaMsg = mapToKafkaMessage( clientEvent );
            SubscribableChannel subscribableChannel = orderEventChannel.outboundEvent();
            subscribableChannel.send( kafkaMsg );
        } catch ( RuntimeException rte ) {
            log.error( "Error while publishing Kafka event [{}]", clientEvent, rte );
        }
    }
..
..

}

【问题讨论】:

    标签: spring-boot apache-kafka spring-kafka spring-cloud-stream


    【解决方案1】:

    @PostConstruct 在上下文生命周期中太早了,无法开始使用 bean;它们仍在被创建、配置和连接在一起。

    您可以使用ApplicationListener(或@EventListener)来侦听ApplicationReadyEvent(请务必将偶数的applicationContext 与主应用程序上下文进行比较,因为您可能会收到其他事件)。

    你也可以实现SmartLifecycle,把你的代码放到start();把你的豆子放在一个迟到的Phase,这样它就在一切都连接好后启动。

    输出绑定在Integer.MIN_VALUE + 1000阶段开始,输入绑定在Integer.MAX_VALUE - 1000阶段开始。

    因此,如果您想在消息开始流动之前做某事,请在它们之间使用一个阶段(例如,0,这是默认值)。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2018-12-16
      • 2021-02-17
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2020-03-05
      相关资源
      最近更新 更多