【问题标题】:Is it able to stream data from pubsub to datastore using dataflow?是否能够使用数据流将数据从 pubsub 流式传输到数据存储区?
【发布时间】:2019-01-26 07:45:27
【问题描述】:

我尝试使用数据流将数据从 pubsub 流式传输到数据存储区。

参考: https://github.com/GoogleCloudPlatform/DataflowTemplates/tree/master/src/main/java/com/google/cloud/teleport/templates

我尝试构建模板,但它根本不起作用。 所以,我认为这是不可能的。

怎么样? 请给我一些建议。

【问题讨论】:

    标签: google-cloud-platform google-cloud-datastore google-cloud-dataflow google-cloud-pubsub


    【解决方案1】:

    您可能偶然发现了该特定模板中的错误。其中有两个单独的问题,第一个是在这个 SO 问题How to use google provided template [pubsub to Datastore]? 中回答的一个,它指向缺少的errorTag,第二个是 Datastore 的编写者在将实体写入到数据存储。

    如果您使用-e 选项运行maven 编译命令,它将显示错误消息GroupByKey cannot be applied to non-bounded PCollection in the GlobalWindow without a trigger. Use a Window.into or Window.triggering transform prior to GroupByKey。为什么这样做?这与消息是从 PubSub 流式传输而不是批处理(这是我们所期望的)这一事实有关。这意味着没有有限的项目集流入,而是永无止境的项目流。为了使用它,我们需要将其限制为可以由聚合函数(例如GroupByKey)考虑的项目的流式窗口。帮助将实体写入数据存储区的 DatastoreConverters 类实际上会检查我们是否尝试多次写入相同的键,它通过使用 GroupByKey 函数来实现。

    简单的解决方案,只需为其提供一个流式处理窗口即可,这是管道中添加的第三个.apply(...),它将流窗口化在一起,并允许您在此处使用数据存储写入器:

    import org.apache.beam.sdk.transforms.windowing.FixedWindows;
    import org.apache.beam.sdk.transforms.windowing.Window;
    import org.apache.beam.sdk.values.TupleTag;
    import org.joda.time.Duration;
    
    ... 
    
      public static void main(String[] args) {
        PubsubToDatastoreOptions options = PipelineOptionsFactory.fromArgs(args)
            .withValidation()
            .as(PubsubToDatastoreOptions.class);
    
        Pipeline pipeline = Pipeline.create(options);
        TupleTag<String> errorTag = new TupleTag<String>("errors") {};
    
        pipeline
            .apply(PubsubIO.readStrings()
                .fromTopic(options.getPubsubReadTopic()))
            .apply(TransformTextViaJavascript.newBuilder()
                .setFileSystemPath(options.getJavascriptTextTransformGcsPath())
                .setFunctionName(options.getJavascriptTextTransformFunctionName())
                .build())
            .apply(Window.<String>into(FixedWindows.of(Duration.standardSeconds(1))))
            .apply(WriteJsonEntities.newBuilder()
                .setProjectId(options.getDatastoreWriteProjectId())
                .setErrorTag(errorTag)
                .build());
    
        pipeline.run();
      }
    

    现在还有其他的,而且可能更好的方法可以做到这一点,但这将使您的模板编译并工作。此示例显示 1 秒的 FixedWindow,还有其他选项可以执行此操作,请查看 Google DataFlow - Windowing 的文档。

    编译你的模板:

    mvn compile exec:java -Dexec.mainClass=com.google.cloud.teleport.templates.PubsubToDatastore -Dexec.cleanupDaemonThreads=false -Dexec.args=" \
    --project=[YOUR_PROJECTID_HERE] \
    --stagingLocation=gs://[YOUR_BUCKET_HERE]/staging \
    --tempLocation=gs://[YOUR_BUCKET_HERE]/temp \
    --templateLocation=gs://[YOUR_BUCKET_HERE]/templates/PubsubToDatastore.json \
    --runner=DataflowRunner"
    

    然后启动作业:

    gcloud dataflow jobs run [NAME_OF_THE_JOB_WHATEVER_YOU_LIKE] \
    --gcs-location=gs://[YOUR_BUCKET_HERE]/templates/PubsubToDatastore.json \
    --zone=[ZONE_WHERE_YOU_WANT_TO_RUN] \
    --parameters "pubsubReadTopic=[YOUR_PUBSUB_TOPIC_HERE],datastoreWriteProjectId=[YOUR_PROJECTID_HERE]"
    

    现在您应该看到您的作业在 GCP 控制台中运行:

    请注意,此特定解决方案和选择的窗口将意味着 PubSub 消息最多延迟一秒才能最终到达数据存储区。缩短窗口可能会有所帮助,但为了获得更高的吞吐量,您需要与此处显示的管道不同的管道。

    【讨论】:

    • 感谢您的回答。你的建议让我得救了。我尝试了您的解决方案并取得了成功。非常感谢!!
    • 我也发现这很有用。我在 Github 中为您提到的 GroupByKey 异常打开了一个issue
    猜你喜欢
    • 2016-08-10
    • 2018-10-13
    • 1970-01-01
    • 1970-01-01
    • 2017-09-30
    • 2021-01-09
    • 2020-01-03
    • 2020-11-23
    • 1970-01-01
    相关资源
    最近更新 更多