【问题标题】:How to use Futures with Kafka Streams如何将 Futures 与 Kafka Streams 一起使用
【发布时间】:2017-07-03 23:34:33
【问题描述】:

有一个 kafka 集群,我从中消费两个主题并加入它。随着加入的结果,我对数据库进行了一些操作。对 DB 的所有操作都是异步的,因此它们返回给我一个 Future(scala.concurrent.Future,但无论如何它与 java.util.concurrent.CompletableFuture 相同)。所以结果我得到了这样的代码:

val firstSource: KTable[String, Obj]
val secondSource: KTable[String, Obj2]

def enrich(data: ObjAndObj2): Future[EnrichedObj]
def saveResultToStorage(enrichedData: Future[EnrichedObj]): Future[Unit]

firstSource.leftJoin(secondSource, joinFunc)
           .mapValues(enrich)
           .foreach(saveResultToStorage)

我可以使用流中的未来值进行操作,还是有更好的方法来处理异步任务(例如 Akka 流中的 .mapAsync)?

【问题讨论】:

标签: scala stream apache-kafka-streams


【解决方案1】:

我也有同样的问题。据我所知,Kafka Streams 的设计并非像 Akka Streams 那样处理多速率流。 Kafka Streams 没有 Akka 的多速率原语,如 mapAsync、throttle、conflate、buffer、batch 等。Kafka Streams 擅长处理主题之间的连接和有状态的数据聚合。 Akka Streams 擅长多速率和异步处理。

你有几个选择来处理这个问题:

  • 在 Kafka Streams 应用程序中进行阻塞调用。这是最简单的,如果您的 Future 调用的吞吐量不比它们的延迟大很多,那就没问题了。 Kafka Streams 对每个分区使用单独的线程,因此您可以使用正在处理的 Kafka 主题的分区来驱动并行性。
  • 使用 Reactive Kafka 库处理 Akka Streams 中的扩充,将扩充的结果发布到另一个 Kafka 主题,然后将其带入您的 Kafka Streams 应用程序。对于异步调用的并行吞吐量比端到端延迟(例如 Web 服务调用或对 NoSQL 数据库的查询)快得多的情况,我们就是这样做的。
  • 将所有扩充数据发布到其自己的 KTable 并将其加入 Kafka Streams 应用程序。事实上,通过 KTables 将流数据与丰富数据连接起来是 Kafka Streams 擅长的。如果丰富数据可以表示为表格,我们将使用它。如果必须动态计算富集数据,则它不起作用。

【讨论】:

    猜你喜欢
    • 2020-05-26
    • 2019-10-24
    • 2019-12-25
    • 2020-06-03
    • 1970-01-01
    • 1970-01-01
    • 2023-04-02
    • 2016-07-23
    • 2014-07-13
    相关资源
    最近更新 更多