【发布时间】:2020-03-23 08:51:52
【问题描述】:
我的 Spring Cloud Stream 应用程序中有一个简单的 Kafka 生产者。当我的 Spring 应用程序启动时,我有一个 @PostConstruct 方法,它执行一些协调并尝试将事件发送到 Kafka 生产者。
问题是,当和解开始将 enets 发送到其中时,我的 Kafka Producer 尚未准备好,导致以下情况:
org.springframework.messaging.MessageDeliveryException: Dispatcher 没有频道“orderbook-service-1.orderbook”的订阅者。;嵌套异常是 org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers, failedMessage=GenericMessage .. 在 org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:77) 在 org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:445)
有没有办法在我的应用程序启动期间收到通知,告知 Kafka 通道已初始化,这样我就只启动了记录作业。
这是我的代码 sn-ps:
public interface OrderEventChannel {
String TOPIC_BINDING = "orderbook";
@Output(TOPIC_BINDING)
SubscribableChannel outboundEvent();
}
@Configuration
@EnableBinding({OrderEventChannel.class})
@ConditionalOnExpression("${aix.core.stream.outgoing.kafka.enabled:false}")
public class OutgoingKafkaConfiguration {
}
@Service
public class OutgoingOrderKafkaProducer {
@Autowired
private OrderEventChannel orderEventChannel;
public void onOrderEvent( ClientEvent clientEvent ) {
try {
Message<KafkaEvent> kafkaMsg = mapToKafkaMessage( clientEvent );
SubscribableChannel subscribableChannel = orderEventChannel.outboundEvent();
subscribableChannel.send( kafkaMsg );
} catch ( RuntimeException rte ) {
log.error( "Error while publishing Kafka event [{}]", clientEvent, rte );
}
}
..
..
}
【问题讨论】:
标签: spring-boot apache-kafka spring-kafka spring-cloud-stream