【问题标题】:Google DataFlow Apache Beam谷歌数据流 Apache Beam
【发布时间】:2018-07-20 20:09:14
【问题描述】:

我正在尝试使用 Apache Beam 创建数据流管道,但我无法遵循文档,也找不到任何示例。

管道很简单。

  1. 创建管道
  2. 从发布/订阅主题中读取
  3. 写信给扳手。

目前,我被困在第 2 步。我找不到任何关于如何从 pub/sub 读取并使用它的示例。

这是我到目前为止想要的代码

class ExtractFlowInfoFn extends DoFn<PubsubMessage, KV<String, String>> {
    public void processElement(ProcessContext c) {
        KV.of("key", "value");
    }
}

public static void main(String[] args) {

    Pipeline p = Pipeline.create(
    PipelineOptionsFactory.fromArgs(args).withValidation().create());

    p.apply("ReadFromPubSub", PubsubIO.readMessages().fromSubscription("test"))
     .apply("ConvertToKeyValuePair", ParDo.of(new ExtractFlowInfoFn()))
     .apply("WriteToLog", ));
};

通过遵循多个示例,我能够想出代码。老实说,我不知道我在这里做什么。

请帮助我理解这一点或将我链接到正确的文档。

【问题讨论】:

    标签: google-cloud-dataflow apache-beam google-cloud-spanner


    【解决方案1】:

    从 Pub/Sub 中提取消息并写入 Cloud Spanner 的示例:

    import com.google.cloud.spanner.Mutation;
    import org.apache.beam.sdk.io.gcp.spanner.SpannerIO;
    import org.apache.beam.sdk.transforms.DoFn.ProcessElement;
    
    class MessageToMutationDoFn extends DoFn<PubsubMessage, Mutation> {
    
        @ProcessElement
        public void processElement(ProcessContext c) {
    
            // TODO: create Mutation object from PubsubMessage
    
            Mutation mutation = Mutation.newInsertBuilder("users_backup2")
                .set("column_1").to("value_1")
                .set("column_2").to("value_2")
                .set("column_3").to("value_3")
                .build();
    
            c.output(mutation);
        }
    }
    
    public static void main(String[] args) {
    
        Pipeline p = Pipeline.create();
    
        p.apply("ReadFromPubSub", PubsubIO.readMessages().fromSubscription("test"))
         .apply("MessageToMutation", ParDo.of(new MessageToMutationDoFn()))
         .apply("WriteToSpanner", SpannerIO.write()
             .withProjectId("projectId")
             .withInstanceId("spannerInstanceId")
             .withDatabaseId("spannerDatabaseId"));
    
        p.run();
    }
    

    参考:Apache Beam SpannerIOApache Beam PubsubIO

    【讨论】:

    • 这个例子部分有效。我无法写信给扳手。它给出了一个错误,并且该错误不是很有用。但是,当我有正确的答案时,我会更新这篇文章。
    • 解决方案在答案中。 stackoverflow.com/questions/46684071/…。并使用 Beam 2.2.0 版
    猜你喜欢
    • 2021-05-16
    • 1970-01-01
    • 2015-04-14
    • 1970-01-01
    • 1970-01-01
    • 2023-01-14
    • 2019-06-04
    • 2019-09-16
    • 2018-12-27
    相关资源
    最近更新 更多