【问题标题】:Google Spanner to PubSub through DataflowGoogle Spanner 通过 Dataflow 到 PubSub
【发布时间】:2020-04-05 00:40:12
【问题描述】:

使用 Google Dataflow 我需要从 Google spanner 读取数据并作为批处理写入 PubSub,我在 Spanner 中有超过 100000 条记录,所以我需要读取这些记录并使用 pubsub 批处理发布到 PubSub 主题中,1000 条记录将是每次发布迭代的限制。

请帮帮我

【问题讨论】:

  • 你的错误是什么?你目前的破解密码是什么?你能分享更多细节吗?
  • 是的,我们可以使用 pubsub io 编写,但我不想一次发布所有记录,因为我可能有大量数据,所以我需要拆分数据并作为多个发布请求发送,我没有找到这方面的文档,有什么办法可以做到这一点。
  • 我了解到您不想在一条 PubSub 消息中发布 100k+ 行。我对吗?如果是这样,您要逐行发布到 PubSub 中吗?还是逐块(每块大约 1000 行)?
  • 是的,你是对的,我想按 1000 行分块
  • 您的查询输出中有行号吗?可以加一个吗?

标签: google-cloud-platform google-cloud-functions google-cloud-dataflow google-cloud-pubsub google-cloud-spanner


【解决方案1】:

一种方法是使用使用数据流连接器

从 Cloud Spanner 读取数据

要从 Cloud Spanner 中读取数据,请应用 SpannerIO.read() 转换。使用 SpannerIO.Read 类中的方法配置读取。应用转换会返回一个 PCollection,其中集合中的每个元素代表读取操作返回的单个行。您可以根据所需的输出从 Cloud Spanner 中读取是否使用特定 SQL 查询。

应用 SpannerIO.read() 转换通过执行强读取返回一致的数据视图。除非您另外指定,否则读取的结果会在您开始读取时生成快照。有关 Cloud Spanner 可以执行的不同类型读取的更多信息,请参阅读取。

见:https://cloud.google.com/spanner/docs/dataflow-connector

这个帖子似乎解释了如何从 DataFlow 写入 PubSub:Publish messages to Pubsub topic in Dataflow

【讨论】:

  • 我认为@wild 正在请求一种方法来了解如何开始使用此方法,因此是高级答案。很想了解您为什么认为它不正确。 :)
  • 说的没错,但是问题太笼统了,你的答案也是。这不是 stackoverflow 的目标。
【解决方案2】:

我想了解确切的用例以及您要通过此实现的目标。

您可以使用以下内容从 Spanner 读取数据并在 pub sub 发布级别进行批处理。它在发布时对 pubsub 消息进行批处理。 (这里有 1 行作为一个 pubsub 消息发布)

流水线过程

  CustomPipelineOptions options = PipelineOptionsFactory.fromArgs(pipelineArgs).as(CustomPipelineOptions.class);

Pipeline pipeline = Pipeline.create(options);

SpannerConfig spannerConfig = SpannerConfig.create()
    .withDatabaseId(options.getSpannerDatabaseId())
    .withProjectId(options.getSpannerProjectId())
    .withInstanceId(options.getSpannerInstanceId());

pipeline.apply(SpannerIO.read()
    .withTable("TestTable")
    .withSpannerConfig(spannerConfig)
    .withColumns(Arrays.asList("TestColumn")))
    .apply( ParDo.of(new StructToPubSubConverter()))
    .apply(PubsubIO.writeMessages()
        .to(options.getPubsubWriteTopic())
        .withMaxBatchSize(1000)); // Batch Size


pipeline.run();

Spanner 到 PubSub 转换器

public static class StructToPubSubConverter extends DoFn<Struct, PubsubMessage> {

@ProcessElement
public void processElement(ProcessContext context, OutputReceiver<PubsubMessage> out){
  Struct struct =context.element();
  String testColumn = struct.getString(0);
  context.output(new PubsubMessage(testColumn.getBytes(),new HashMap<>()));
}

}

不确定,这是否解决了您的问题,但应该提供一个公平的想法。分享更多细节会很有帮助。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2015-12-14
    • 2016-11-26
    • 2020-02-05
    • 1970-01-01
    • 1970-01-01
    • 2021-11-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多