【问题标题】:Google Cloud Pubsub Data lostGoogle Cloud Pubsub 数据丢失
【发布时间】:2017-05-29 21:13:02
【问题描述】:

我在使用 GCP pubsub 时遇到问题,在几秒钟内发布数千条消息时,有一小部分数据丢失。

我正在记录来自 pubsub 的 message_id 和一个 session_id 对发布端和接收端的每条消息都是唯一的,我看到的结果是接收端的一些消息有相同的session_id,但不同的message_id。此外,有些消息丢失了。

例如,在一项测试中,我向 pubsub 发送了 5,000 条消息,恰好收到了 5,000 条消息,其中 8 条消息丢失了。日志丢失消息如下所示:

MISSING sessionId:sessionId: 731 (missing in log from pull request, but present in log from Flask API)

messageId FOUND: messageId:108562396466545

API: 200 **** sessionId: 731, messageId:108562396466545 ******(Log from Flask API)

Pubsub: sessionId: 730, messageId:108562396466545(Log from pull request)

重复的看起来像:

======= Duplicates FOUND on sessionId: 730=======

sessionId: 730, messageId:108562396466545

sessionId: 730, messageId:108561339282318

(both are logs from pull request)

所有缺失的数据和重复的数据都像这样。

从上面的例子可以看出,有些消息已经取了另一个消息的message_id,并且已经用两个不同的message_ids发送了两次。

我想知道是否有人会帮我弄清楚发生了什么?提前致谢。

代码

我有一个 API 向 pubsub 发送消息,如下所示:

from flask import Flask, request, jsonify, render_template
from flask_cors import CORS, cross_origin
import simplejson as json
from google.cloud import pubsub
from functools import wraps
import re
import json


app = Flask(__name__)
ps = pubsub.Client()

...

@app.route('/publish', methods=['POST'])
@cross_origin()
@json_validator
def publish_test_topic():
    pubsub_topic = 'test_topic'
    data = request.data

    topic = ps.topic(pubsub_topic)

    event = json.loads(data)

    messageId = topic.publish(data)
    return '200 **** sessionId: ' + str(event["sessionId"]) + ", messageId:" + messageId + " ******"

这是我以前从 pubsub 读取的代码:

从 google.cloud 导入 pubsub 重新进口 导入json

ps = pubsub.Client()
topic = ps.topic('test-xiu')
sub = topic.subscription('TEST-xiu')

max_messages = 1
stop = False

messages = []

class Message(object):
    """docstring for Message."""
    def __init__(self, sessionId, messageId):
        super(Message, self).__init__()
        self.seesionId = sessionId
        self.messageId = messageId


def pull_all():
    while stop == False:

        m = sub.pull(max_messages = max_messages, return_immediately = False)

        for data in m:
            ack_id = data[0]
            message = data[1]
            messageId = message.message_id
            data = message.data
            event = json.loads(data)
            sessionId = str(event["sessionId"])
            messages.append(Message(sessionId = sessionId, messageId = messageId))

            print '200 **** sessionId: ' + sessionId + ", messageId:" + messageId + " ******"

            sub.acknowledge(ack_ids = [ack_id])

pull_all()

用于生成 session_id,从 API 发送请求和记录响应:

// generate trackable sessionId
var sessionId = 0

var increment_session_id = function () {
  sessionId++;
  return sessionId;
}

var generate_data = function () {
  var data = {};
  // data.sessionId = faker.random.uuid();
  data.sessionId = increment_session_id();
  data.user = get_rand(userList);
  data.device = get_rand(deviceList);
  data.visitTime = new Date;
  data.location = get_rand(locationList);
  data.content = get_rand(contentList);

  return data;
}

var sendData = function (url, payload) {
  var request = $.ajax({
    url: url,
    contentType: 'application/json',
    method: 'POST',
    data: JSON.stringify(payload),
    error: function (xhr, status, errorThrown) {
      console.log(xhr, status, errorThrown);
      $('.result').prepend("<pre id='json'>" + JSON.stringify(xhr, null, 2) + "</pre>")
      $('.result').prepend("<div>errorThrown: " + errorThrown + "</div>")
      $('.result').prepend("<div>======FAIL=======</div><div>status: " + status + "</div>")
    }
  }).done(function (xhr) {
    console.log(xhr);
    $('.result').prepend("<div>======SUCCESS=======</div><pre id='json'>" + JSON.stringify(payload, null, 2) + "</pre>")
  })
}

$(submit_button).click(function () {
  var request_num = get_request_num();
  var request_url = get_url();
  for (var i = 0; i < request_num; i++) {
    var data = generate_data();
    var loadData = changeVerb(data, 'load');
    sendData(request_url, loadData);
  }
}) 

更新

我对 API 进行了更改,问题似乎消失了。我所做的更改不是为所有请求使用一个 pubsub.Client(),而是为每个传入的请求初始化一个客户端。新的 API 如下所示:

from flask import Flask, request, jsonify, render_template
from flask_cors import CORS, cross_origin
import simplejson as json
from google.cloud import pubsub
from functools import wraps
import re
import json


app = Flask(__name__)

...

@app.route('/publish', methods=['POST'])
@cross_origin()
@json_validator
def publish_test_topic():

    ps = pubsub.Client()


    pubsub_topic = 'test_topic'
    data = request.data

    topic = ps.topic(pubsub_topic)

    event = json.loads(data)

    messageId = topic.publish(data)
    return '200 **** sessionId: ' + str(event["sessionId"]) + ", messageId:" + messageId + " ******"

【问题讨论】:

  • 根据您的代码,您将每条消息发布两次。这只是一个错字还是您确实多次发布每条消息?
  • 很好,这是一个错字。每个请求只发送一次。现在已经修复了
  • 这是另一个错字吗? self.seesionId = sessionId。应该是 self.sessionId 吗?
  • 这是代码中的错字。感谢您指出。虽然它不会影响任何事情,因为我没有在代码中的任何地方使用Messages 数组。

标签: google-cloud-messaging google-cloud-platform google-cloud-pubsub


【解决方案1】:

与 Google 的某个人交谈,这似乎是 Python 客户端的问题:

我们这边的共识是当前的python客户端存在线程安全问题。正如我们所说,客户端库几乎正在从头开始重写,所以我不想在当前版本中进行任何修复。我们预计新版本将在 6 月底推出。

在 app.yaml 中使用 thread_safe: false 运行当前代码或更好,但在每次调用中仅实例化客户端应该是解决方法——您找到的解决方案。

详细解决方法请看问题中的更新

【讨论】:

    【解决方案2】:

    Google Cloud Pub/Sub 消息 ID 是唯一的。 “某些消息 [to] 采取了另一条消息的message_id”应该是不可能的。消息 ID 108562396466545 似乎已收到这一事实意味着 Pub/Sub 确实将消息传递给了订阅者并且没有丢失。

    我建议您检查您的session_ids 是如何生成的,以确保它们确实是唯一的,并且每条消息都有一个。通过正则表达式搜索在 JSON 中搜索 sessionId 似乎有点奇怪。您最好将此 JSON 解析为实际对象并以这种方式访问​​字段。

    一般来说,Cloud Pub/Sub 中的重复消息总是可能的;该系统保证至少一次交付。如果重复发生在订阅端(例如,未及时处理 ack)或使用不同的消息 ID(例如,如果在出现类似错误后重试消息的发布,则这些消息可以使用相同的消息 ID 传递已超过最后期限)。

    【讨论】:

    • 感谢您的回复。 session_id 是使用递增数字生成器为每条消息唯一生成的,我检查了 App Engine 的日志并确认发送的每个 session_id 都是唯一的。我也使用 JSON 解析器进行了实验,但我仍在观察相同的行为。我知道 pubsub 中的重复消息,我根本不担心它们。让我担心的是丢失的消息以及重复项正在占用message_ids 的丢失消息这一事实。
    • 假设 self.seesionId = sessionId 的错字不负责任,我建议您查看变量的范围。 messageId 和 sessionId 是您的发布端还是订阅端的全局变量?如果是这样,它们可能会被对 pull_all 或 publish_test_topic 的并发调用覆盖。似乎消息是一个全局变量,因此任何 pull_all 的并发调用都可能导致竞争条件。
    • messageId 和 sessionId 都不是全局的。 messages 是全局的,但不在任何地方使用。订阅端不在多线程上,所以那里不应该有任何竞争条件。我能看到的唯一可能的竞争条件是在 API 端,因为 App Engine 可能有多个实例来处理流量,因此可能会同时向 pubsub 发送多个发布请求。
    • 但是由于messageId = topic.publish(data)返回一个邮件独有的message_id,我不确定是否会有问题
    • 我对 API 进行了更改,问题似乎消失了。我所做的更改不是对所有请求使用一个 pubsub.Client(),而是为每个传入的请求初始化一个客户端。我不确定为什么会发生这种情况。我必须在每个端点创建一个新客户端吗?
    【解决方案3】:

    您不需要为每个发布操作创建一个新客户端。我敢打赌,“解决问题”的原因是因为它减轻了发布者客户端存在的竞争。我也不相信您在发布者方面显示的日志行:

    API:200 **** sessionId:731,messageId:108562396466545 ******

    对应于通过 publish_test_topic() 成功发布 sessionId 731。该日志行是在什么条件下打印的?到目前为止提供的代码没有显示这一点。

    【讨论】:

    • publish_test_topic()方法返回return '200 **** sessionId: ' + str(event["sessionId"]) + ", messageId:" + messageId + " ******",作为对/publish的每个请求的响应返回到应用程序的前端,并在控制台打印出来收到后立即
    猜你喜欢
    • 2019-03-25
    • 1970-01-01
    • 2018-06-22
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2016-12-31
    • 2019-07-02
    • 2021-06-05
    相关资源
    最近更新 更多