【问题标题】:PubSub to Spanner Streaming PipelinePubSub 到 Spanner 流管道
【发布时间】:2020-02-05 09:36:11
【问题描述】:

我正在尝试将 JSON 类型的 PubSub 消息流式传输到 spanner 数据库,并且 insert_update 运行良好。 Spanner 表具有复合主键,因此需要在从 PubSub 插入新数据之前删除现有数据(因此只有最新数据存在)。在这种情况下,扳手替换或插入/更新突变不起作用。 我添加了管道


import org.apache.beam.* ;

public class PubSubToSpannerPipeline {

  // JSON to TableData Object
  public static class PubSubToTableDataFn extends DoFn<String, TableData> {

    @ProcessElement
    public void processElement(ProcessContext c) {
      .
      .
      .
    }
  }

  public interface PubSubToSpannerOptions extends PipelineOptions, StreamingOptions {
    .
    .
    .
  }

  public static void main(String[] args) {
    PubSubToSpannerOptions options = PipelineOptionsFactory
        .fromArgs(args)
        .withValidation()
        .as(PubSubToSpannerOptions.class);
    options.setStreaming(true);

    SpannerConfig spannerConfig =
        SpannerConfig.create()
        .withProjectId(options.getProjectId())
        .withInstanceId(options.getInstanceId())
        .withDatabaseId(options.getDatabaseId());

    Pipeline pipeLine = Pipeline.create(options);

    PCollection<TableData> tableDataMsgs = pipeLine.apply(PubsubIO.readStrings()
        .fromSubscription(options.getInputSubscription()))
        .apply("ParsePubSubMessage", ParDo.of(new PubSubToTableDataFn ()));

    // Window function
    PCollection<TableData> tableDataJson = tableDataMsgs
        .apply(Window.into(FixedWindows.of(Duration.standardMinutes(1))));

    PCollection<MutationGroup> upsertMutationGroup = tableDataJson.apply("TableDataMutation",
        MapElements.via(new SimpleFunction<TableData, MutationGroup>() {

          public MutationGroup apply(TableData input) {

            String object_id = input.objectId;

            pipeLine.apply("ReadExistingData", SpannerIO.read()
                .withSpannerConfig(spannerConfig)
                .withQuery("SELECT object_id, mapped_object_id, mapped_object_name from TableName where object_id ='" + object_id + "'")
            .apply("MutationForExistingTableData", 
                    ParDo.of(new DoFn<Struct, Mutation>(){
                      @ProcessElement
                      public void processElement(ProcessContext c) {
                        Struct str = c.element();
                        c.output(Mutation.delete("TableName", KeySet.newBuilder()
                            .addKey(Key.newBuilder()
                                .append(str.getString("object_id"))
                                .append(str.getString("mapped_object_id"))
                                .append(str.getString("mapped_object_name")).build()).build()));
                      }
                    } ))
            .apply("DeleteExistingTableData", SpannerIO.write().withSpannerConfig(spannerConfig));

              Mutation dataMutation = Mutation.newReplaceBuilder("TableName",
                  .
                  .
                  .

                  );
              List<Mutation> list = new ArrayList<Mutation>();


              List<Map<String, String>> mappingList = input.listOfObjectRows;

              for (Map<String, String> objectMap : mappingList ) {
                list.add(Mutation.newReplaceBuilder("TableName",
                    .
                    .
                    .);
              }     

              return MutationGroup.create(dataMutation, list);


          }
        } )));


        upsertMutationGroup.apply("WriteDataToSpanner", SpannerIO.write()
            .withSpannerConfig(spannerConfig)
            .grouped());

        // Run the pipeline.
        pipeLine.run().waitUntilFinish();
  }

}

class TableData implements Serializable {
  String objectId;
  List<Map<String, String>> listOfObjectRows;

}

期望在插入或更新数据之前必须从表中删除现有的映射数据。

【问题讨论】:

    标签: google-cloud-dataflow google-cloud-pubsub google-cloud-spanner


    【解决方案1】:

    我不完全确定你在做什么,但看起来你想要:

    • 使用与 pubsub 消息匹配的键(或部分键)读取一些现有数据
    • 删除此数据
    • 从 pubsub 消息中插入新数据

    一种选择是创建一个DoFn,它在读写事务中执行此读取/删除/插入(或读取/更新)。这将保持数据库的一致性...

    使用SpannerIO.WriteFn 作为模型 - 您需要将SpannerAccessor 设置为瞬态并在@Setup@Teardown 事件处理程序中创建/删除它

    DoFn@ProcessElement 处理程序将创建一个Read-write Transaction,您将在其中读取键的行,更新或删除它们,然后插入新元素。

    这种方法的缺点是每个 Spanner 事务只会处理一个 Pub/Sub 消息(除非您在前面的步骤中做了一些聪明的事情,例如对它们进行分组),这是一个复杂的读写事务。如果您的消息/秒速率相对较低,这很好,但如果不是,此方法会给您的数据库带来更多负载。

    第二种选择是使用键范围的盲删除。仅当 object_id 是复合键的第一部分(它似乎来自您的代码)时,这才有效。

    您将创建一个包含删除突变的MutationGroup,该突变使用带有键范围的删除突变来盲删除其键以 object_id 开头的任何现有行,然后插入突变以替换已删除的行。

    MutationGroup.create(
        // Delete rows with key starting with object_id.
        Mutation.delete("TableName", KeySet.newBuilder()
            .addRange(
                KeyRange.closedClosed(
                    Key.of(str.getString("object_id")),
                    Key.of(str.getString("object_id"))))
            .build()),
        // Insert replacement rows.
        Mutation.newInsertBuilder("TableName")
            .set("column").to("value"),
            ...
            .build(),
        Mutation.newInsertBuilder("TableName")
            ...);
    

    然后,这将像以前一样传递给 SpannerIO.write().grouped(),以便可以对它们进行批处理以提高效率。

    【讨论】:

    • KeyRange.prefix 是我正在寻找的方法。谢谢!!
    猜你喜欢
    • 1970-01-01
    • 2020-04-05
    • 1970-01-01
    • 1970-01-01
    • 2020-12-29
    • 1970-01-01
    • 2018-12-31
    • 2017-04-21
    • 1970-01-01
    相关资源
    最近更新 更多