【问题标题】:Backpressure in connected flink streams连接的 flink 流中的背压
【发布时间】:2018-02-15 12:55:15
【问题描述】:

当我将 ConnectedStreams 作为计算图的一部分时,我正在尝试如何正确传播背压。问题是:我有两个来源,一个比另一个更快地摄取数据,认为我们想要重放一些数据,一个来源有我们用来丰富另一个来源的罕见事件。然后将这两个源连接在一个流中,该流期望它们至少在某种程度上同步,以某种方式将它们合并在一起(制作元组,丰富......)并返回结果。

使用单个输入流,它很容易实现背压,您只需要在 processElement 函数上花费很长时间。对于 connectedstreams,我最初的想法是在每个 processFunctions 中都有一些逻辑,等待另一个流赶上。例如,我可以有一个时间跨度有限的缓冲区(足够大的跨度以适合水印),并且该函数不会接受会使该跨度超过阈值的事件。例如:

leftLock.aquire { nonEmptySignal =>
  while (queueSpan() > capacity.toMillis && lastTs() < ctx.timestamp()) {
    println("WAITING")
    nonEmptySignal.await()
  }

  queueOp { queue =>
    println(s"Left Event $value recieved ${Thread.currentThread()}")
    queue.add(Left(value))
  }
  ctx.timerService().registerEventTimeTimer(value.ts)
}

我的示例的完整代码如下(它用两个锁编写,假设从两个不同的线程进行访问,但事实并非如此 - 我认为):

import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong}
import java.util.concurrent.locks.{Condition, ReentrantLock}

import scala.collection.JavaConverters._
import com.google.common.collect.MinMaxPriorityQueue
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.api.common.typeinfo.{TypeHint, TypeInformation}
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.api.scala._
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.environment.LocalStreamEnvironment
import org.apache.flink.streaming.api.functions.co.CoProcessFunction
import org.apache.flink.streaming.api.functions.source.{RichSourceFunction, SourceFunction}
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.watermark.Watermark
import org.apache.flink.util.Collector

import scala.collection.mutable
import scala.concurrent.duration._

trait Timestamped {
  val ts: Long
}

case class StateObject(ts: Long, state: String) extends Timestamped

case class DataObject(ts: Long, data: String) extends Timestamped

case class StatefulDataObject(ts: Long, state: Option[String], data: String) extends Timestamped

class DataSource[A](factory: Long => A, rate: Int, speedUpFactor: Long = 0) extends RichSourceFunction[A] {

  private val max = new AtomicLong()
  private val isRunning = new AtomicBoolean(false)
  private val speedUp = new AtomicLong(0)
  private val WatermarkDelay = 5 seconds

  override def cancel(): Unit = {
    isRunning.set(false)
  }

  override def run(ctx: SourceFunction.SourceContext[A]): Unit = {
    isRunning.set(true)
    while (isRunning.get()) {
      val time = System.currentTimeMillis() + speedUp.addAndGet(speedUpFactor)
      val event = factory(time)
      ctx.collectWithTimestamp(event, time)
      println(s"Event $event sourced $speedUpFactor")

      val watermark = time - WatermarkDelay.toMillis
      if (max.get() < watermark) {
        ctx.emitWatermark(new Watermark(time - WatermarkDelay.toMillis))
        max.set(watermark)
      }
      Thread.sleep(rate)
    }
  }
}

class ConditionalOperator {
  private val lock = new ReentrantLock()
  private val signal: Condition = lock.newCondition()

  def aquire[B](func: Condition => B): B = {
    lock.lock()
    try {
      func(signal)
    } finally {
      lock.unlock()
    }
  }
}

class BlockingCoProcessFunction(capacity: FiniteDuration = 20 seconds)
  extends CoProcessFunction[StateObject, DataObject, StatefulDataObject] {

  private type MergedType = Either[StateObject, DataObject]
  private lazy val leftLock = new ConditionalOperator()
  private lazy val rightLock = new ConditionalOperator()
  private var queueState: ValueState[MinMaxPriorityQueue[MergedType]] = _
  private var dataState: ValueState[StateObject] = _

  override def open(parameters: Configuration): Unit = {
    super.open(parameters)

    queueState = getRuntimeContext.getState(new ValueStateDescriptor[MinMaxPriorityQueue[MergedType]](
      "event-queue",
      TypeInformation.of(new TypeHint[MinMaxPriorityQueue[MergedType]]() {})
    ))

    dataState = getRuntimeContext.getState(new ValueStateDescriptor[StateObject](
      "event-state",
      TypeInformation.of(new TypeHint[StateObject]() {})
    ))
  }

  override def processElement1(value: StateObject,
                               ctx: CoProcessFunction[StateObject, DataObject, StatefulDataObject]#Context,
                               out: Collector[StatefulDataObject]): Unit = {
    leftLock.aquire { nonEmptySignal =>
      while (queueSpan() > capacity.toMillis && lastTs() < ctx.timestamp()) {
        println("WAITING")
        nonEmptySignal.await()
      }

      queueOp { queue =>
        println(s"Left Event $value recieved ${Thread.currentThread()}")
        queue.add(Left(value))
      }
      ctx.timerService().registerEventTimeTimer(value.ts)
    }
  }

  override def processElement2(value: DataObject,
                               ctx: CoProcessFunction[StateObject, DataObject, StatefulDataObject]#Context,
                               out: Collector[StatefulDataObject]): Unit = {
    rightLock.aquire { nonEmptySignal =>
      while (queueSpan() > capacity.toMillis && lastTs() < ctx.timestamp()) {
        println("WAITING")
        nonEmptySignal.await()
      }

      queueOp { queue =>
        println(s"Right Event $value recieved ${Thread.currentThread()}")
        queue.add(Right(value))
      }
      ctx.timerService().registerEventTimeTimer(value.ts)
    }
  }

  override def onTimer(timestamp: Long,
                       ctx: CoProcessFunction[StateObject, DataObject, StatefulDataObject]#OnTimerContext,
                       out: Collector[StatefulDataObject]): Unit = {
    println(s"Watermarked $timestamp")
    leftLock.aquire { leftSignal =>
      rightLock.aquire { rightSignal =>
        queueOp { queue =>
          while (Option(queue.peekFirst()).exists(x => timestampOf(x) <= timestamp)) {
            queue.poll() match {
              case Left(state) =>
                dataState.update(state)
                leftSignal.signal()
              case Right(event) =>
                println(s"Event $event emitted ${Thread.currentThread()}")
                out.collect(
                  StatefulDataObject(
                    event.ts,
                    Option(dataState.value()).map(_.state),
                    event.data
                  )
                )
                rightSignal.signal()
            }
          }
        }
      }
    }
  }

  private def queueOp[B](func: MinMaxPriorityQueue[MergedType] => B): B = queueState.synchronized {
    val queue = Option(queueState.value()).
      getOrElse(
        MinMaxPriorityQueue.
          orderedBy(Ordering.by((x: MergedType) => timestampOf(x))).create[MergedType]()
      )
    val result = func(queue)
    queueState.update(queue)
    result
  }

  private def timestampOf(data: MergedType): Long = data match {
    case Left(y) =>
      y.ts
    case Right(y) =>
      y.ts
  }

  private def queueSpan(): Long = {
    queueOp { queue =>
      val firstTs = Option(queue.peekFirst()).map(timestampOf).getOrElse(Long.MaxValue)
      val lastTs = Option(queue.peekLast()).map(timestampOf).getOrElse(Long.MinValue)
      println(s"Span: $firstTs - $lastTs = ${lastTs - firstTs}")
      lastTs - firstTs
    }
  }

  private def lastTs(): Long = {
    queueOp { queue =>
      Option(queue.peekLast()).map(timestampOf).getOrElse(Long.MinValue)
    }
  }
}

object BackpressureTest {

  var data = new mutable.ArrayBuffer[DataObject]()

  def main(args: Array[String]): Unit = {
    val streamConfig = new Configuration()
    val env = new StreamExecutionEnvironment(new LocalStreamEnvironment(streamConfig))

    env.getConfig.disableSysoutLogging()
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.setParallelism(1)

    val stateSource = env.addSource(new DataSource(ts => StateObject(ts, ts.toString), 1000))
    val dataSource = env.addSource(new DataSource(ts => DataObject(ts, ts.toString), 100, 100))

    stateSource.
      connect(dataSource).
      keyBy(_ => "", _ => "").
      process(new BlockingCoProcessFunction()).
      print()

    env.execute()
  }
}

连接流的问题是,当它的流超前时,您似乎不能简单地阻塞其中一个 processFunction,因为这也会阻塞另一个 processFunction。另一方面,如果我只是简单地接受了这项工作中的所有事件,最终进程函数将耗尽内存。因为它会缓冲前面的整个流。

所以我的问题是:是否可以将背压分别传播到 ConnectedStreams 中的每个流中,如果可以,如何传播?或者,有没有其他好的方法来处理这个问题?可能所有来源都以某种方式进行通信以使它们大部分时间处于相同的事件时间?

【问题讨论】:

  • fwiw,两个线程永远不会同时访问一个 Flink 算子
  • 是的,那么这是一个问题,在我的用例中传播背压是不可能的,至少在 CoProcessFunction 接口内是不可能的。

标签: apache-flink


【解决方案1】:

从我对 StreamTwoInputProcessor 中代码的阅读来看,我认为 processInput() 方法负责实施相关策略。也许人们可以实现一种从具有较低水印的流中读取的变体,只要它具有未读输入。不过,不确定这会对整体产生什么影响。

【讨论】:

  • 我认为 STIP 有能力实现我所建议的这种背压机制。但是我认为将自定义实现插入计算并不容易,API似乎不允许此类的自定义实现,还是我错了?必须更改整个流图生成。
  • STIP 对我来说或多或少觉得是做这件事的合适地方,而且我认为它不需要进行重大更改,尽管您可能是对的,当前的 API 不会完全容纳它。弹出一个关卡——虽然在学术上很有趣,但这真的是一个值得解决的问题吗?我不相信这种在背压下的不平衡会导致任何严重的疾病。
  • 我认为在事件时间处理流中这是一个相当严重的问题。在我的示例中,此类作业的内存需求会随着时间以线性速率增加。更快的流只是缓冲到 CoProcessFunction 中,直到系统遇到内存不足异常,或者直到源停止。我认为即使在系统时间流中,当 CoProcessStreams 之一具有一些吞吐量非常低的先决条件步骤时,这个问题也可能存在。
  • 我的结论是你是对的——在某些情况下这可能是个问题。
猜你喜欢
  • 2017-12-28
  • 2016-02-13
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2020-10-01
  • 2019-06-20
  • 2020-09-20
  • 1970-01-01
相关资源
最近更新 更多