【问题标题】:Stream "RETURNING" from insert for pg-promise从插入流“返回”用于 pg-promise
【发布时间】:2021-08-14 11:26:30
【问题描述】:

这里的目标是将RETURNING 数据从INSERT INTO .... 流式传输到写入流。

即。

                const write_stream = fs.createWriteStream('file.csv')
                client.query(`
                    INSERT INTO table1  (columns) 
                        SELECT ...
                        FROM table2
                        RETURNING *
                    `)
                    .then(returned => {
                        returned.rows.pipe(write_stream)                      
                    })

pg-promise 可以做到这一点吗?

目前来自returned.rows 的响应包含数组[{}] 中的所有数据,因此上面的示例不起作用,因此唯一的解决方案可能需要分别插入和选择,但如果我可以简单地获取返回的数据。

【问题讨论】:

  • 如果我给你的答案(5天前)对你有用,那么请接受它,否则如果有任何问题,请发表评论。 (pg-promise我是作者)

标签: node.js stream pg-promise


【解决方案1】:

不知道为什么我没有想到这一点,但解决方案只是使用pg-query-stream


let qs = new QueryStream(`
       INSERT INTO table1 
          SELECT * FROM table2
       RETURNING *;`)

const write_stream = fs.createWriteStream('file.csv');
const { parse } = require('json2csv');
const stream = client.query(qs)
let fields = [];

stream
    .on('data', data => {
         // write csv headers
         if (!fields.length) {
             fields = Object.keys(data)
             write_stream.write(fields.join(',') + '\r\n')
         }

         try {
           // write csv data
           const csv = parse(data, {header : false})
           write_stream.write(csv + '\r\n')
         } catch (err) {
           stream.close()
         }
    })
    .on('end', (end) => {
      write_stream.end()
    })
    .on('error', (error) => {
       console.error(error)
    })

【讨论】:

  • 流使用而不执行任何查询并不能真正回答您自己的问题。但是,提供的链接可以让您了解如何正确执行此操作。
  • 为什么要从链接重新发明方法?提供的代码示例是正确的方法,也更简单。
  • 您提供的示例在很多方面都很糟糕:1)它不适用于提供通用事件和处理的pg-promise流系统; 2)您手动处理JSON流格式,效率低下,可以自动完成(见我的回答) 3)解决方案通常过于复杂,因此容易出错。
【解决方案2】:

发布正确答案,因为问题作者选择不关注the example provided

以下是如何将查询结果正确地流式传输到 csv 文件中(来自pg-promise 的作者):

import QueryStream from 'pg-query-stream';
import CsvWriter from 'csv-write-stream';
import {createWriteStream} from 'fs';

const csv = new CsvWriter();
const file = createWriteStream('out.csv');

const qs = new QueryStream('select * from my_table');

await db.stream(qs, s => {
    s.pipe(csv).pipe(file);
});
//=> resolves with: {processed, duration}

参见stream 方法。

【讨论】:

    猜你喜欢
    • 2019-11-12
    • 1970-01-01
    • 1970-01-01
    • 2017-06-16
    • 2021-10-07
    • 1970-01-01
    • 2016-07-14
    • 2016-09-15
    • 2017-08-30
    相关资源
    最近更新 更多