【发布时间】:2019-02-21 07:29:45
【问题描述】:
对于大数据中的许多情况,最好一次使用少量记录缓冲区,而不是一次处理一条记录。
最自然的例子是调用一些支持批处理以提高效率的外部 API。
我们如何在 Kafka Streams 中做到这一点?我在 API 中找不到任何我想要的东西。
到目前为止,我有:
builder.stream[String, String]("my-input-topic")
.mapValues(externalApiCall).to("my-output-topic")
我想要的是:
builder.stream[String, String]("my-input-topic")
.batched(chunkSize = 2000).map(externalBatchedApiCall).to("my-output-topic")
在 Scala 和 Akka Streams 中,该函数称为 grouped 或 batch。在 Spark Structured Streaming 中,我们可以使用 mapPartitions.map(_.grouped(2000).map(externalBatchedApiCall))。
【问题讨论】:
-
为什么不直接安排处理,然后从流中读取到 chunkSize 记录?
-
您问题的一个旁注是,从流处理器调用外部 API 并不总是最好的模式。有时您会发现,最好将外部数据作为自己的主题引入 Kafka 本身(例如来自数据库、大型机等的 CDC),然后轻松加入流处理本身。
-
Spark 中的 mapPartitions 不保证分区大小。只有流式传输的持续时间会影响窗口大小
-
正如@RobinMoffatt 提到的,将外部数据加载到 Kafka 主题中,将其作为 KTable 读取到您的应用程序中并执行流表连接而不是外部 API 调用可能会更好。
-
除此之外,您可以使用
transform()和附加的state并手动建立批次。如果状态大小小于 200,则将记录放入存储中。如果达到 200 条记录,提取所有数据,进行外部 API 调用---注意,您需要同步进行---,然后清除存储。
标签: java scala apache-spark apache-kafka apache-kafka-streams