【发布时间】:2020-05-22 17:46:49
【问题描述】:
我正在尝试构建一个 Dataflow 管道,该管道在将 JSON 文件上传到 Google Cloud Storage 并将其写入 Cloud Datastore 时触发。
根据Dataflow templatejson 文件的每一行必须有Datastore 数据对象格式,定义here。
这就是我试图适应 Datastore 数据对象的 json 文件的样子:
{
"userId": "u-skjbdw34jh3gx",
"rowRanks:": [
{
"originalTrigger": "recent",
"programmedRowPos": "VR1",
"reoderedRowPos": 0
},
{
"originalTrigger": "discovery",
"programmedRowPos": "VR1",
"reoderedRowPos": 1
}
]
}
以下是我尝试使其适应上述链接数据对象的程度。
{
"key": {
"partitionId": {
"projectId": "gcp-project-id",
"namespaceId": "spring-demo"
},
"path":
{
"kind": "demo",
"name": "userId"
}
},
"properties": {
"userId": {
"stringValue": "01348c2f-9a20-4ad2-b95d-b3e29f6fc2d1"
}
}
}
以下是我在尝试写入 Datastore 时在 Dataflow 中遇到的错误:
com.google.protobuf.InvalidProtocolBufferException: java.io.EOFException: End of input at line 1 column 2 path $.
at com.google.protobuf.util.JsonFormat$ParserImpl.merge(JsonFormat.java:1195)
at com.google.protobuf.util.JsonFormat$Parser.merge(JsonFormat.java:370)
at com.google.cloud.teleport.templates.common.DatastoreConverters$EntityJsonParser.merge(DatastoreConverters.java:497)
at com.google.cloud.teleport.templates.common.DatastoreConverters$JsonToEntity.processElement(DatastoreConverters.java:351)
【问题讨论】:
-
你的文件有多大?您选择 Beam/dataflow 进行转换的原因是什么?
-
目前大约 15GB,但由于它是针对整个用户群的 ML 模型输出,它会快速增长。
-
你能提供一个CSV文件的例子吗?我想了解所需的转变;数据存储实体在 csv 中的外观如何,以及应如何将其插入到数据存储中。
-
编辑问题以回答您的查询,第一个代码 sn -p 共享,是 json 文件中的对象。文件中的每一行都是一个这样的对象。
标签: google-cloud-platform google-cloud-storage google-cloud-datastore google-cloud-dataflow