【问题标题】:Problem with Spring Boot PUB/SUB sending message to topicSpring Boot PUB/SUB 向主题发送消息的问题
【发布时间】:2020-10-11 18:39:20
【问题描述】:

我正在构建 Spring Boot 应用程序,它将在一个主题上接收有效负载作为 PUB/SUB 消息,并将成功/错误消息返回给其他 PUB/SUB 主题。

  • 我有两个主题:inboundTopic 和 outboundTopic
  • 我有一个名为 inboundSub 的 inboundTopic 订阅者

这是配置代码:

@SpringBootApplication
public class TestApplication {
    public static void main(String[] args) {
        SpringApplication.run(TestApplication.class, args);
    }
    //region Inbound Channel adapter

    @Bean
    public PubSubInboundChannelAdapter messageChannelAdapter(
            @Qualifier("pubsubInputChannel") MessageChannel inputChannel,
            PubSubTemplate pubSubTemplate) {
        PubSubInboundChannelAdapter adapter =
                new PubSubInboundChannelAdapter(pubSubTemplate, "inboundSub");
        adapter.setOutputChannel(inputChannel);
        return adapter;
    }

    @Bean
    public MessageChannel pubsubInputChannel() {
        return new DirectChannel();
    }

    private static final Log LOGGER = LogFactory.getLog(PdfserviceApplication.class);
    @Bean
    @ServiceActivator(inputChannel = "pubsubInputChannel")
    public MessageHandler messageReceiver() {
        return message -> {
            LOGGER.info("Message arrived! Payload: " + new String((byte[]) message.getPayload()));
            GTService.sendMessage(new String((byte[]) message.getPayload()));
        };
    }
    //endregion

    //region outbound channel adapter
    @Bean
    @ServiceActivator(inputChannel = "pubsubOutputChannel")
    public MessageHandler messageSender(PubSubTemplate pubsubTemplate) {
        return new PubSubMessageHandler(pubsubTemplate, "outboundTopic");
    }
    @MessagingGateway(defaultRequestChannel = "pubsubOutputChannel")
    public interface PubsubOutboundGateway {
        void sendToPubsub(String text);
    }
    //endregion
}

因此,当消息到达 inboundTopic 时,我的应用程序被订阅并将该消息中继到我的 GTService 类,该类将自动装配 MessagingGateway 并简单地将另一条消息返回到 outboundTopic。

GTService 类:

public class GTService
{
    @Autowired
    private static PdfserviceApplication.PubsubOutboundGateway messagingGateway;

    public static void sendMessage (String payload){
        messagingGateway.sendToPubsub("I confirm that I received:" + payload );
    }
}

所以我希望当消息到达 inboundTopic 时,我会将其记录在本地控制台中,我的班级会将返回消息发送到出站主题(我将在 Google 控制台中检查)。 问题是,当我通过 Google 控制台向 inboundTopic 输入新消息时,该消息已被记录,但它会一遍又一遍地重复 - 就像它从未被确认过一样。此外,没有消息发送到 outboundTopic(我在 google 控制台中检查过)。

我不确定我做错了什么。如果有人有任何想法,我将不胜感激。

【问题讨论】:

    标签: java spring-boot google-cloud-pubsub


    【解决方案1】:

    GTService.messagingGateway 几乎肯定是null,所以当sendMessage() 被调用时你会得到一个NPE。可能存在一些隐藏错误日志的日志记录错误配置。

    原因是PubsubOutboundGateway是在Spring的常规上下文初始化期间创建的,这是在静态字段初始化之后很久。

    您需要将messagingGatewaysendMessage() 设为非静态,将GTService 注释为@Component,并将GTService 实例自动连接到TestApplication

    【讨论】: