【问题标题】:Need help to build Cloud Datastore data object需要帮助来构建 Cloud Datastore 数据对象
【发布时间】:2020-05-22 17:46:49
【问题描述】:

我正在尝试构建一个 Dataflow 管道:当 JSON 文件上传到 Google Cloud Storage 时触发该管道,并将文件内容写入 Cloud Datastore。

根据 Dataflow template,json 文件的每一行都必须符合 Datastore 数据对象格式(定义见 here)。

这就是我试图适应 Datastore 数据对象的 json 文件的样子:

{
  "userId": "u-skjbdw34jh3gx",
  "rowRanks:": [
    {
      "originalTrigger": "recent",
      "programmedRowPos": "VR1",
      "reoderedRowPos": 0
    },
    {
      "originalTrigger": "discovery",
      "programmedRowPos": "VR1",
      "reoderedRowPos": 1
    }
  ]
}

以下是我尝试使其适应上述链接数据对象的程度。

{
  "key": {
    "partitionId": {
      "projectId": "gcp-project-id",
      "namespaceId": "spring-demo"
    },
    "path": 
      {
        "kind": "demo",
        "name": "userId"
      }
  },
  "properties": {
    "userId": {
      "stringValue": "01348c2f-9a20-4ad2-b95d-b3e29f6fc2d1"
    }
  }
}

以下是我在尝试写入 Datastore 时在 Dataflow 中遇到的错误:

com.google.protobuf.InvalidProtocolBufferException: java.io.EOFException: End of input at line 1 column 2 path $.
    at com.google.protobuf.util.JsonFormat$ParserImpl.merge(JsonFormat.java:1195)
    at com.google.protobuf.util.JsonFormat$Parser.merge(JsonFormat.java:370)
    at com.google.cloud.teleport.templates.common.DatastoreConverters$EntityJsonParser.merge(DatastoreConverters.java:497)
    at com.google.cloud.teleport.templates.common.DatastoreConverters$JsonToEntity.processElement(DatastoreConverters.java:351)

【问题讨论】:

  • 你的文件有多大?您选择 Beam/dataflow 进行转换的原因是什么?
  • 目前大约 15GB,但由于它是针对整个用户群的 ML 模型输出,它会快速增长。
  • 你能提供一个CSV文件的例子吗?我想了解所需的转变;数据存储实体在 csv 中的外观如何,以及应如何将其插入到数据存储中。
  • 已编辑问题以回答您的疑问:问题中分享的第一个代码片段(snippet)就是 json 文件中的对象。文件中的每一行都是一个这样的对象。

标签: google-cloud-platform google-cloud-storage google-cloud-datastore google-cloud-dataflow


【解决方案1】:

json 文件中的每个 Google Cloud Datastore 对象都应该写在单独的一行里。正因为不满足这一点,才会出现问题中引用的错误:End of input at line 1 column 2 path $.

应该是这样的:

{"key":{"partitionId":{"projectId":"gcp-project-id","namespaceId":"spring-demo"},"path":[{"kind":"demo","name":"userId"}]},"properties":{"userId":{"stringValue":"01348c2f-9a20-4ad2-b95d-b3e29f6fc2d1"},"rowRanks":{"arrayValue":{"values":[{"entityValue":{"properties":{"originalTrigger":{"stringValue":"recent"},"programmedRowPos":{"stringValue":"VR1"},"reoderedRowPos":{"integerValue":1}}}}]}}}}

显然,json 文件将包含数千个对象,但每个对象都必须在一行中:

{"key":{"partitionId":{"projectId":"gcp-project-id","namespaceId":"spring-demo"},"path":[{"kind":"demo","name":"userId"}]},"properties":{"userId":{"stringValue":"01348c2f-9a20-4ad2-b95d-b3e29f6fc2d1"},"rowRanks":{"arrayValue":{"values":[{"entityValue":{"properties":{"originalTrigger":{"stringValue":"recent"},"programmedRowPos":{"stringValue":"VR1"},"reoderedRowPos":{"integerValue":1}}}}]}}}}
{"key":{"partitionId":{"projectId":"gcp-project-id","namespaceId":"spring-demo"},"path":[{"kind":"demo","name":"userId"}]},"properties":{"userId":{"stringValue":"01348c2f-9a20-4ad2-b95d-b3e29f6fc2d1"},"rowRanks":{"arrayValue":{"values":[{"entityValue":{"properties":{"originalTrigger":{"stringValue":"recent"},"programmedRowPos":{"stringValue":"VR1"},"reoderedRowPos":{"integerValue":1}}}}]}}}}
{"key":{"partitionId":{"projectId":"gcp-project-id","namespaceId":"spring-demo"},"path":[{"kind":"demo","name":"userId"}]},"properties":{"userId":{"stringValue":"01348c2f-9a20-4ad2-b95d-b3e29f6fc2d1"},"rowRanks":{"arrayValue":{"values":[{"entityValue":{"properties":{"originalTrigger":{"stringValue":"recent"},"programmedRowPos":{"stringValue":"VR1"},"reoderedRowPos":{"integerValue":1}}}}]}}}}
{"key":{"partitionId":{"projectId":"gcp-project-id","namespaceId":"spring-demo"},"path":[{"kind":"demo","name":"userId"}]},"properties":{"userId":{"stringValue":"01348c2f-9a20-4ad2-b95d-b3e29f6fc2d1"},"rowRanks":{"arrayValue":{"values":[{"entityValue":{"properties":{"originalTrigger":{"stringValue":"recent"},"programmedRowPos":{"stringValue":"VR1"},"reoderedRowPos":{"integerValue":1}}}}]}}}}
{"key":{"partitionId":{"projectId":"gcp-project-id","namespaceId":"spring-demo"},"path":[{"kind":"demo","name":"userId"}]},"properties":{"userId":{"stringValue":"01348c2f-9a20-4ad2-b95d-b3e29f6fc2d1"},"rowRanks":{"arrayValue":{"values":[{"entityValue":{"properties":{"originalTrigger":{"stringValue":"recent"},"programmedRowPos":{"stringValue":"VR1"},"reoderedRowPos":{"integerValue":1}}}}]}}}}
{"key":{"partitionId":{"projectId":"gcp-project-id","namespaceId":"spring-demo"},"path":[{"kind":"demo","name":"userId"}]},"properties":{"userId":{"stringValue":"01348c2f-9a20-4ad2-b95d-b3e29f6fc2d1"},"rowRanks":{"arrayValue":{"values":[{"entityValue":{"properties":{"originalTrigger":{"stringValue":"recent"},"programmedRowPos":{"stringValue":"VR1"},"reoderedRowPos":{"integerValue":1}}}}]}}}}

【讨论】:

    【解决方案2】:

    如果我正确理解了您的输入数据格式和所需的输出,则此 js 代码应该可以解决问题:

    // Browser demo: converts one sample record into the JSON representation of
    // a Cloud Datastore entity and writes the serialized result into the page.
    var data = {
      "userId": "u-skjbdw34jh3gx",
      "rowRanks": [
        {
          "originalTrigger": "recent",
          "programmedRowPos": "VR1",
          "reorderedRowPos": 0
        },
        {
          "originalTrigger": "discovery",
          "programmedRowPos": "VR1",
          "reorderedRowPos": 1
        }
      ]
    };

    var entity = {
      key: {
        partitionId: {
          projectId: "gcp-project-id",
          namespaceId: "spring-demo"
        },
        // `path` must be an array of path elements, even with a single element.
        // The key name is the record's own userId (not the literal "userId"),
        // so that records of different users map to different entity keys.
        path: [{ kind: "demo", name: data.userId }]
      },
      properties: {
        userId: { stringValue: data.userId },
        rowRanks: { arrayValue: { values: [] } }
      }
    };

    // Each rowRanks element becomes one embedded entity inside the array value.
    data.rowRanks.forEach(function (row) {
      entity.properties.rowRanks.arrayValue.values.push({
        entityValue: {
          properties: {
            originalTrigger: { stringValue: row.originalTrigger },
            programmedRowPos: { stringValue: row.programmedRowPos },
            reorderedRowPos: { integerValue: row.reorderedRowPos }
          }
        }
      });
    });

    document.write(JSON.stringify(entity));

    基本上,rowRanks 数组是通过 forEach() 循环逐个元素构建出来的。请注意,path 需要是一个数组(reference)。

    现在我们稍微修改一下,让它在模板代码而不是浏览器中运行,将文件上传到 GCS 并按照指令here 执行它:

    # Launches a Dataflow job named "test-datastore" from the
    # GCS_Text_to_Datastore template. The job reads gs://$BUCKET/*.json, applies
    # the `transform` function from gs://$BUCKET/*.js, writes to the Datastore
    # project $PROJECT, and uses gs://$BUCKET/errors.txt as the error path.
    # BUCKET and PROJECT must be set in the shell before running this.
    gcloud dataflow jobs run test-datastore \
    --gcs-location=gs://dataflow-templates/latest/GCS_Text_to_Datastore \
    --parameters=javascriptTextTransformGcsPath=gs://$BUCKET/*.js,errorWritePath=gs://$BUCKET/errors.txt,javascriptTextTransformFunctionName=transform,textReadPattern=gs://$BUCKET/*.json,datastoreWriteProjectId=$PROJECT
    

    上传到GCS的js文件完整内容为:

    /**
     * Text-transform function for the Dataflow template: converts one input
     * line into the single-line JSON representation of a Cloud Datastore entity.
     *
     * The entity key is demo/<userId> in project "gcp-project-id", namespace
     * "spring-demo". The key name comes from the record's own userId so that
     * each user gets a distinct entity; a fixed name would make every line of
     * the input file target the same key.
     *
     * Kept in ES5 syntax (var, function expressions) like the original;
     * NOTE(review): confirm the UDF runtime's supported ECMAScript version
     * before modernizing.
     *
     * @param {string} elem One line of the input file: a JSON object with a
     *     string `userId` and a `rowRanks` array whose items carry
     *     `originalTrigger`, `programmedRowPos` and `reorderedRowPos`.
     * @returns {string} The entity serialized with JSON.stringify (one line).
     * @throws {SyntaxError} If `elem` is not valid JSON (raised by JSON.parse).
     * @throws {Error} If the record has no non-empty string `userId`.
     */
    function transform(elem) {
        var data = JSON.parse(elem);

        // Without a userId there is nothing to build the key name from; fail
        // loudly instead of emitting an entity with an unusable key.
        if (typeof data.userId !== "string" || data.userId === "") {
            throw new Error("transform: record has no usable userId");
        }

        var entity = {
            key: {
                partitionId: {
                    projectId: "gcp-project-id",
                    namespaceId: "spring-demo"
                },
                // `path` must be an array of path elements.
                path: [{ kind: "demo", name: data.userId }]
            },
            properties: {
                userId: { stringValue: data.userId },
                rowRanks: { arrayValue: { values: [] } }
            }
        };

        // Each rowRanks element becomes one embedded entity in the array value.
        var rankValues = entity.properties.rowRanks.arrayValue.values;
        data.rowRanks.forEach(function (row) {
            rankValues.push({
                entityValue: {
                    properties: {
                        originalTrigger: { stringValue: row.originalTrigger },
                        programmedRowPos: { stringValue: row.programmedRowPos },
                        reorderedRowPos: { integerValue: row.reorderedRowPos }
                    }
                }
            });
        });

        return JSON.stringify(entity);
    }
    

    作业对我来说运行成功:

    并将数据写入 Datastore:

    如果这对你有帮助,请告诉我。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2023-03-08
      • 2014-02-14
      • 1970-01-01
      • 1970-01-01
      • 2011-05-14
      • 2021-09-01
      • 1970-01-01
      相关资源
      最近更新 更多