【问题标题】:Apache Flink RocksDB state managementApache Flink RocksDB 状态管理
【发布时间】:2021-04-18 01:47:23
【问题描述】:

我在同一个 flink 作业中阅读 2 个 kafka 主题。

  • Stream1: 来自第一个主题的消息保存到rocksdb,然后与stream2联合。
  • Stream2: 来自第二个主题的消息被 stream1 保存的状态丰富了,然后它将与 stream1 合并。

主题 1 和主题 2 是不同的来源,但基本上两个来源的输出是相同的。我必须使用来自 topic1 的数据来丰富来自 topic2 的数据。

这里是流程;

val stream1 = readKafkaTopic1().keyBy(_.memberId).map(saveMemberDetailsToRocksDB)
val stream2 = readKafkaTopic2().keyBy(_.memberId).map(readMemberDetailsAndEnrich)
stream1.union(stream2).addSink(kafkaProducer)

这是问题;

  1. 流动性好吗?
  2. stream2可以访问stream1保存的相同memberId的状态吗?

【问题讨论】:

  • 您的意思是您手动使用 RocksDB 还是仅使用 RocksDb 支持的 Flink 状态?
  • 由 RocksDb 支持的 Flink 状态,而不是手动支持

标签: apache-flink flink-streaming


【解决方案1】:

似乎您应该能够通过使用KeyedCoProcessFunction 来实现您想要的。这或多或少会这样:

stream1
.keyBy(_.memberId)
.connect(stream2.keyBy(_.memberId))
.process(new CustomKeyedCoProcessFunction())

这样,您可以将状态保持在单个 KeyedCoProcessFunction 中,因此您可以同时访问 stream1stream2

所以,对于processElement1,您可以在map 中为stream1 和在processElement2 中做同样的事情您可以在map 中为stream2 做同样的事情。

【讨论】:

  • 哦,那太酷了。那么,如果我想添加像 stream2 这样的新流呢?我想我可以连接 3 个不同的流?
  • 可悲的是你不能。您必须分两个单独的步骤执行此操作,因此首先将stream1stream2 连接,然后必须将其结果与stream3 连接。如果您有更多要连接的流,那么您可以尝试的一个技巧是简单地使用union,然后使用.keyBy(_.memberId) ,最后调用process。在这种情况下,您必须在单个函数中处理来自所有流的元素,因此您必须手动实现对特定流的元素的处理。
猜你喜欢
  • 1970-01-01
  • 2023-01-20
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2019-07-25
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多