【问题标题】:Issues with multiple messages being published to same pub sub topic将多条消息发布到同一个 pub 子主题的问题
【发布时间】:2021-10-09 15:31:24
【问题描述】:

我们正在尝试将消息发布到 google pub 子主题,我正在使用来自 this git repository 的示例代码。

这里的问题是,只要从下面的代码发布一条消息,发布到主题的重复消息的数量就会呈指数级增长。 不知道为什么我会面临这种行为,但无法弄清楚示例代码或已创建的发布子主题是否存在问题。 有人可以帮助我了解这里发生了什么以及如何解决此问题。

public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String topicId = "your-topic-id";

    publisherExample(projectId, topicId);
      }

       public static void publisherExample(String projectId, String topicId)
      throws IOException, ExecutionException, InterruptedException {
    TopicName topicName = TopicName.of(projectId, topicId);

    Publisher publisher = null;
    try {
      // Create a publisher instance with default settings bound to the topic
      publisher = Publisher.newBuilder(topicName).build();

      String message = "{\r\n" + 
                    "   \"errorCodeFormat\": \"NF-123-ABC000\"\r\n" + 
            "}";
      ByteString data = ByteString.copyFromUtf8(message);
      PubsubMessage pubsubMessage = PubsubMessage.newBuilder().setData(data).build();

      // Once published, returns a server-assigned message id (unique within the topic)
      ApiFuture<String> messageIdFuture = publisher.publish(pubsubMessage);
      String messageId = messageIdFuture.get();
      System.out.println("Published message ID: " + messageId);
    } finally {
      if (publisher != null) {
        // When finished with the publisher, shutdown to free up resources.
        publisher.shutdown();
        publisher.awaitTermination(1, TimeUnit.MINUTES);
      }
      }
    }
     }

下面是正在使用的订阅者代码

public static void subscribeAsyncExample(String projectId, String subscriptionId) throws TimeoutException {
    ProjectSubscriptionName subscriptionName =
        ProjectSubscriptionName.of(projectId, subscriptionId);
    
    

    // Instantiate an asynchronous message receiver.
    MessageReceiver receiver =
        (PubsubMessage message, AckReplyConsumer consumer) -> {
          // Handle incoming message, then ack the received message.
          System.out.println("Id: " + message.getMessageId());
          System.out.println("Data: " + message.getData().toStringUtf8());
          consumer.ack();
        };

        System.out.println("You are in consumer listener");

        Subscriber subscriber = null;
       //        try {
          subscriber = Subscriber.newBuilder(subscriptionName, receiver).build();
          // Start the subscriber.
          subscriber.startAsync().awaitRunning();
          System.out.printf("Listening for messages on %s:\n", subscriptionName.toString());
          // Allow the subscriber to run for 30s unless an unrecoverable error occurs.
          subscriber.awaitTerminated(30, TimeUnit.MINUTES);
      //        } catch (TimeoutException timeoutException) {
    //          // Shut down the subscriber after 30s. Stop receiving messages.
    //          subscriber.stopAsync();
    //        System.out.println("Subscriber state: {}"+ subscriber.state());

//  }
  }

【问题讨论】:

  • 您如何确定“发布到该主题的重复消息数量呈指数增长”?是您的订阅者多次收到消息,还是您使用该主题的指标?如果是前者,你能分享你的订阅者代码吗?
  • @Kamal Aboul-Hosn 当我对使用上述代码发布的每 1 条消息说指数时,我看到主题上随机出现 5 到 7 条重复消息。我将更新上面的订阅者代码供您参考
  • 重复的消息是不同的消息ID还是相同的ID?
  • @KamalAboul-Hosn 重复消息的消息 ID 不同,但有效负载数据保持不变。我发现的一种行为是发布的消息 ID 始终与重复消息的最后一条匹配

标签: java google-cloud-platform google-cloud-pubsub


【解决方案1】:

鉴于这些消息具有不同的消息 ID,这表明重复发生在发布端。这可能有三个原因:

  1. 正在向publish 拨打额外的意外电话。
  2. 在应用程序级别重试。
  3. 在 Pub/Sub 客户端库中重试。

您显示的代码并没有真正表明前两件事中的任何一个正在发生,但如果您的代码实际上更复杂,例如,在循环中调用发布,那么值得检查以确保两者都不发生这两个是这样的。

对于最后一个,Pub/Sub 客户端库在内部重试由于可重试原因而失败的发布。最典型的原因之一是DEADLINE_EXCEEDED 错误,当客户端没有足够快地从服务器接收到响应时会发生这种情况。这可能会导致重复,因为初始请求和重试请求最终都可能成功,您只能从第二个请求中获取消息 ID。

DEADLINE_EXCEEDED 错误可能由于许多不同的原因而发生。可能是您的 Internet 连接速度较慢,导致无法传输消息并足够快地接收响应。缓慢可能不是连接本身;如果您在通过网络执行许多其他操作的机器上运行,则可能是连接已饱和,因此无法及时处理请求和响应。如果您通过代理运行,那也可能有所帮助。

也可能是机器在 RAM 或 CPU 方面过载。如果有大量的分页需要发生或者 CPU 被充分利用,那么需要客户端库处理的回调可能无法及时处理,导致DEADLINE_EXCEEDED 错误和消息的重试。

由于消息最终成功,您可以通过changing the parameters to how the requests are retried 解决此问题以增加初始超时:

import com.google.api.gax.retrying.RetrySettings;
import org.threeten.bp.Duration;
...

    Publisher publisher = null;
    try {
      // Create a publisher instance with default settings bound to the topic
      // Retry settings control how the publisher handles retry-able failures
      Duration initialRetryDelay = Duration.ofMillis(100); // default: 100 ms
      double retryDelayMultiplier = 2.0; // back off for repeated failures, default: 1.3
      Duration maxRetryDelay = Duration.ofSeconds(60);
      // This one is the important one to set.
      Duration initialRpcTimeout = Duration.ofSeconds(60);
      double rpcTimeoutMultiplier = 1.0;
      Duration maxRpcTimeout = Duration.ofSeconds(600);seconds
      Duration totalTimeout = Duration.ofSeconds(600);

      RetrySettings retrySettings =
          RetrySettings.newBuilder()
              .setInitialRetryDelay(initialRetryDelay)
              .setRetryDelayMultiplier(retryDelayMultiplier)
              .setMaxRetryDelay(maxRetryDelay)
              .setInitialRpcTimeout(initialRpcTimeout)
              .setRpcTimeoutMultiplier(rpcTimeoutMultiplier)
              .setMaxRpcTimeout(maxRpcTimeout)
              .setTotalTimeout(totalTimeout)
              .build();
      publisher = Publisher.newBuilder(topicName).setRetrySettings(retrySettings).build();

使用上述设置,初始 RPC 将在 60 秒内完成。这应该有望减少重复。如果您现在只是在试验,您可能需要在生产环境中调整这些设置,因为可能不需要这么长的超时时间。

【讨论】:

    猜你喜欢
    • 2022-08-22
    • 2020-10-11
    • 1970-01-01
    • 1970-01-01
    • 2014-08-29
    • 2017-12-13
    • 2020-11-28
    • 1970-01-01
    • 2020-05-09
    相关资源
    最近更新 更多