【问题标题】:Class 'SessionTrigger' must either be declared abstract or implement abstract member类“SessionTrigger”必须声明为抽象或实现抽象成员
【发布时间】:2017-08-17 16:49:42
【问题描述】:

我正在构建一个关于 Flink 1.2 的教程,我想运行一些简单的窗口示例。其中之一是会话窗口。

我要运行的代码如下:

import <package>.Session
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows
import org.apache.flink.streaming.api.windowing.triggers.PurgingTrigger
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow

import scala.util.Try

object SessionWindowExample {

  def main(args: Array[String]) {

    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val source = env.socketTextStream("localhost", 9000)

    //session map
    val values = source.map(value => {
      val columns = value.split(",")
      val endSignal = Try(Some(columns(2))).getOrElse(None)
      Session(columns(0), columns(1).toDouble, endSignal)
    })

    val keyValue = values.keyBy(_.sessionId)

    // create global window

    val sessionWindowStream = keyValue.
      window(GlobalWindows.create()).
      trigger(PurgingTrigger.of(new SessionTrigger[GlobalWindow]()))

    sessionWindowStream.sum("value").print()

    env.execute()
  }
}

您会注意到,我需要根据此类实例化一个 new SessionTrigger 对象:

import <package>.Session
import org.apache.flink.streaming.api.windowing.triggers.Trigger.TriggerContext
import org.apache.flink.streaming.api.windowing.triggers.{Trigger, TriggerResult}
import org.apache.flink.streaming.api.windowing.windows.Window

class SessionTrigger[W <: Window] extends Trigger[Session,W] {

  override def onElement(element: Session, timestamp: Long, window: W, ctx: TriggerContext): TriggerResult = {
    if(element.endSignal.isDefined) TriggerResult.FIRE
    else TriggerResult.CONTINUE
  }

  override def onProcessingTime(time: Long, window: W, ctx: TriggerContext): TriggerResult = {
    TriggerResult.CONTINUE
  }
  override def onEventTime(time: Long, window: W, ctx: TriggerContext): TriggerResult = {
    TriggerResult.CONTINUE
  }
}

但是,InteliJ 一直在抱怨: Class 'SessionTrigger' must either be declared abstract or implement abstract member 'clear(window: W, ctx: TriggerContext):void' in 'org.apache.flink.streaming.api.windowing.triggers.Trigger'.

我尝试在课堂上添加这个:

 override def clear(window: W, ctx: TriggerContext): Unit = ctx.deleteEventTimeTimer(4)

但它不起作用。这是我得到的错误:

  03/27/2017 15:48:38   TriggerWindow(GlobalWindows(), ReducingStateDescriptor{serializer=co.uk.DRUK.flink.windowing.SessionWindowExample.SessionWindowExample$$anon$2$$anon$1@1aec64d0, reduceFunction=org.apache.flink.streaming.api.functions.aggregation.SumAggregator@1a052a00}, PurgingTrigger(co.uk.DRUK.flink.windowing.SessionWindowExample.SessionTrigger@f2f2cc1), WindowedStream.reduce(WindowedStream.java:276)) -> Sink: Unnamed(4/4) switched to CANCELED
  03/27/2017 15:48:38   Job execution switched to status FAILED.
  Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
  at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply$mcV$sp(JobManager.scala:900)
  at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:843)
  at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:843)
  at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
  at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
  at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
  at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
  at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
  at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
  at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
  at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
  Caused by: java.lang.ArrayIndexOutOfBoundsException: 1
  at co.uk.DRUK.flink.windowing.SessionWindowExample.SessionWindowExample$$anonfun$1.apply(SessionWindowExample.scala:27)
  at co.uk.DRUK.flink.windowing.SessionWindowExample.SessionWindowExample$$anonfun$1.apply(SessionWindowExample.scala:24)
  at org.apache.flink.streaming.api.scala.DataStream$$anon$4.map(DataStream.scala:521)
  at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:38)
  at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:185)
  at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:63)
  at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:272)
  at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655)
  at java.lang.Thread.run(Thread.java:745)

  Process finished with exit code 1

有人知道为什么吗?

【问题讨论】:

  • 我不确定你的问题是什么,这个错误似乎很容易解释。如果你想扩展Trigger,你需要实现上面提到的方法,因为Trigger将该方法声明为抽象。
  • 谢谢...让我用我尝试过的方法更新我的问题。 (scala-still 学习不太好)
  • 你的意思是它不工作?如果你问你应该怎么做来编译你的例子,实现clear 方法就足够了。由于您没有在 ctx 中注册任何内容(没有计时器),因此您的 clear 方法应该为空。
  • 尝试了一个空的clear 方法,我得到了完全相同的错误。让我添加我遇到的错误。

标签: scala abstract-class apache-flink


【解决方案1】:

异常清楚地说明了

 Caused by: java.lang.ArrayIndexOutOfBoundsException: 1
  at co.uk.DRUK.flink.windowing.SessionWindowExample.SessionWindowExample$$anonfun$1.apply(SessionWindowExample.scala:27)
  at co.uk.DRUK.flink.windowing.SessionWindowExample.SessionWindowExample$$anonfun$1.apply(SessionWindowExample.scala:24)

这显然映射到以下代码行

  Session(columns(0), columns(1).toDouble, endSignal)

所以下一个显而易见的事情是在之后记录您的 columnsvalue

  val columns = value.split(",")

我怀疑value 至少对于某些值不包含第二个逗号分隔的列。

【讨论】:

  • 诚然,这太明显了。代码没有任何问题......这只是我的输入......适合我。干杯。
猜你喜欢
  • 1970-01-01
  • 2015-02-28
  • 1970-01-01
  • 2015-06-29
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2019-07-18
相关资源
最近更新 更多