【问题标题】:pg-promise : Cancel a query initiated with pg-query-streampg-promise : 取消使用 pg-query-stream 发起的查询
【发布时间】:2020-08-03 00:48:42
【问题描述】:

我有一个 postgresql 表,每天都有成千上万的时间序列数据。我有一个应用程序允许用户检索这些数据。查询可能会占用 200 毫秒到 30 秒的时间,具体取决于时间范围,因此必须取消这些查询以避免对生产造成无用负载。

由于有数十亿的数据,使用流来检索它们是不可避免的。

所以我设法获得了一个带有数据流的工作端点,就像它在 pg-promise 文档中显示的那样,并通过关闭 pg-query-stream 内的光标使其可以取消。

这是在此端点内完成的示例(在构建查询后调用 dataStream()):

const pgp = require("pg-promise")();
const QueryStream = require("pg-query-stream");

const db = pgp({
  host: "1.2.3.4",
  port: 5432,
  database: "db",
  user: "user",
  password: "password",
  max: 2,
});

// query is an SQL string
dataStream(query, req, res, next) {
  const qs = new QueryStream(query);

  // "close" event is triggered on client request cancelation
  req.on("close", () => {
    qs.destroy();
  });

  return db.stream(qs, s => {
    s.pipe(JSONStream.stringify()).pipe(res);
    s.on("error", error => handleError(error));
  })
  .catch(error => handleError(error, query));
}

它适用于几次调用,但在某些时候(快速执行 8 到 10 次调用以检查可取消性),应用程序因此堆栈而崩溃:

\node_modules\pg-promise\node_modules\pg\lib\client.js:346
    if (self.activeQuery.name) {
                         ^

TypeError: Cannot read property 'name' of null
    at Connection.<anonymous> (\node_modules\pg-promise\node_modules\pg\lib\client.js:346:26)
    at Connection.emit (events.js:311:20)
    at Socket.<anonymous> (\node_modules\pg-promise\node_modules\pg\lib\connection.js:120:12)
    at Socket.emit (events.js:311:20)
    at addChunk (_stream_readable.js:294:12)
    at readableAddChunk (_stream_readable.js:275:11)
    at Socket.Readable.push (_stream_readable.js:209:10)
    at TCP.onStreamRead (internal/stream_base_commons.js:186:23)

所以我怀疑调用 qs.destroy() 来关闭流不是正确的方法,即使光标在服务器端被很好地销毁。

感谢 node-postgres 和 pg-promise 开发人员的工作。

【问题讨论】:

  • 抱歉编辑量太大,我将 pg-query-stream 更新到 3.0 并且不得不复习一些要点

标签: pg-promise node-postgres


【解决方案1】:

对于那些感兴趣的人,经过多次尝试,我找到了一个可行的解决方案。它还解决了我遇到的另一个问题:通过发送垃圾邮件请求来检查他们的可取消性,我注意到池中的一些客户端永远挂起并且永远不会返回,导致池满并且新请求永远挂起。

我认为这可以通过res 在流中通过管道传输并且由于请求已被取消,可读流永远不会被消耗和挂起这一事实来解释。 我的代码中的另一个问题是req.on("close", 并不总是被触发。 为了解决这个问题,我找到了一个名为 on-finished 的模块,它的作用就像想要的一样。

另外,拨打qs.destroy() 也不是正确的做法。经过长时间的调试,没有未处理错误的最一致的方法是从 pgp 的 Database.connect() 获取 Connection 对象并通过调用 connection.done() 结束查询。

所以这是我的解决方案:

const pgp = require("pg-promise")();
const QueryStream = require("pg-query-stream");
const JSONStream = require("JSONStream");
const onFinished = require("on-finished");

const db = pgp({
  host: "1.2.3.4",
  port: 5432,
  database: "db",
  user: "user",
  password: "password",
  max: 2,
});

// query is an SQL string
async function dataStream(query, req, res, next) {
    try {
      if (query instanceof Object) {
        query = query.toString();
      }
      const connection = await db.connect();
      const qs = new QueryStream(query, [], {highWaterMark: 4000});
      const streamData = connection.client.query(qs);

      onFinished(res, () => {
        // Calling .done() to end the connection on request close.
        // Weirdly I sometimes get an error if I do not provide a callback.
        connection.done(error => {
          log.error(error);
        });
      });
      streamData.pipe(JSONStream.stringify()).pipe(res);

      streamData.on("error", error => {
        next(error);
      });
    } catch (error) {
      next(error);
    }
  }

【讨论】:

  • 我认为您的问题是在正常结束期间关闭流 - 对吗?因为在错误we destroy the stream 期间,如文档所述。正常结束后我们不会做任何stream.cursor.close()。我不知道这是否有问题。自从pg-promise 集成它以来,底层库确实发生了几次变化。所以如果有问题,欢迎pg-promise 的 PR-s!
  • 实际上,我认为错误与 writableStream res 被通过管道传输到查询流的事实有关,有时,在节点 pg 的 lib 中的某个位置,流从未被告知 res 不再存在,导致连接无限期挂起。此外,我更新了我的帖子,因为我发现了一个更好的方法来使用 connection.done()。这一次,如果它位于等待池中,连接也会被杀死。
猜你喜欢
  • 2021-03-29
  • 2019-07-16
  • 2020-10-21
  • 2016-04-03
  • 2018-02-19
  • 2017-07-22
  • 2020-01-17
  • 2017-10-02
相关资源
最近更新 更多