【问题标题】:Publishing more than actual messages in Google Pub/Sub using Node.js and csv-parse使用 Node.js 和 csv-parse 在 Google Pub/Sub 中发布多于实际的消息
【发布时间】:2020-06-03 10:51:50
【问题描述】:

使用 Node.js、Google Pub/Sub、csv-parse。

用例 - 我有一个大的 csv 文件要在我的数据库中处理和导入。它很少有第三方 API 需要 1 秒来处理每一行。所以流程如下-

  1. 用户上传文件
  2. 节点服务器上传存储中的文件并向PubSubNo.1发送消息
  3. 现在,我的侦听器侦听上述 pubsub 并开始处理这些消息,它下载文件并开始中断每一行并发布到另一个 PubSub 以供进一步处理
  4. 最后我并行处理这些较小的行消息并实现更快的处理。

问题 - 一旦我的听众下载了文件,它就会发送 x no。到下一个 PubSubNo2 的行消息,但是当我检查它的订阅时,它显示超过 x 条消息。 例如我上传了 6000 条记录 csv,在订阅者上显示超过 40K-50K 条消息。

包.json

"dependencies": {
    "@google-cloud/pubsub": "1.5.0",
    "axios": "^0.19.2",
    "csv-parse": "^4.8.5",
    "dotenv": "^8.2.0",
    "google-gax": "1.14.1",
    "googleapis": "47.0.0",
    "moment": "^2.24.0",
    "path": "^0.12.7",
    "pg": "^7.18.1",
    "winston": "^3.0.0"
  }

发布者代码

async processFile(filename) {
    let cnt = 0;
    let index = null;
    let rowCounter = 0;
    const handler = (resolve, reject) => {
      const parser = CsvParser({
          delimiter: ',',
        })
        .on('readable', () => {
          let row;
          let hello = 0;
          let busy = false;
          this.meta.totalRows = (parser.info.records - 1);
          while (row = parser.read()) {
            if (cnt++ === 0) {
              index = row;
              continue;
            }
            let messageObject = {
              customFieldsMap: this.customFieldsMap,
              importAttributes: this.jc.attrs,
              importColumnData: row,
              rowCount: cnt,
              importColumnList: index,
              authToken: this.token
            }
            let topicPublishResult = PubSubPublish.publishToTopic(process.env.IMPORT_CSV_ROW_PUBLISHING_TOPIC, messageObject);
            topicPublishResult.then((response) => {
              rowCounter += 1;
              const messageInfo = "Row " + rowCounter + " published" +
                " | MessageId = " + response +
                " | importId = " + this.data.importId +
                " | fileId = " + this.data.fileId +
                " | orgId = " + this.data.orgId;
              console.info(messageInfo);
            })
          }
        })
        .on('end', () => {
          console.log("File consumed!");
          resolve(this.setStatus("queued"))
        })
        .on('error', reject);
      fs.createReadStream(filename).pipe(parser);
    };
    await new Promise(handler);
  }

并发布模块代码

const {
  PubSub
} = require('@google-cloud/pubsub');

const pubsub = new PubSub({
  projectId: process.env.PROJECT_ID
});
module.exports = {
  publishToTopic: function(topicName, data) {
    return pubsub.topic(topicName, {
      batching: {
        maxMessages: 500,
        maxMilliseconds: 5000,
      }
    }).publish(Buffer.from(JSON.stringify(data)));
  },
};

这对文件 os 10、100,200,2000 记录没有任何问题,但在处理 6K 记录时会出现更多问题。 在我发布 6K 记录后,所有 6K 记录都会出现 UnhandledPromiseRejection 错误,例如

(node:49994) UnhandledPromiseRejectionWarning: Error: Retry total timeout exceeded before any response was received
    at repeat (/Users/tarungupta/office/import-processor/node_modules/google-gax/build/src/normalCalls/retries.js:65:31)
    at Timeout._onTimeout (/Users/tarungupta/office/import-processor/node_modules/google-gax/build/src/normalCalls/retries.js:100:25)
    at listOnTimeout (internal/timers.js:531:17)
    at processTimers (internal/timers.js:475:7)
(node:49994) UnhandledPromiseRejectionWarning: Unhandled promise rejection. This error originated either by throwing inside of an async function without a catch block, or by rejecting a promise which was not handled with .catch(). (rejection id: 6000)

感谢任何帮助!

【问题讨论】:

    标签: node.js google-cloud-pubsub node-csv-parse


    【解决方案1】:

    当您要发布 6,000 条消息时,您的发布者可能会不知所措。原因是您为在 publishToTopic 方法中创建的每条消息创建了一个新的发布者实例。因此,您无法利用任何批处理,并且您正在等待 5 秒钟来发送每条消息。每条消息的开销都很大。这可能意味着回调没有得到及时处理,导致超时和尝试重新发送。您希望一次性创建 pubsub.topic 对象,然后在发布调用中重复使用它。

    【讨论】:

    • 太棒了。为我工作,复制代码的另一个问题。我使用相同的代码发布到另一个 Pub/sub,但它的时间非常稀疏,因此没有问题。我尝试了 20K 记录,它非常流畅。我仍然不明白它工作的技术性。你能详细说明一下吗? PS:我试过没有maxMilliseconds选项。
    • 当您说您不了解它的工作原理时,您究竟是什么意思?为什么每次创建主题时重用主题都不起作用?为什么创建单独的实例会阻止批处理工作?
    • 您提到的技术对于发布者来说已经不堪重负,所以我的错误代码制作了 6000 个不同的发布者,并且一次触发所有 6K 个发布者,然后超时以等待 GCP 的响应。所以是的,为什么重用有效?以及为什么即使单独触发 GCP 也无法响应 6000 个新主题对象。
    • 主题对象相当重量级。创建 6,000 会导致在您的客户端中运行的回调积压。这意味着无法快速处理来自服务的结果,因此客户端认为它们已超时重试发布,这在您的客户端中建立了更多的工作。只有一个主题对象的副本,消息被批处理并且对服务器的请求更少,这意味着您的客户端要做的工作更少。问题不在于 GCP,它每秒可以处理数百万条消息。如果您使用 6,000 个主题实例,您的客户端将无法处理必须运行的所有异步工作。