【发布时间】:2017-11-15 09:13:18
【问题描述】:
我刚开始使用 Dataflow / Apache Beam。 我用 Python 编写了一个 Dataflow 管道,用于将大量产品(> 50K 产品,存储在 JSON 文件中)导入 Datastore。 管道在我的本地计算机 (DirectRunner) 上运行良好,但在 DataflowRunner 上失败并显示以下错误消息:
RPCError: datastore call commit [while running 'write to datastore/Write Mutation to Datastore'] failed: Error code: INVALID_ARGUMENT. Message: datastore transaction or write too big.
我的猜测是 Datastore 无法处理管道的写入速率,但我不确定如何降低 Dataflow 管道中的写入。
我正在使用 WriteToDatastore 转换来写入 DataStore:
from apache_beam.io.gcp.datastore.v1.datastoreio import WriteToDatastore
我的管道如下所示:
with beam.Pipeline(options=pipeline_options) as p:
(p # pylint: disable=expression-not-assigned
| 'read from json' >> ReadFromText(known_args.input, coder=JsonCoder())
| 'create entity' >> beam.Map(
EntityWrapper(known_args.namespace, known_args.kind,
known_args.ancestor).make_entity)
| 'write to datastore' >> WriteToDatastore(known_args.dataset))
提前感谢您的帮助。
【问题讨论】:
标签: python google-cloud-datastore google-cloud-dataflow apache-beam