Question title: Apache Flink: Count window with timeout
Posted: 2018-04-11 20:08:07
Question:

Here is a simple code example to illustrate my problem:

import org.apache.flink.streaming.api.scala._

case class Record( key: String, value: Int )

object Job extends App
{
  val env = StreamExecutionEnvironment.getExecutionEnvironment
  val data = env.fromElements( Record("01",1), Record("02",2), Record("03",3), Record("04",4), Record("05",5) )
  val step1 = data.filter( record => record.value % 3 != 0  ) // introduces some data loss
  val step2 = data.map( r => Record( r.key, r.value * 2 ) )
  val step3 = data.map( r => Record( r.key, r.value * 3 ) )
  val merged = step1.union( step2, step3 )
  val keyed = merged.keyBy(0)
  val windowed = keyed.countWindow( 3 )
  val summed = windowed.sum( 1 )
  summed.print()
  env.execute("test")
}

This produces the following output:

Record(01,6)
Record(02,12)
Record(04,24)
Record(05,30)

As expected, no result is produced for key "03", because the count window expects 3 elements and only two elements with that key are in the stream.
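To make the arithmetic concrete, here is a plain-Java model of the job above (no Flink involved; Java is used because it is the other language in this thread). It rebuilds the per-key values of the merged stream and sums every complete group of three per key, which is roughly what `countWindow(3).sum(1)` does, ignoring the order in which Flink interleaves elements of different keys. `CountWindowModel` and its methods are made-up names for illustration only.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class CountWindowModel {
    /** Per-key values of the merged stream (step1, step2 and step3) from the question. */
    static Map<String, List<Integer>> mergedByKey() {
        Map<String, List<Integer>> byKey = new LinkedHashMap<>();
        for (int v = 1; v <= 5; v++) {
            List<Integer> values = byKey.computeIfAbsent("0" + v, k -> new ArrayList<>());
            if (v % 3 != 0) {
                values.add(v);     // step1: the filter drops Record("03", 3)
            }
            values.add(v * 2);     // step2
            values.add(v * 3);     // step3
        }
        return byKey;
    }

    /** Sums each complete group of `size` values per key; an incomplete group is never emitted. */
    static Map<String, List<Integer>> countWindowSums(Map<String, List<Integer>> byKey, int size) {
        Map<String, List<Integer>> result = new LinkedHashMap<>();
        for (Map.Entry<String, List<Integer>> e : byKey.entrySet()) {
            List<Integer> values = e.getValue();
            List<Integer> sums = new ArrayList<>();
            for (int i = 0; i + size <= values.size(); i += size) {
                int sum = 0;
                for (int j = i; j < i + size; j++) {
                    sum += values.get(j);
                }
                sums.add(sum);
            }
            if (!sums.isEmpty()) {
                result.put(e.getKey(), sums);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(mergedByKey().get("03"));          // [6, 9]: only two elements for key 03
        System.out.println(countWindowSums(mergedByKey(), 3)); // {01=[6], 02=[12], 04=[24], 05=[30]}
    }
}
```

The two leftover values for key "03" (6 + 9 = 15) are exactly the partial result the question asks for.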

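What I would like is some kind of count window with a timeout: if the number of elements expected by the count window has not been reached after a certain timeout, a partial result is produced from the elements collected so far.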

With this behavior, my example would produce Record(03,15) once the timeout is reached.
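The desired semantics can be pinned down with a small plain-Java model, assuming the timeout is measured from the first element of each pending batch (one of several possible definitions). `CountOrTimeoutBuffer`, `add` and `advanceTo` are hypothetical names, not Flink API; `advanceTo` stands in for time (for example a watermark) moving forward.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical helper (not a Flink API): per-key sum emitted on count OR on timeout. */
class CountOrTimeoutBuffer {
    private static final class Pending {
        final long deadline;
        int count;
        long sum;

        Pending(long deadline) {
            this.deadline = deadline;
        }
    }

    private final int maxCount;
    private final long timeout;
    private final Map<String, Pending> pending = new LinkedHashMap<>();
    final List<String> emitted = new ArrayList<>();

    CountOrTimeoutBuffer(int maxCount, long timeout) {
        this.maxCount = maxCount;
        this.timeout = timeout;
    }

    /** Adds an element; a key's timeout starts with the first element of its pending batch. */
    void add(String key, int value, long timestamp) {
        Pending p = pending.computeIfAbsent(key, k -> new Pending(timestamp + timeout));
        p.count++;
        p.sum += value;
        if (p.count == maxCount) {
            emitted.add("Record(" + key + "," + p.sum + ")"); // full batch: emit immediately
            pending.remove(key);
        }
    }

    /** Moves time forward; every batch whose deadline has passed is emitted as a partial result. */
    void advanceTo(long now) {
        Iterator<Map.Entry<String, Pending>> it = pending.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Pending> e = it.next();
            if (now >= e.getValue().deadline) {
                emitted.add("Record(" + e.getKey() + "," + e.getValue().sum + ")");
                it.remove();
            }
        }
    }

    public static void main(String[] args) {
        CountOrTimeoutBuffer buffer = new CountOrTimeoutBuffer(3, 100);
        for (int v = 1; v <= 5; v++) {        // same merged stream as in the question, all at t = 0
            String key = "0" + v;
            if (v % 3 != 0) {
                buffer.add(key, v, 0);
            }
            buffer.add(key, v * 2, 0);
            buffer.add(key, v * 3, 0);
        }
        buffer.advanceTo(100);                // timeout reached for key 03
        // [Record(01,6), Record(02,12), Record(04,24), Record(05,30), Record(03,15)]
        System.out.println(buffer.emitted);
    }
}
```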

Discussion:

    Tags: scala timeout apache-flink flink-streaming


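    Solution 1:

    You could also do this with a custom window Trigger that fires either when the count is reached or when the timeout expires, effectively blending the built-in CountTrigger and EventTimeTrigger.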
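A rough plain-Java model of such a trigger for a single window pane, assuming the timeout is the end of the window. The count part resets after firing, as CountTrigger does. The timeout part fires once time reaches the window's last timestamp (`end - 1`, which is how Flink's `TimeWindow.maxTimestamp()` is defined), as EventTimeTrigger does. `Decision` is a local stand-in for Flink's `TriggerResult`, and `CountOrEndOfWindowPane` is a hypothetical class.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Local stand-in for the two TriggerResult values used here (not Flink's enum). */
enum Decision { CONTINUE, FIRE_AND_PURGE }

/** Hypothetical model of one window pane driven by a "count OR end of window" trigger. */
class CountOrEndOfWindowPane {
    private final long maxCount;
    private final long windowEnd;                 // exclusive end, like TimeWindow.getEnd()
    private long count = 0;
    private final List<Integer> contents = new ArrayList<>();
    final List<List<Integer>> fired = new ArrayList<>();

    CountOrEndOfWindowPane(long maxCount, long windowEnd) {
        this.maxCount = maxCount;
        this.windowEnd = windowEnd;
    }

    long maxTimestamp() {
        return windowEnd - 1;                     // last timestamp that belongs to the window
    }

    /** Count part (like CountTrigger): fire once maxCount elements are buffered. */
    Decision onElement(int value) {
        contents.add(value);
        count++;
        return count >= maxCount ? fire() : Decision.CONTINUE;
    }

    /** Timeout part (like EventTimeTrigger): fire when time reaches the end of the window. */
    Decision onEventTime(long time) {
        return time >= maxTimestamp() ? fire() : Decision.CONTINUE;
    }

    private Decision fire() {
        if (!contents.isEmpty()) {
            fired.add(new ArrayList<>(contents)); // an empty pane produces no output
        }
        contents.clear();                         // "purge" the pane...
        count = 0;                                // ...and restart counting for the next batch
        return Decision.FIRE_AND_PURGE;
    }

    public static void main(String[] args) {
        CountOrEndOfWindowPane pane = new CountOrEndOfWindowPane(3, 50);
        for (int v : Arrays.asList(1, 2, 3, 6)) {
            System.out.println(v + " -> " + pane.onElement(v)); // fires on the 3rd element
        }
        System.out.println("t=49 -> " + pane.onEventTime(49)); // timeout fires the partial batch
        System.out.println(pane.fired);                        // [[1, 2, 3], [6]]
    }
}
```

Resetting the count on fire matters: without it, every element after the first full batch would immediately fire a window of one element.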

    Comments:

      Solution 2:

      I followed the approaches of David and Nirav, and here is what I came up with.

      1) Using a custom trigger:

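      Here I inverted my original logic. Instead of a count window, I use a time window whose duration corresponds to the timeout, combined with a trigger that fires as soon as the expected number of elements has been processed.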
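One consequence of this inversion, sketched below in plain Java: tumbling time windows are aligned to fixed boundaries (multiples of the window size when there is no offset), so the "timeout" runs until the end of the current window rather than starting from the first element of a batch. The start computation mirrors, as far as I know, the formula Flink uses in `TimeWindow.getWindowStartWithOffset` with an offset of 0; `TumblingTimeout` and its methods are hypothetical.

```java
/** Hypothetical helper: how a tumbling time window turns into a "timeout". */
class TumblingTimeout {
    /** Start of the tumbling window (no offset) that contains the given timestamp. */
    static long windowStart(long timestamp, long windowSize) {
        return timestamp - (timestamp + windowSize) % windowSize;
    }

    /** Time left until the window containing `timestamp` ends, i.e. the effective timeout. */
    static long remainingTimeout(long timestamp, long windowSize) {
        return windowStart(timestamp, windowSize) + windowSize - timestamp;
    }

    public static void main(String[] args) {
        long size = 50;                             // Time.milliseconds(50) in the job above
        for (long t : new long[] {0, 49, 50, 120}) {
            System.out.println("t=" + t + " window=[" + windowStart(t, size) + ", "
                    + (windowStart(t, size) + size) + ") timeout=" + remainingTimeout(t, size));
        }
    }
}
```

With 50 ms windows, an element arriving at t = 49 ms waits only 1 ms before the partial result is emitted, while one arriving at t = 50 ms waits the full 50 ms. The process-function variant below instead registers its timer at `ctx.timestamp + windowSize`, which measures the timeout from the first element of each batch.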

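      import org.apache.flink.streaming.api.scala._
      import org.apache.flink.streaming.api.windowing.time.Time
      
      case class Record( key: String, value: Int )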
      
      object Job extends App
      {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        val data = env.fromElements( Record("01",1), Record("02",2), Record("03",3), Record("04",4), Record("05",5) )
        val step1 = data.filter( record => record.value % 3 != 0  ) // introduces some data loss
        val step2 = data.map( r => Record( r.key, r.value * 2 ) )
        val step3 = data.map( r => Record( r.key, r.value * 3 ) )
        val merged = step1.union( step2, step3 )
        val keyed = merged.keyBy(0)
        val windowed = keyed.timeWindow( Time.milliseconds( 50 ) )
        val triggered = windowed.trigger( new CountTriggerWithTimeout( 3, env.getStreamTimeCharacteristic ) )
        val summed = triggered.sum( 1 )
        summed.print()
        env.execute("test")
      }
      

      Here is the trigger code:

      import org.apache.flink.api.common.functions.ReduceFunction
      import org.apache.flink.api.common.state.ReducingState
      import org.apache.flink.api.common.state.ReducingStateDescriptor
      import org.apache.flink.api.common.typeutils.base.LongSerializer
      import org.apache.flink.streaming.api.TimeCharacteristic
      import org.apache.flink.streaming.api.windowing.triggers._
      import org.apache.flink.streaming.api.windowing.triggers.Trigger.TriggerContext
      import org.apache.flink.streaming.api.windowing.windows.TimeWindow
      
      /**
       * A trigger that fires when the count of elements in a pane reaches the given count or
       * when the end of the time window (the timeout) is reached, whichever happens first.
       */
      class CountTriggerWithTimeout[W <: TimeWindow](maxCount: Long, timeCharacteristic: TimeCharacteristic) extends Trigger[Object,W]
      {
        private val countState: ReducingStateDescriptor[java.lang.Long] =
          new ReducingStateDescriptor[java.lang.Long]( "count", new CountTriggerWithTimeout.Sum(), LongSerializer.INSTANCE )
      
        override def onElement(element: Object, timestamp: Long, window: W, ctx: TriggerContext): TriggerResult =
        {
            // Make sure we are called back when the window (i.e. the timeout) ends.
            // Flink keeps one timer per timestamp, so registering it again is harmless.
            if (timeCharacteristic == TimeCharacteristic.ProcessingTime) ctx.registerProcessingTimeTimer( window.maxTimestamp )
            else ctx.registerEventTimeTimer( window.maxTimestamp )
      
            val count: ReducingState[java.lang.Long] = ctx.getPartitionedState(countState)
            count.add( 1L )
            if ( count.get >= maxCount ) fireAndReset( ctx ) else TriggerResult.CONTINUE
        }
      
        override def onProcessingTime(time: Long, window: W, ctx: TriggerContext): TriggerResult =
        {
            // Timeout in processing time: fire whatever has been collected so far
            if ( timeCharacteristic == TimeCharacteristic.ProcessingTime && time >= window.maxTimestamp ) fireAndReset( ctx )
            else TriggerResult.CONTINUE
        }
      
        override def onEventTime(time: Long, window: W, ctx: TriggerContext): TriggerResult =
        {
            // Timeout in event/ingestion time: the watermark has passed the end of the window
            if ( timeCharacteristic != TimeCharacteristic.ProcessingTime && time >= window.maxTimestamp ) fireAndReset( ctx )
            else TriggerResult.CONTINUE
        }
      
        override def clear(window: W, ctx: TriggerContext): Unit =
        {
            ctx.getPartitionedState( countState ).clear()
            ctx.deleteProcessingTimeTimer( window.maxTimestamp )
            ctx.deleteEventTimeTimer( window.maxTimestamp )
        }
      
        // Reset the counter so that the next batch in the same window starts from zero
        private def fireAndReset(ctx: TriggerContext): TriggerResult =
        {
            ctx.getPartitionedState( countState ).clear()
            TriggerResult.FIRE_AND_PURGE
        }
      }
      
      object CountTriggerWithTimeout
      {
        class Sum extends ReduceFunction[java.lang.Long]
        {
            override def reduce(value1: java.lang.Long, value2: java.lang.Long): java.lang.Long = value1 + value2
        }
      }
      

      2) Using a process function:

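      import org.apache.flink.streaming.api.TimeCharacteristic
      import org.apache.flink.streaming.api.scala._
      
      case class Record( key: String, value: Int )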
      
      object Job extends App
      {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        env.setStreamTimeCharacteristic( TimeCharacteristic.IngestionTime )
        val data = env.fromElements( Record("01",1), Record("02",2), Record("03",3), Record("04",4), Record("05",5) )
        val step1 = data.filter( record => record.value % 3 != 0  ) // introduces some data loss
        val step2 = data.map( r => Record( r.key, r.value * 2 ) )
        val step3 = data.map( r => Record( r.key, r.value * 3 ) )
        val merged = step1.union( step2, step3 )
        val keyed = merged.keyBy(0)
        val processed = keyed.process( new TimeCountWindowProcessFunction( 3, 100 ) )
        processed.print()
        env.execute("test")
      }
      

      with all the logic (i.e. windowing, triggering and summing) placed in the function:

      import org.apache.flink.streaming.api.functions._
      import org.apache.flink.util._
      import org.apache.flink.api.common.state._
      
      // value is an Int so that it fits back into Record; deadline identifies the timer that owns this batch
      case class Status( count: Int, key: String, value: Int, deadline: Long )
      
      class TimeCountWindowProcessFunction( count: Long, windowSize: Long ) extends ProcessFunction[Record,Record] 
      {
          lazy val state: ValueState[Status] = getRuntimeContext
            .getState(new ValueStateDescriptor[Status]("state", classOf[Status]))
      
          override def processElement( input: Record, ctx: ProcessFunction[Record,Record]#Context, out: Collector[Record] ): Unit =
          {
              val updated: Status = Option( state.value ) match {
                  case None => {
                      // First element of a new batch: the timeout starts at its timestamp
                      val deadline = ctx.timestamp + windowSize
                      ctx.timerService().registerEventTimeTimer( deadline )
                      Status( 1, input.key, input.value, deadline )
                  }
                  case Some( current ) => current.copy( count = current.count + 1, value = input.value + current.value )
              }
              if ( updated.count == count ) 
              {
                  // Count reached: emit the full result immediately
                  out.collect( Record( input.key, updated.value ) )
                  state.clear
              }
              else
              {
                  state.update( updated )  
              }        
          }
      
          override def onTimer( timestamp: Long, ctx: ProcessFunction[Record,Record]#OnTimerContext, out: Collector[Record] ): Unit =
          {
              Option( state.value ) match {
                  // Timeout reached for the pending batch: emit the partial result
                  case Some( status ) if status.deadline == timestamp => {
                      out.collect( Record( status.key, status.value ) )
                      state.clear
                  }
                  // No pending batch, or a stale timer from a batch already emitted on count: ignore
                  case _ =>
              }
          }
      }
      

      Comments:

      • Nice. I think the trigger approach will help me.
      Solution 3:

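      I think you can implement this use case with a ProcessFunction on a keyed stream (timers and keyed state require one), in which you keep a count property and a windowEnd property. Using these, you can decide when to collect the data.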

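      import org.apache.flink.api.common.state.ValueState;
      import org.apache.flink.api.common.state.ValueStateDescriptor;
      import org.apache.flink.api.common.typeinfo.TypeHint;
      import org.apache.flink.api.common.typeinfo.TypeInformation;
      import org.apache.flink.configuration.Configuration;
      import org.apache.flink.streaming.api.functions.ProcessFunction;
      import org.apache.flink.util.Collector;
      
      // CountPojo (not shown in this answer) is assumed to be a POJO with a public
      // int `count` field and a setLastModified(long) setter.
      public class TimeCountWindowProcessFunction extends ProcessFunction<CountPojo, CountPojo> {
      
          protected final long windowSize;
          protected final long count;
          private ValueState<CountPojo> state;
          // end of the window whose timer is currently pending for this key
          private ValueState<Long> windowEndState;
      
          public TimeCountWindowProcessFunction(long windowSize, long count) {
              this.windowSize = windowSize;
              this.count = count;
          }
      
          @Override
          public void open(Configuration parameters) {
              TypeInformation<CountPojo> typeInformation = TypeInformation.of(new TypeHint<CountPojo>() {});
              state = getRuntimeContext().getState(new ValueStateDescriptor<>("test", typeInformation));
              windowEndState = getRuntimeContext().getState(new ValueStateDescriptor<>("windowEnd", Long.class));
          }
      
          @Override
          public void processElement(CountPojo input, Context ctx, Collector<CountPojo> out)
                  throws Exception {
      
              // Align the window to the element's timestamp (tumbling windows of windowSize)
              long timestamp = ctx.timestamp();
              long windowStart = timestamp - (timestamp % windowSize);
              long windowEnd = windowStart + windowSize;
      
              // retrieve the current count
              CountPojo current = state.value();
      
              if (current == null) {
                  current = new CountPojo();
                  current.count = 1;
                  ctx.timerService().registerEventTimeTimer(windowEnd);
                  windowEndState.update(windowEnd);
              } else {
                  current.count += 1;
              }
      
              // set the state's timestamp to the record's assigned event time timestamp
              current.setLastModified(timestamp);
      
              if (current.count >= count) {
                  // count reached: emit now and start over; the pending timer becomes stale
                  out.collect(current);
                  state.clear();
                  windowEndState.clear();
              } else {
                  // write the state back
                  state.update(current);
              }
          }
      
          @Override
          public void onTimer(long timestamp, OnTimerContext ctx, Collector<CountPojo> out)
                  throws Exception {
      
              // Only the timer of the batch still held in state may emit it (timeout reached)
              Long windowEnd = windowEndState.value();
              if (windowEnd != null && windowEnd == timestamp) {
                  CountPojo current = state.value();
                  if (current != null) {
                      out.collect(current);
                  }
                  state.clear();
                  windowEndState.clear();
              }
          }
      }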
      

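      Hope this helps.

      Comments:

      • I'm not quite clear where "state" and "total usage" come from in your solution. Could you elaborate?
      • I have updated the answer. Since I wasn't using an IDE, this is just an example of how you could implement it.
      • If you find some tricky parts in Flink, please post them on Stack Overflow. It really helps others.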