【问题标题】:Using existing pub sub subscription from google data flow使用来自 google 数据流的现有 pub sub 订阅
【发布时间】:2017-06-09 12:48:38
【问题描述】:

我正在使用 Google 数据流,其中一个步骤是使用已创建的订阅订阅 pub sub 中的主题。 这是代码sn-p

CustomPipelineOptions options =
            PipelineOptionsFactory.fromArgs(args).withValidation().as(customPipelineOptions.class);
    Pipeline p = Pipeline.create(options);

    PCollection<TableRow> datastream = p.apply(PubsubIO.Read.named("Read device  data from PubSub")                 .subscription("projects/<projectID>/subscriptions/<subscriptionname>)
            .topic(String.format("projects/%s/topics/%s", options.getSourceProject(), options.getSourceTopic()))
            .timestampLabel("ts")
            .withCoder(TableRowJsonCoder.of()));

上面的代码在执行时会出现以下错误: 错误处理管道。原因:(b5e276ef8c76419f):步骤 s1 的输入 pubsub_subscription 无法识别。

我传递了正确的订阅名称和项目 ID。 不知道为什么仍然出现上述错误。

请帮忙。

【问题讨论】:

    标签: google-cloud-dataflow google-cloud-pubsub


    【解决方案1】:

    指定 2 个来源之一就足够了:主题或订阅。

    我建议你试试:

    PCollection<TableRow> datastream = p
            .apply(PubsubIO.Read.named("Read device data from PubSub")
            .topic(String.format("projects/%s/topics/%s", options.getSourceProject(), options.getSourceTopic()))
            .timestampLabel("ts")
            .withCoder(TableRowJsonCoder.of()));
    

    另外:我想您使用的是 Dataflow 1.9 SDK?您可能想考虑转移到new Beam 2.0.0 release。您可以在该 SDK here 中找到 PubSub 的参考。

    【讨论】:

    • 嗨,Matthias,感谢您的回复。让我试试,然后回复您。
    猜你喜欢
    • 2020-02-01
    • 1970-01-01
    • 2019-10-03
    • 2019-08-15
    • 1970-01-01
    • 2018-11-13
    • 1970-01-01
    • 2021-12-12
    相关资源
    最近更新 更多