【问题标题】:Apache Flink : Creating a Lagged DatastreamApache Flink:创建滞后数据流
【发布时间】:2017-01-05 01:52:14
【问题描述】:

我刚开始使用 Scala 使用 Apache Flink。有人可以告诉我如何从我拥有的当前数据流中创建滞后流(滞后 k 个事件或 k 个时间单位)吗?

基本上,我想在数据流上实现一个自动回归模型(流的线性回归,其自身的时间滞后版本)。因此,需要一个类似于以下伪代码的方法。

val ds : DataStream = ...

val laggedDS : DataStream = ds.map(lag _)

def lag(ds : DataStream, k : Time) : DataStream = {

}

如果每个事件的间隔为 1 秒并且有 2 秒的延迟,我希望样本输入和输出是这样的。

输入:1、2、3、4、5、6、7...
输出:NA、NA、1、2、3、4、5...

【问题讨论】:

  • 你能扩展你的问题并解释你所说的滞后流是什么意思吗?谢谢
  • @FabianHueske,我认为对于滞后的数据流,他意味着在数据流中获取元素的时间比平时晚。例如,延迟 1 分钟会比元素到达流时晚 1 分钟发出元素。
  • 问题是“滞后 k 个事件”而不是“滞后 x 分钟”。一种解释是在 k 个事件的 FIFO 队列中追加新事件,并在新事件到达时转发队列头元素。如果没有明确定义所需的语义,则无法回答问题。
  • @FabianHueske 如果流的频率不一致,我想按计数而不是按时间滞后。还编辑了问题。希望有帮助。 :)
  • 虽然我也很想知道时间变量。 :)

标签: scala apache-flink flink-streaming


【解决方案1】:

鉴于我的要求正确,我会将其实现为带有 FIFO 队列的 FlatMapFunction。队列缓冲k 事件并在新事件到达时发出头部。如果您需要一个容错流应用程序,队列必须注册为状态。然后,Flink 将负责检查状态(即队列)并在发生故障时恢复它。

FlatMapFunction 可能如下所示:

class Lagger(val k: Int) 
    extends FlatMapFunction[X, X] 
    with Checkpointed[mutable.Queue[X]] 
{

  var fifo: mutable.Queue[X] = new mutable.Queue[X]()

  override def flatMap(value: X, out: Collector[X]): Unit = {
    // add new element to queue
    fifo.enqueue(value)
    if (fifo.size == k + 1) {
      // remove head element and emit
      out.collect(fifo.dequeue())
    }
  }

  // restore state
  override def restoreState(state: mutable.Queue[X]) = { fifo = state }

  // get state to checkpoint
  override def snapshotState(cId: Long, cTS: Long): mutable.Queue[X] = fifo

}

返回具有时滞的元素更复杂。这将需要用于发射的计时器线程,因为该函数仅在新元素到达时被调用。

【讨论】:

  • 非常感谢!这很棒。 :)
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2019-09-16
  • 2019-02-18
  • 1970-01-01
  • 2019-06-21
  • 2017-02-19
相关资源
最近更新 更多