【发布时间】:2018-05-11 02:48:32
【问题描述】:
这与this question最相似。
我正在 Dataflow 2.x 中创建一个管道,该管道从 Pubsub 队列获取流输入。传入的每条消息都需要通过来自 Google BigQuery 的非常大的数据集进行流式传输,并在写入数据库之前附加所有相关值(基于键)。
问题在于 BigQuery 的映射数据集非常大 - 任何将其用作辅助输入的尝试都会失败,Dataflow 运行器会抛出错误“java.lang.IllegalArgumentException: ByteString would be too long”。我尝试了以下策略:
1) 侧面输入
- 如上所述,映射数据(显然)太大而无法执行此操作。如果我在这里错了或者有解决方法,请告诉我,因为这是最简单的解决方案。
2) 键值对映射
- 在此策略中,我在管道的第一部分读取 BigQuery 数据和 Pubsub 消息数据,然后通过 ParDo 转换运行每个数据,将 PCollections 中的每个值更改为 KeyValue 对。然后,我运行 Merge.Flatten 转换和 GroupByKey 转换以将相关映射数据附加到每条消息。
- 这里的问题是流数据需要窗口化才能与其他数据合并,因此我还必须将窗口化应用于大型有界 BigQuery 数据。它还要求两个数据集上的窗口策略相同。但是对于有界数据没有任何窗口策略是有意义的,我所做的几次窗口尝试只是在一个窗口中发送所有 BQ 数据,然后再也不发送它。它需要与每条传入的 pubsub 消息一起加入。
3) 在 ParDo (DoFn) 中直接调用 BQ
- 这似乎是个好主意 - 让每个工作人员声明地图数据的静态实例。如果不存在,则直接调用 BigQuery 以获取它。不幸的是,这每次都会从 BigQuery 引发内部错误(因为在整个消息中只是说“内部错误”)。向 Google 提交支持请求后,他们告诉我,基本上,“你不能那样做”。
看来这个任务并不真正适合“令人尴尬的可并行化”模型,所以我在这里找错树了吗?
编辑:
即使在数据流中使用高内存机器并尝试将侧面输入放入地图视图中,我也会收到错误 java.lang.IllegalArgumentException: ByteString would be too long
这是我正在使用的代码示例(伪):
Pipeline pipeline = Pipeline.create(options);
PCollectionView<Map<String, TableRow>> mapData = pipeline
.apply("ReadMapData", BigQueryIO.read().fromQuery("SELECT whatever FROM ...").usingStandardSql())
.apply("BQToKeyValPairs", ParDo.of(new BQToKeyValueDoFn()))
.apply(View.asMap());
PCollection<PubsubMessage> messages = pipeline.apply(PubsubIO.readMessages()
.fromSubscription(String.format("projects/%1$s/subscriptions/%2$s", projectId, pubsubSubscription)));
messages.apply(ParDo.of(new DoFn<PubsubMessage, TableRow>() {
@ProcessElement
public void processElement(ProcessContext c) {
JSONObject data = new JSONObject(new String(c.element().getPayload()));
String key = getKeyFromData(data);
TableRow sideInputData = c.sideInput(mapData).get(key);
if (sideInputData != null) {
LOG.info("holyWowItWOrked");
c.output(new TableRow());
} else {
LOG.info("noSideInputDataHere");
}
}
}).withSideInputs(mapData));
管道在从ParDo 中记录任何内容之前引发异常并失败。
堆栈跟踪:
java.lang.IllegalArgumentException: ByteString would be too long: 644959474+1551393497
com.google.cloud.dataflow.worker.repackaged.com.google.protobuf.ByteString.concat(ByteString.java:524)
com.google.cloud.dataflow.worker.repackaged.com.google.protobuf.ByteString.balancedConcat(ByteString.java:576)
com.google.cloud.dataflow.worker.repackaged.com.google.protobuf.ByteString.balancedConcat(ByteString.java:575)
com.google.cloud.dataflow.worker.repackaged.com.google.protobuf.ByteString.balancedConcat(ByteString.java:575)
com.google.cloud.dataflow.worker.repackaged.com.google.protobuf.ByteString.balancedConcat(ByteString.java:575)
com.google.cloud.dataflow.worker.repackaged.com.google.protobuf.ByteString.copyFrom(ByteString.java:559)
com.google.cloud.dataflow.worker.repackaged.com.google.protobuf.ByteString$Output.toByteString(ByteString.java:1006)
com.google.cloud.dataflow.worker.WindmillStateInternals$WindmillBag.persistDirectly(WindmillStateInternals.java:575)
com.google.cloud.dataflow.worker.WindmillStateInternals$SimpleWindmillState.persist(WindmillStateInternals.java:320)
com.google.cloud.dataflow.worker.WindmillStateInternals$WindmillCombiningState.persist(WindmillStateInternals.java:951)
com.google.cloud.dataflow.worker.WindmillStateInternals.persist(WindmillStateInternals.java:216)
com.google.cloud.dataflow.worker.StreamingModeExecutionContext$StepContext.flushState(StreamingModeExecutionContext.java:513)
com.google.cloud.dataflow.worker.StreamingModeExecutionContext.flushState(StreamingModeExecutionContext.java:363)
com.google.cloud.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1000)
com.google.cloud.dataflow.worker.StreamingDataflowWorker.access$800(StreamingDataflowWorker.java:133)
com.google.cloud.dataflow.worker.StreamingDataflowWorker$7.run(StreamingDataflowWorker.java:771)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
java.lang.Thread.run(Thread.java:745)
【问题讨论】:
-
您使用的是哪种 SideInput 视图?你能分享一个你是如何使用它的例子吗?
-
您是否考虑过使用有状态 ParDo?如果您在全局窗口中处理,这将允许您将来自 BigQuery 的值存储在状态中,并使用它来处理从另一个流到达的每个值。您需要使用您提到的相同 Merge.Flatten 方法,因为 Stateful DoFn 仅适用于单个输入集合。
-
对于您首先评论@BenChambers 侧面输入它是一个大映射表。每行都有一个键字符串,它可能与传入的 Pubsub 消息中的数据相匹配。映射数据集每周更改,但目前约为 4000 万行(约 10 GB),并且在一周内是完全静态且不变的。我现在正在查看有状态的 pardo 文档,看看是否可行......
-
对于侧输入,您使用的是
View.asSingleton、View.asMap等吗?例如 --View.asSingleton将采用单个元素的 PCollection 并使其对 ParDo 可见。View.asMap将采用PCollection<KV<K, V>>并将其作为Map<K, V>提供,但只会读取您需要的密钥。 -
有这方面的消息吗?面临同样的问题
标签: java google-bigquery google-cloud-dataflow apache-beam