【问题标题】:Intercepting Spring Cloud Stream Messages from Consumer only仅拦截来自消费者的 Spring Cloud Stream 消息
【发布时间】:2025-12-21 18:30:07
【问题描述】:

我目前正在使用带有 GlobalChannelInterceptor 的带有 Kafka 绑定器的 Spring Cloud Stream 来为我的 Spring Boot 微服务执行消息记录。

我有:

  1. 生产者将消息发布到SubscribableChannel
  2. 从 Stream 中监听的消费者(使用 @StreamListener 注释)

在消息从生产者发布到Stream并被消费者监听的整个过程中,观察到preSend方法被触发了两次:

  1. 在生产者端 - 当消息发布到流时
  2. 在消费者端 - 从 Stream 中侦听消息时

但是,出于我的日志记录目的,我只需要在消费者端拦截并记录消息。

有没有办法只在一侧(例如消费者端)拦截 SCS 消息?

如果您对此事有任何想法,我将不胜感激。谢谢!

参考:

  1. GlobalChannelInterceptor 文档 - https://docs.spring.io/spring-integration/api/org/springframework/integration/config/GlobalChannelInterceptor.html

编辑

制片人

public void sendToPushStream(PushStreamMessage message) {
        try {
            boolean results = streamChannel.pushStream().send(MessageBuilder.withPayload(new ObjectMapper().writeValueAsString(message)).build());
        log.info("Push stream message {} sent to {}.", results ? "successfully" : "not", StreamChannel.PUSH_STREAM);
        } catch (JsonProcessingException ex) {
            log.error("Unable to parse push stream message.", ex);
        }
    }

生产者的streamChannel

public interface StreamChannel {

    String PUSH_STREAM = "PushStream";

    @Output(StreamChannel.PUSH_STREAM)
    SubscribableChannel pushStream();

}

消费者

@StreamListener(StreamChannel.PUSH_STREAM)
public void handle(Message<PushStreamMessage> message) {
    log.info("Incoming stream message from {}, {}", streamChannel.pushStream(), message);

}

消费者的streamChannel

public interface StreamChannel {

    String PUSH_STREAM = "PushStream";

    @Input(StreamChannel.PUSH_STREAM)
    SubscribableChannel pushStream();

}

拦截器(公共库)

public class GlobalStreamInterceptor extends ChannelInterceptorAdapter {

    @Override
    public Message<?> preSend(Message<?> msg, MessageChannel mc) {
       log.info("presend " + msg);
        return msg;
    }

    @Override
    public void postSend(Message<?> msg, MessageChannel mc, boolean sent) {
        log.info("postSend " + msg);
    }

}

【问题讨论】:

    标签: java spring-integration spring-cloud-stream


    【解决方案1】:

    对,为什么不关注GlobalChannelInterceptor选项,不申请

    将匹配通道名称的简单模式数组。

    ?

    所以,你可能有这样的事情:

    @GlobalChannelInterceptor(patterns = Processor.INPUT)
    

    或者在您的 SCSt 应用中使用自定义的输入通道名称。

    【讨论】:

    • 您好,Artem,感谢您的回复。如果应用程序需要同时发布到同一个频道并从同一个频道收听,您建议的 patterns 方法是否仍然有效?
    • 这对我来说没有意义。请出示您的申请。我脑子里编不出来这样的逻辑,抱歉
    • 对造成的混乱表示歉意。让我自己说得更好。请参考我上面添加的代码。将您的建议应用于我的案例,注释变为@GlobalChannelInterceptor(patterns = StreamChannel.PUSH_STREAM。但是,这并不能解决问题,因为拦截器仍然会在通道的两端拦截消息。
    • 您在代码中显示的内容必须在不同的微服务中,否则在 Kafka 中没有任何理由介于两者之间,通道可以只是简单的内存MesaageChannel
    • 是的,生产者和消费者是不同的微服务,因此使用了Kafka binder。