【问题标题】:Apache Beam in Dataflow Large Side InputDataflow 大型侧输入中的 Apache Beam
【发布时间】:2018-05-11 02:48:32
【问题描述】:

这与this question最相似。

我正在 Dataflow 2.x 中创建一个管道,该管道从 Pubsub 队列获取流输入。传入的每条消息都需要通过来自 Google BigQuery 的非常大的数据集进行流式传输,并在写入数据库之前附加所有相关值(基于键)。

问题在于 BigQuery 的映射数据集非常大 - 任何将其用作辅助输入的尝试都会失败,Dataflow 运行器会抛出错误“java.lang.IllegalArgumentException: ByteString would be too long”。我尝试了以下策略:

1) 侧面输入

  • 如上所述,映射数据(显然)太大而无法执行此操作。如果我在这里错了或者有解决方法,请告诉我,因为这是最简单的解决方案。

2) 键值对映射

  • 在此策略中,我在管道的第一部分读取 BigQuery 数据和 Pubsub 消息数据,然后通过 ParDo 转换运行每个数据,将 PCollections 中的每个值更改为 KeyValue 对。然后,我运行 Merge.Flatten 转换和 GroupByKey 转换以将相关映射数据附加到每条消息。
  • 这里的问题是流数据需要窗口化才能与其他数据合并,因此我还必须将窗口化应用于大型有界 BigQuery 数据。它还要求两个数据集上的窗口策略相同。但是对于有界数据没有任何窗口策略是有意义的,我所做的几次窗口尝试只是在一个窗口中发送所有 BQ 数据,然后再也不发送它。它需要与每条传入的 pubsub 消息一起加入。

3) 在 ParDo (DoFn) 中直接调用 BQ

  • 这似乎是个好主意 - 让每个工作人员声明地图数据的静态实例。如果不存在,则直接调用 BigQuery 以获取它。不幸的是,这每次都会从 BigQuery 引发内部错误(因为在整个消息中只是说“内部错误”)。向 Google 提交支持请求后,他们告诉我,基本上,“你不能那样做”。

看来这个任务并不真正适合“令人尴尬的可并行化”模型,所以我在这里找错树了吗?

编辑:

即使在数据流中使用高内存机器并尝试将侧面输入放入地图视图中,我也会收到错误 java.lang.IllegalArgumentException: ByteString would be too long

这是我正在使用的代码示例(伪):

    Pipeline pipeline = Pipeline.create(options);

    PCollectionView<Map<String, TableRow>> mapData = pipeline
            .apply("ReadMapData", BigQueryIO.read().fromQuery("SELECT whatever FROM ...").usingStandardSql())
            .apply("BQToKeyValPairs", ParDo.of(new BQToKeyValueDoFn())) 
            .apply(View.asMap());

    PCollection<PubsubMessage> messages = pipeline.apply(PubsubIO.readMessages()
            .fromSubscription(String.format("projects/%1$s/subscriptions/%2$s", projectId, pubsubSubscription)));

    messages.apply(ParDo.of(new DoFn<PubsubMessage, TableRow>() {
        @ProcessElement
        public void processElement(ProcessContext c) {
            JSONObject data = new JSONObject(new String(c.element().getPayload()));
            String key = getKeyFromData(data);
            TableRow sideInputData = c.sideInput(mapData).get(key);
            if (sideInputData != null) {
                LOG.info("holyWowItWOrked");
                c.output(new TableRow());
            } else {
                LOG.info("noSideInputDataHere");
            }
        }
    }).withSideInputs(mapData));

管道在从ParDo 中记录任何内容之前引发异常并失败。

堆栈跟踪:

java.lang.IllegalArgumentException: ByteString would be too long: 644959474+1551393497
        com.google.cloud.dataflow.worker.repackaged.com.google.protobuf.ByteString.concat(ByteString.java:524)
        com.google.cloud.dataflow.worker.repackaged.com.google.protobuf.ByteString.balancedConcat(ByteString.java:576)
        com.google.cloud.dataflow.worker.repackaged.com.google.protobuf.ByteString.balancedConcat(ByteString.java:575)
        com.google.cloud.dataflow.worker.repackaged.com.google.protobuf.ByteString.balancedConcat(ByteString.java:575)
        com.google.cloud.dataflow.worker.repackaged.com.google.protobuf.ByteString.balancedConcat(ByteString.java:575)
        com.google.cloud.dataflow.worker.repackaged.com.google.protobuf.ByteString.copyFrom(ByteString.java:559)
        com.google.cloud.dataflow.worker.repackaged.com.google.protobuf.ByteString$Output.toByteString(ByteString.java:1006)
        com.google.cloud.dataflow.worker.WindmillStateInternals$WindmillBag.persistDirectly(WindmillStateInternals.java:575)
        com.google.cloud.dataflow.worker.WindmillStateInternals$SimpleWindmillState.persist(WindmillStateInternals.java:320)
        com.google.cloud.dataflow.worker.WindmillStateInternals$WindmillCombiningState.persist(WindmillStateInternals.java:951)
        com.google.cloud.dataflow.worker.WindmillStateInternals.persist(WindmillStateInternals.java:216)
        com.google.cloud.dataflow.worker.StreamingModeExecutionContext$StepContext.flushState(StreamingModeExecutionContext.java:513)
        com.google.cloud.dataflow.worker.StreamingModeExecutionContext.flushState(StreamingModeExecutionContext.java:363)
        com.google.cloud.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1000)
        com.google.cloud.dataflow.worker.StreamingDataflowWorker.access$800(StreamingDataflowWorker.java:133)
        com.google.cloud.dataflow.worker.StreamingDataflowWorker$7.run(StreamingDataflowWorker.java:771)
        java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        java.lang.Thread.run(Thread.java:745)

【问题讨论】:

  • 您使用的是哪种 SideInput 视图?你能分享一个你是如何使用它的例子吗?
  • 您是否考虑过使用有状态 ParDo?如果您在全局窗口中处理,这将允许您将来自 BigQuery 的值存储在状态中,并使用它来处理从另一个流到达的每个值。您需要使用您提到的相同 Merge.Flatten 方法,因为 Stateful DoFn 仅适用于单个输入集合。
  • 对于您首先评论@BenChambers 侧面输入它是一个大映射表。每行都有一个键字符串,它可能与传入的 Pubsub 消息中的数据相匹配。映射数据集每周更改,但目前约为 4000 万行(约 10 GB),并且在一周内是完全静态且不变的。我现在正在查看有状态的 pardo 文档,看看是否可行......
  • 对于侧输入,您使用的是View.asSingletonView.asMap等吗?例如 -- View.asSingleton 将采用单个元素的 PCollection 并使其对 ParDo 可见。 View.asMap 将采用 PCollection&lt;KV&lt;K, V&gt;&gt; 并将其作为 Map&lt;K, V&gt; 提供,但只会读取您需要的密钥。
  • 有这方面的消息吗?面临同样的问题

标签: java google-bigquery google-cloud-dataflow apache-beam


【解决方案1】:

查看本文https://cloud.google.com/blog/products/gcp/guide-to-common-cloud-dataflow-use-case-patterns-part-2中名为“模式:流模式大型查找表”的部分(这可能是唯一可行的解​​决方案,因为您的侧输入不适合内存):

说明:

大型(以 GB 为单位)查找表必须准确,并且经常更改或 不适合内存。

示例:

您有来自零售商的销售点信息,需要 将产品项目的名称与数据记录相关联 包含产品ID。有成千上万的项目 存储在一个可以不断变化的外部数据库中。还有,所有 必须使用正确的值处理元素。

解决方案:

使用“Calling external services for data enrichment”模式 但不是调用微服务,而是调用读取优化的 NoSQL 数据库(例如 Cloud Datastore 或 Cloud Bigtable)。

对于每个要查找的值,使用 KV 创建一个键值对 实用程序类。执行 GroupByKey 以创建相同密钥类型的批次 对数据库进行调用。在 DoFn 中,调用 该键的数据库,然后将值应用于所有值 遍历可迭代对象。与客户一起遵循最佳实践 如“为数据调用外部服务”中所述的实例化 丰富”。

其他相关模式在本文中描述:https://cloud.google.com/blog/products/gcp/guide-to-common-cloud-dataflow-use-case-patterns-part-1

  • 模式:缓慢变化的查找缓存
  • 模式:调用外部服务来丰富数据

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2017-07-13
    • 1970-01-01
    • 1970-01-01
    • 2020-03-31
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多