【问题标题】:Synchronize Apache Flink streams based on a time stamp根据时间戳同步 Apache Flink 流
【发布时间】:2018-04-29 08:00:52
【问题描述】:

我有几个用例需要根据时间戳同步多个流。

这是一个示例,我想同步交易栏和报价栏,例如我从原始交易和报价中生成的,我汇总:

val tradeBars: DataStream[TradeBar] = trades
  .assignAscendingTimestamps(_.epochMillis)
  .keyBy("key")
  .window(TumblingEventTimeWindows.of(Time.minutes(5)))
  .process(new TimeTradeBar(new DownTick()))

val quotesWithFlow = quotes
  .assignAscendingTimestamps(_.epochMillis)
  .keyBy("key")
  .countWindow(2, 1)
  .reduce((previousQuote, quote) => Quote.localOrderFlow(previousQuote, quote))
  .assignAscendingTimestamps(_.epochMillis)
  .keyBy("key")

val quoteBars: DataStream[QuoteBar] = quotesWithFlow
  .assignAscendingTimestamps(_.epochMillis)
  .keyBy("key")
  .window(TumblingEventTimeWindows.of(Time.minutes(5)))
  .process(new QuoteBars.TimeQuoteBar())

val joined: JoinedStreams[TradeBar, QuoteBar]#Where[LocalDateTime]#EqualTo = tradeBars
  .join(quoteBars)
  .where(_.start).equalTo(_.start)
  // need a window here, just want to sync on same time window

我尝试使用 Flink 的窗口连接功能,但显然这需要一个窗口功能,然后我可以做一个应用方法。我想要的只是在同一时间窗口上同步流。我怀疑这不是 join 方法的意图。

我有一个使用 Flink 流连接方法的工作实现。我将它应用于交易条流和原始报价流,但这需要我自己编写一个非常混乱的 CoProcessFunction

CoProcessTradeBarsAndQuotes() extends CoProcessFunction[TradeBar, Quote, (TradeBar, QuoteBar)]
{}

这很麻烦,因为我必须跟踪缓冲区中的引号并仔细执行 process1 和 process2 函数的聚合。我想一定有一个更简单的方法,我只是没有看到它。感谢任何帮助和想法。

【问题讨论】:

  • 如果窗口连接不是您想要的,您能否在此处澄清您对“同步流”一词的使用?
  • 考虑这个用例:您有多个流,您在完全相同的窗口上进行基于时间的窗口聚合,例如 1 小时窗口,这会产生每天每小时的一组数据。现在您有多个这样的流,并且您希望将它们连接起来,以便您可以在每个窗口中输出数据的联合并将其发送到下游。具体示例:假设 2 只股票。您为每个小时窗口创建开盘价、最高价、最低价、收盘价,为每个窗口提供 4 个值。现在你想加入它,以便将 8 个值作为一个向量。
  • 如果我理解正确,您真正想要的是根据时间戳将事件分配给适当的窗口?你不做任何加入,但你宁愿union这两个流。水印的整个概念将确保两个(多个)流都将根据其时间戳对齐。我的理解正确吗?
  • 谢谢你的想法,大卫。但是,这不起作用,因为 union 只能处理相同的数据类型,通常情况并非如此。

标签: apache-flink


【解决方案1】:

你没有提到你用来决定加入哪两只股票(可能很多)的逻辑,但总的来说,我会通过从第一个窗口函数生成输出记录来解决这个问题(开盘,高, low, close, stock) 以及一个表示窗口时间(截断为小时)的附加字段,然后按该时间字段键入并执行另一个窗口操作以创建您需要的股票的连接。

【讨论】:

  • 感谢您的回复。股票是任意选择,就像您喜欢建立的投资组合一样。通常是 3 到 10,有时是 50 的小数字。我也考虑过使用键控流。但这不太可能是低效的吗? keyBy 还具有一些含义,即如何跨集群完成计算。但是,由于我首先相对于股票代码键入,它可能无论如何都会被分发,因此现在按时间窗口键入可能必须将数据带到单个节点,这无论如何都必须发生。现在如何随着时间的推移获得所有的窗户?这对我来说并不明显。
猜你喜欢
  • 2019-02-20
  • 2019-06-21
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2021-05-04
  • 1970-01-01
相关资源
最近更新 更多