【发布时间】:2022-06-29 05:12:47
【问题描述】:
我有一个包含数百万数据的 MySQL 表。 对于每一行,我必须应用自定义逻辑并在另一个表上更新修改后的数据。
使用 knex.js 我运行查询以使用 stream() 函数读取数据
一旦我得到 Stream 对象,我就会将我的逻辑应用于数据事件。 一切正常,但在某个时候它会停止而不会出现任何错误。
我尝试在新表中的每个更新操作之前暂停流,并在完成更新后重新启动它,但问题没有解决。 尝试限制查询,例如 1000 个结果,系统工作正常。
示例代码:
const readableStream = knex.select('*')
.from('big_table')
.stream();
readableStream.on('data', async(data) => {
readableStream.pause() // pause stream
const toUpdate = applyLogic(data) // sync func
const whereCond = getWhereCondition(data) // sync func
try {
await knex('to_update').where(whereCond).update(toUpdate)
console.log('UPDATED')
readableStream.resume() // resume stream
} catch (e) {
console.log('ERROR', e)
}
readableStream.resume() // resume stream
}).on('finish', () => {
console.log('FINISH')
}).on('error', (err) => {
console.log('ERROR', err)
})
谢谢!
【问题讨论】:
标签: mysql node.js kubernetes knex.js