【发布时间】:2019-04-06 14:47:16
【问题描述】:
我认为我有一个相当不标准的用例。我想使用filter 函数将我的源流分成几个流:
val dataStream:DataStream[MyEvent] = ...
val s1 = dataStream.filter(...).map(...)
val s2 = dataStream.filter(...).map(...)
我还有一个时间戳提取器(传入的事件将在 XML 中附加一个时间戳):
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
...
dataStream.assignTimestampsAndWatermarks(new MyTimestampExtractor)
...
class MyTimestampExtractor extends AssignerWithPunctuatedWatermarks[Elem]
{
override def checkAndGetNextWatermark(lastElement:Elem, extractedTimestamp:Long):Watermark = new Watermark(extractedTimestamp)
override def extractTimestamp(element:Elem, previousElementTimestamp:Long):Long = XmlOperations.getDateTime(element, "@timestamp").getMillis
}
我选择了这种方法,而不是简单地做一个流 (val s = dataStream.filter(...).map(...).filter(...).map(...)),因为我想构建一个 network 来拆分/组合任意流(例如 s1+s2->c1, s1 +s3->c2, c2+s4->c3, ...)
现在当通过上面的例子发送事件时,事件 E1 可能会同时出现在 s1 和 s2 中。这意味着,在我看来,相同的事件 E1 作为第一个实例放入 s1 (E1a) 并作为第二个实例放入 s2 (E1b)。
所以我现在要做的就是将 E1a 和 E1b 重新组合成一个类似于 E1 的组合 E1,它同时是 s1 和 s2 的转换。
我试过了:
val c1 = s1.join(s2)
.where(_.key).equalTo(_.key)
.window(TumblingEventTimeWindows.of(Time.seconds(10)))
.apply((e1a, e2b) => { printf("Got e1a and e1b"); e1a })
但是,事件似乎永远不会到达 apply 函数,我无法找出原因。
我的例子有什么问题?我对像这样的流网络的方法/想法是否可行?
【问题讨论】:
-
看看
union而不是join。 -
你能详细说明一下吗?如何使用
union函数实现相同的功能? -
没关系;我没有仔细阅读你的问题。
标签: scala stream apache-flink