【发布时间】:2018-04-29 08:00:52
【问题描述】:
我有几个用例需要根据时间戳同步多个流。
这是一个示例,我想同步交易栏和报价栏,例如我从原始交易和报价中生成的,我汇总:
val tradeBars: DataStream[TradeBar] = trades
.assignAscendingTimestamps(_.epochMillis)
.keyBy("key")
.window(TumblingEventTimeWindows.of(Time.minutes(5)))
.process(new TimeTradeBar(new DownTick()))
val quotesWithFlow = quotes
.assignAscendingTimestamps(_.epochMillis)
.keyBy("key")
.countWindow(2, 1)
.reduce((previousQuote, quote) => Quote.localOrderFlow(previousQuote, quote))
.assignAscendingTimestamps(_.epochMillis)
.keyBy("key")
val quoteBars: DataStream[QuoteBar] = quotesWithFlow
.assignAscendingTimestamps(_.epochMillis)
.keyBy("key")
.window(TumblingEventTimeWindows.of(Time.minutes(5)))
.process(new QuoteBars.TimeQuoteBar())
val joined: JoinedStreams[TradeBar, QuoteBar]#Where[LocalDateTime]#EqualTo = tradeBars
.join(quoteBars)
.where(_.start).equalTo(_.start)
// need a window here, just want to sync on same time window
我尝试使用 Flink 的窗口连接功能,但显然这需要一个窗口功能,然后我可以做一个应用方法。我想要的只是在同一时间窗口上同步流。我怀疑这不是 join 方法的意图。
我有一个使用 Flink 流连接方法的工作实现。我将它应用于交易条流和原始报价流,但这需要我自己编写一个非常混乱的 CoProcessFunction
CoProcessTradeBarsAndQuotes() extends CoProcessFunction[TradeBar, Quote, (TradeBar, QuoteBar)]
{}
这很麻烦,因为我必须跟踪缓冲区中的引号并仔细执行 process1 和 process2 函数的聚合。我想一定有一个更简单的方法,我只是没有看到它。感谢任何帮助和想法。
【问题讨论】:
-
如果窗口连接不是您想要的,您能否在此处澄清您对“同步流”一词的使用?
-
考虑这个用例:您有多个流,您在完全相同的窗口上进行基于时间的窗口聚合,例如 1 小时窗口,这会产生每天每小时的一组数据。现在您有多个这样的流,并且您希望将它们连接起来,以便您可以在每个窗口中输出数据的联合并将其发送到下游。具体示例:假设 2 只股票。您为每个小时窗口创建开盘价、最高价、最低价、收盘价,为每个窗口提供 4 个值。现在你想加入它,以便将 8 个值作为一个向量。
-
如果我理解正确,您真正想要的是根据时间戳将事件分配给适当的窗口?你不做任何加入,但你宁愿
union这两个流。水印的整个概念将确保两个(多个)流都将根据其时间戳对齐。我的理解正确吗? -
谢谢你的想法,大卫。但是,这不起作用,因为 union 只能处理相同的数据类型,通常情况并非如此。
标签: apache-flink