【发布时间】:2025-12-21 18:30:07
【问题描述】:
我目前正在使用带有 GlobalChannelInterceptor 的带有 Kafka 绑定器的 Spring Cloud Stream 来为我的 Spring Boot 微服务执行消息记录。
我有:
- 生产者将消息发布到
SubscribableChannel - 从 Stream 中监听的消费者(使用
@StreamListener注释)
在消息从生产者发布到Stream并被消费者监听的整个过程中,观察到preSend方法被触发了两次:
- 在生产者端 - 当消息发布到流时
- 在消费者端 - 从 Stream 中侦听消息时
但是,出于我的日志记录目的,我只需要在消费者端拦截并记录消息。
有没有办法只在一侧(例如消费者端)拦截 SCS 消息?
如果您对此事有任何想法,我将不胜感激。谢谢!
参考:
-
GlobalChannelInterceptor文档 - https://docs.spring.io/spring-integration/api/org/springframework/integration/config/GlobalChannelInterceptor.html
编辑
制片人
public void sendToPushStream(PushStreamMessage message) {
try {
boolean results = streamChannel.pushStream().send(MessageBuilder.withPayload(new ObjectMapper().writeValueAsString(message)).build());
log.info("Push stream message {} sent to {}.", results ? "successfully" : "not", StreamChannel.PUSH_STREAM);
} catch (JsonProcessingException ex) {
log.error("Unable to parse push stream message.", ex);
}
}
生产者的streamChannel
public interface StreamChannel {
String PUSH_STREAM = "PushStream";
@Output(StreamChannel.PUSH_STREAM)
SubscribableChannel pushStream();
}
消费者
@StreamListener(StreamChannel.PUSH_STREAM)
public void handle(Message<PushStreamMessage> message) {
log.info("Incoming stream message from {}, {}", streamChannel.pushStream(), message);
}
消费者的streamChannel
public interface StreamChannel {
String PUSH_STREAM = "PushStream";
@Input(StreamChannel.PUSH_STREAM)
SubscribableChannel pushStream();
}
拦截器(公共库)
public class GlobalStreamInterceptor extends ChannelInterceptorAdapter {
@Override
public Message<?> preSend(Message<?> msg, MessageChannel mc) {
log.info("presend " + msg);
return msg;
}
@Override
public void postSend(Message<?> msg, MessageChannel mc, boolean sent) {
log.info("postSend " + msg);
}
}
【问题讨论】:
标签: java spring-integration spring-cloud-stream