【发布时间】:2022-01-22 15:11:58
【问题描述】:
我正在使用以下管道从 Aurora 流式传输数据,将其转换为 csv,然后将其发送到 S3。
可读的 knex 流:
const getQueryStream = (organizationId) =>
db.select('*')
.from('users')
.where('organization_id', organizationId)
.stream();
转换数据:
const toCSVTransform = (fields) => new stream.Transform({
objectMode: true,
transform: (row, encoding, callback) => {
let rowAsArr = [];
for(let i = 0; i < fields.length; i++) {
rowAsArr.push(row[fields[i]]);
}
callback(null, `${rowAsArr.join(',')}\n`);
}
});
管道:
stream.pipeline(
dbStream,
toCSVTransform(['first_name', 'last_name', 'email']),
s3WritableStream,
(err) => {
if (err) {
console.error('Pipeline failed.', err)
} else {
console.log('Pipeline succeeded.')
}
}
)
这可以按原样工作,但我们有一个额外的要求,即使用 PGP 加密来加密文件。我的想法是在toCSVTransform 之后在管道中增加一个步骤来进行加密。 npm 包openpgp 支持流,但我不知道如何将它用于管道。
来自openpgp 文档,这里是如何使用将可读流传递给openpgp.encrypt 函数的示例:
const readableStream = new ReadableStream({
start(controller) {
controller.enqueue('Hello, world!');
controller.close();
}
});
const encrypted = await openpgp.encrypt({
message: await openpgp.createMessage({ text: readableStream }), // input as Message object
encryptionKeys: publicKey,
signingKeys: privateKey // optional
});
我见过的所有示例都只是将可读流传递给 encrypt 函数。但我需要在将数据发送到 s3 之前将数据转换为数据。
有没有办法让我将toCSVTransform 流传递给openpgp.encrypt 方法?
似乎我想将可读的dbStream 和转换流toCSVTransform 组合成一个流并将其传递给 openpgp.encrypt 函数。
我注意到 node.js 有一个 stream.compose 方法,但它目前只是实验性的,所以它不是一个真正的选择。
**** 编辑:可能的解决方案 看起来我可以使用 pipe() 在将流传递给 openpgp.encrypt 方法之前对其进行转换:
const encrypted = await openpgp.encrypt({
message: await openpgp.createMessage({ text: dbStream.pipe(toCSVTransform) }), // input as Message object
encryptionKeys: publicKey,
signingKeys: privateKey // optional
});
【问题讨论】:
标签: node.js encryption node.js-stream openpgp.js