【问题标题】:Splitting & joining streams in apache flink在 apache flink 中拆分和加入流
【发布时间】:2019-04-06 14:47:16
【问题描述】:

我认为我有一个相当不标准的用例。我想使用filter 函数将我的源流分成几个流:

val dataStream:DataStream[MyEvent] = ...
val s1 = dataStream.filter(...).map(...)
val s2 = dataStream.filter(...).map(...)

我还有一个时间戳提取器(传入的事件将在 XML 中附加一个时间戳):

env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
...

dataStream.assignTimestampsAndWatermarks(new MyTimestampExtractor)
...

class MyTimestampExtractor extends AssignerWithPunctuatedWatermarks[Elem]
{
  override def checkAndGetNextWatermark(lastElement:Elem, extractedTimestamp:Long):Watermark = new Watermark(extractedTimestamp)
  override def extractTimestamp(element:Elem, previousElementTimestamp:Long):Long = XmlOperations.getDateTime(element, "@timestamp").getMillis
}

我选择了这种方法,而不是简单地做一个流 (val s = dataStream.filter(...).map(...).filter(...).map(...)),因为我想构建一个 network 来拆分/组合任意流(例如 s1+s2->c1, s1 +s3->c2, c2+s4->c3, ...)

现在当通过上面的例子发送事件时,事件 E1 可能会同时出现在 s1 和 s2 中。这意味着,在我看来,相同的事件 E1 作为第一个实例放入 s1 (E1a) 并作为第二个实例放入 s2 (E1b)。

所以我现在要做的就是将 E1a 和 E1b 重新组合成一个类似于 E1 的组合 E1,它同时是 s1 和 s2 的转换。

我试过了:

val c1 = s1.join(s2)
  .where(_.key).equalTo(_.key)
  .window(TumblingEventTimeWindows.of(Time.seconds(10)))
  .apply((e1a, e2b) => { printf("Got e1a and e1b"); e1a })

但是,事件似乎永远不会到达 apply 函数,我无法找出原因。

我的例子有什么问题?我对像这样的流网络的方法/想法是否可行?

【问题讨论】:

  • 看看union而不是join
  • 你能详细说明一下吗?如何使用union 函数实现相同的功能?
  • 没关系;我没有仔细阅读你的问题。

标签: scala stream apache-flink


【解决方案1】:

你有安排水印吗?当使用事件时间时,只有当水印到达时才会触发一个窗口,该水印将事件时间时钟提前超过窗口的末尾。您可以使用时间戳提取器/水印生成器来执行此操作;有关详细信息,请参阅an example from the documentation

如果其中一个流有时处于空闲状态,这也会导致问题,因为空闲流上缺少水印会阻碍它连接到的任何流的水印。

根据您要执行的操作,您可能会发现使用 CoProcessFunction 比使用时间窗口连接更容易。请查看 Flink 培训站点中的 stateful enrichmentexpiring state 上的练习以获取示例。

【讨论】:

  • 我编辑了我的帖子来描述时间戳功能。在我的测试中,我只使用了 1 个单个事件,即进入 dataStream,然后进入 s1s2。所以s1s2 中只有两个事件实例具有相同的时间戳。我将尝试有状态的丰富变体,看看效果如何。然而,一个问题。在尝试构建 流网络 时,我可能会得到多个处理相同事件的 joins(通过 connect/CoProcessFunction 实现)。这可能不适用于存储的状态,因为多个连接的键/事件是相同的?
  • 如果你只有一个事件,那么这就解释了为什么窗口永远不会触发。对于事件时间窗口,窗口后面必须有一个带有时间戳的水印,才能触发窗口。要生成这样的水印将需要第二个事件。此外,您的时间戳提取器假定事件按时间戳排序。如果这是真的,你应该只使用 AscendingTimestampExtractor。
  • 多个连接(每个键)可以工作,但每个连接都必须管理自己的状态——无法在连接之间共享状态。
  • 好的,没关系。实际上我测试了这种技术,它非常有效。当 E1 进入管道时,它被分配了一个 UUID,我稍后用它来键入所有流,并在 RichCoFlatMapFunction 中再次匹配事件实例。由于我正计划构建一个潜在的大型流网络,这些流网络使用filter- 和connect- 函数进行分叉/连接,您认为当流的数量变大时这种方法会成为问题吗?我正在寻找最佳/正确的方法来最好地利用 flink 在集群环境中的扩展能力。
  • 您应该考虑使用侧输出进行分叉—​​—这可能会产生比过滤器更清洁、性能更高的解决方案(请参阅ci.apache.org/projects/flink/flink-docs-release-1.6/dev/stream/…)。但除此之外,这应该没问题——这种总体方法相当普遍。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多