【问题标题】:PubSub with spring: know the message is publish or not?带有spring的PubSub:知​​道消息是否发布?
【发布时间】:2020-04-03 14:29:55
【问题描述】:

我的发布者代码如下所示:

public abstract class PubSubPublisher {

    private static final Logger LOGGER = LoggerFactory.getLogger(PubSubPublisher.class);

    private final PubSubTemplate pubSubTemplate;

    protected PubSubPublisher(PubSubTemplate pubSubTemplate) {
        this.pubSubTemplate = pubSubTemplate;
    }

    protected abstract String topic(String topicName);

    public void publish(String topicName, String message) throws StatusRuntimeException {
        LOGGER.info("Publishing to topic [{}]. Message: [{}]", topicName, message);
        pubSubTemplate.publish(topicName, message);
    }

}

我的组件

@Component
public class HelloPubSubPublisher extends PubSubPublisher {

    @Autowired
    public HelloPubSubPublisher(PubSubTemplate pubSubTemplate) throws StatusRuntimeException{
        super(pubSubTemplate);
    }

    @Override
    protected String topic(String topicName) {
        return topicName;
    }

}

现在在我的服务层上,我如何获取天气我是否成功将消息发布到主题,请注意我正在使用的所有 google api 都是异步的。

try {
    publisher.publish(topicName, payload);
}catch (Exception e) {
    LOGGER.error("ioException occured: "+e);
    throw new TopicNotFoundException();
}

不幸的是,我无法捕获任何错误,程序光标没有进入 catch 块。

最终,我想知道代码是否会将消息推送到主题,如果不是,那么我必须记录它并将该错误抛出给客户端,这在我当前的代码中不会发生正确的异常处理。

感谢任何帮助或指导,谢谢。

【问题讨论】:

  • 您可以在谷歌云平台仪表板 PubSub > 订阅 > 点击一个订阅 > 查看消息

标签: spring-boot google-cloud-pubsub


【解决方案1】:

使用函数publish(),您应该能够捕获future,您可以在其中检查消息是否已发布。

Google's PubSub documentation 上有一个例子:

// Once published, returns a server-assigned message id (unique within the topic)
ApiFuture<String> future = publisher.publish(pubsubMessage);

// Add an asynchronous callback to handle success / failure
ApiFutures.addCallback(
    future,
    new ApiFutureCallback<String>() {

      @Override
      public void onFailure(Throwable throwable) {
        if (throwable instanceof ApiException) {
          ApiException apiException = ((ApiException) throwable);
          // details on the API exception
          System.out.println(apiException.getStatusCode().getCode());
          System.out.println(apiException.isRetryable());
        }
        System.out.println("Error publishing message : " + message);
      }

      @Override
      public void onSuccess(String messageId) {
        // Once published, returns server-assigned message ids (unique within the topic)
        System.out.println(messageId);
      }
    },
    MoreExecutors.directExecutor());

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2018-08-22
    • 2013-08-19
    • 1970-01-01
    • 1970-01-01
    • 2022-01-11
    相关资源
    最近更新 更多