【发布时间】:2018-02-16 21:59:26
【问题描述】:
我在做什么
我正在尝试编写一个转换流,该流将用作从 HTTP API (Salesforce) 读取数据的过程的一部分,将每个资源转换为标准的内部数据结构,然后将转换后的数据发送到由 Elasticsearch 索引。
我的问题
我写了一个简单的 Transform 流,但它似乎在吞噬数据,而不是通过它。
这是我的课程AdapterStream
const { Transform } = require('stream')
class AdapterStream extends Transform {
constructor (adapter) {
super({objectMode: true})
// adapter is a function that contains the transform logic
this.adapter = adapter
}
_transform (chunk, _encoding, callback) {
this.push(this.adapter(chunk))
callback()
}
}
module.exports = AdapterStream
这是我的使用方法
// query is an EventEmitter (https://jsforce.github.io/document/#query)
query.run({autoFetch: true, maxFetch: 10})
.pipe(transform)
.pipe(process.stdout)
我预计会发生什么
我希望在终端窗口中看到转换后的数据。
实际情况
我的终端窗口中没有打印任何内容
我尝试过的事情
写入文件
query.run({autoFetch: true, maxFetch: 10})
.pipe(transform)
.pipe(fs.createWriteStream('./test.json'))
这会按预期创建文件./test.json,但它是空的。
订阅活动
query.run({autoFetch: true, maxFetch: 10})
.pipe(transform)
transform.on('readable', () => { console.log(transform.read()) })
这符合我对.pipe(process.stdout) 的预期:将转换后的记录打印到控制台。
我的问题是,为什么 pipe() 不做我期望它做的事情?我一定缺少一些简单的东西。
【问题讨论】:
-
节点文档指出可能是我的问题的原因:
Note: Care must be taken when using Transform streams in that data written to the stream can cause the Writable side of the stream to become paused if the output on the Readable side is not consumed.stream_implementing_a_transform_stream。我不确定这对我到底意味着什么 -
您介意提供一个我可以实际运行以重现问题的查询的最小示例吗?