[Question title]: Flink: WordCount example throws ClassNotFoundException
[Posted]: 2016-09-02 08:47:21
[Question description]:

I'm currently trying to embed Flink in a Play Framework project. But when I try to run the WordCount example from the Flink documentation, I get this error:

play.api.http.HttpErrorHandlerExceptions$$anon$1: Execution exception[[JobExecutionException: Cannot initialize task 'DataSink (collect())': Deserializing the OutputFormat (org.apache.flink.api.java.Utils$CollectHelper@1f158fd6) failed: Could not read the user code wrapper: io.fabrick.aperture.doctor.stream.WordCount$$anon$2$$anon$1]]
    at play.api.http.HttpErrorHandlerExceptions$.throwableToUsefulException(HttpErrorHandler.scala:265) ~[play_2.11-2.4.2.jar:2.4.2]
    at play.api.http.DefaultHttpErrorHandler.onServerError(HttpErrorHandler.scala:191) ~[play_2.11-2.4.2.jar:2.4.2]
    at play.api.GlobalSettings$class.onError(GlobalSettings.scala:179) [play_2.11-2.4.2.jar:2.4.2]
    at play.api.DefaultGlobal$.onError(GlobalSettings.scala:212) [play_2.11-2.4.2.jar:2.4.2]
    at play.api.http.GlobalSettingsHttpErrorHandler.onServerError(HttpErrorHandler.scala:94) [play_2.11-2.4.2.jar:2.4.2]
    at play.core.server.netty.PlayDefaultUpstreamHandler$$anonfun$9$$anonfun$apply$1.applyOrElse(PlayDefaultUpstreamHandler.scala:158) [play-netty-server_2.11-2.4.2.jar:2.4.2]
    at play.core.server.netty.PlayDefaultUpstreamHandler$$anonfun$9$$anonfun$apply$1.applyOrElse(PlayDefaultUpstreamHandler.scala:155) [play-netty-server_2.11-2.4.2.jar:2.4.2]
    at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36) [scala-library-2.11.7.jar:na]
    at scala.util.Failure$$anonfun$recover$1.apply(Try.scala:216) [scala-library-2.11.7.jar:na]
    at scala.util.Try$.apply(Try.scala:192) [scala-library-2.11.7.jar:na]
Caused by: org.apache.flink.runtime.client.JobExecutionException: Cannot initialize task 'DataSink (collect())': Deserializing the OutputFormat (org.apache.flink.api.java.Utils$CollectHelper@1f158fd6) failed: Could not read the user code wrapper: io.fabrick.aperture.doctor.stream.WordCount$$anon$2$$anon$1
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$7.apply(JobManager.scala:1012) ~[flink-runtime_2.11-1.0.0.jar:1.0.0]
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$7.apply(JobManager.scala:996) ~[flink-runtime_2.11-1.0.0.jar:1.0.0]
    at scala.collection.Iterator$class.foreach(Iterator.scala:742) ~[scala-library-2.11.7.jar:na]
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1194) ~[scala-library-2.11.7.jar:na]
    at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) ~[scala-library-2.11.7.jar:na]
    at scala.collection.AbstractIterable.foreach(Iterable.scala:54) ~[scala-library-2.11.7.jar:na]
    at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:996) ~[flink-runtime_2.11-1.0.0.jar:1.0.0]
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:380) ~[flink-runtime_2.11-1.0.0.jar:1.0.0]
    at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36) [scala-library-2.11.7.jar:na]
    at org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:36) ~[flink-runtime_2.11-1.0.0.jar:1.0.0]
Caused by: java.lang.Exception: Deserializing the OutputFormat (org.apache.flink.api.java.Utils$CollectHelper@1f158fd6) failed: Could not read the user code wrapper: io.fabrick.aperture.doctor.stream.WordCount$$anon$2$$anon$1
    at org.apache.flink.runtime.jobgraph.OutputFormatVertex.initializeOnMaster(OutputFormatVertex.java:63) ~[flink-runtime_2.11-1.0.0.jar:1.0.0]
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$7.apply(JobManager.scala:1008) ~[flink-runtime_2.11-1.0.0.jar:1.0.0]
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$7.apply(JobManager.scala:996) ~[flink-runtime_2.11-1.0.0.jar:1.0.0]
    at scala.collection.Iterator$class.foreach(Iterator.scala:742) ~[scala-library-2.11.7.jar:na]
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1194) ~[scala-library-2.11.7.jar:na]
    at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) ~[scala-library-2.11.7.jar:na]
    at scala.collection.AbstractIterable.foreach(Iterable.scala:54) ~[scala-library-2.11.7.jar:na]
    at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:996) ~[flink-runtime_2.11-1.0.0.jar:1.0.0]
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:380) ~[flink-runtime_2.11-1.0.0.jar:1.0.0]
    at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36) [scala-library-2.11.7.jar:na]
Caused by: org.apache.flink.runtime.operators.util.CorruptConfigurationException: Could not read the user code wrapper: io.fabrick.aperture.doctor.stream.WordCount$$anon$2$$anon$1
    at org.apache.flink.runtime.operators.util.TaskConfig.getStubWrapper(TaskConfig.java:284) ~[flink-runtime_2.11-1.0.0.jar:1.0.0]
    at org.apache.flink.runtime.jobgraph.OutputFormatVertex.initializeOnMaster(OutputFormatVertex.java:60) ~[flink-runtime_2.11-1.0.0.jar:1.0.0]
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$7.apply(JobManager.scala:1008) ~[flink-runtime_2.11-1.0.0.jar:1.0.0]
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$7.apply(JobManager.scala:996) ~[flink-runtime_2.11-1.0.0.jar:1.0.0]
    at scala.collection.Iterator$class.foreach(Iterator.scala:742) ~[scala-library-2.11.7.jar:na]
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1194) ~[scala-library-2.11.7.jar:na]
    at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) ~[scala-library-2.11.7.jar:na]
    at scala.collection.AbstractIterable.foreach(Iterable.scala:54) ~[scala-library-2.11.7.jar:na]
    at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:996) ~[flink-runtime_2.11-1.0.0.jar:1.0.0]
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:380) ~[flink-runtime_2.11-1.0.0.jar:1.0.0]
Caused by: java.lang.ClassNotFoundException: io.fabrick.aperture.doctor.stream.WordCount$$anon$2$$anon$1
    at java.net.URLClassLoader.findClass(URLClassLoader.java:381) ~[na:1.8.0_60]
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424) ~[na:1.8.0_60]
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357) ~[na:1.8.0_60]
    at java.lang.Class.forName0(Native Method) ~[na:1.8.0_60]
    at java.lang.Class.forName(Class.java:348) ~[na:1.8.0_60]
    at org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:64) ~[flink-core-1.0.0.jar:1.0.0]
    at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1613) ~[na:1.8.0_60]
    at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1518) ~[na:1.8.0_60]
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1774) ~[na:1.8.0_60]
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351) ~[na:1.8.0_60]

I launch my project directly from IntelliJ (I don't know whether that is the root cause).

Here is the code being executed:

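// Brings in ExecutionEnvironment and the implicit TypeInformation that flatMap/map require
import org.apache.flink.api.scala._

object WordCount {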

  def test():Unit = {

    // set up the execution environment
    val test = ExecutionEnvironment.getExecutionEnvironment

    // get input data
    val text = test.fromElements("To be, or not to be,--that is the question:--",
      "Whether 'tis nobler in the mind to suffer", "The slings and arrows of outrageous fortune",
      "Or to take arms against a sea of troubles,")

    val counts = text.flatMap { _.toLowerCase.split("\\W+") }
      .map { (_, 1) }
      .groupBy(0)
      .sum(1)

    // emit result
    counts.print()

    // execute program
    test.execute("WordCount Example")
  }
}

Any help is welcome, thanks everyone!

Edit: just adding the dependency list:

libraryDependencies ++= Seq(
  "org.apache.flink" %% "flink-scala" % "1.0.0",
  "org.apache.flink" %% "flink-clients" % "1.0.0",
  "org.apache.flink" %% "flink-streaming-scala" % "1.0.0"
)

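[Comments]:

  • Can you show your build.sbt dependencies? It looks like a class cannot be found, so you may be missing some jar/dependency...
  • Hi Salem, I just edited my post to add the dependencies.
  • As far as I remember, the Flink build assumes some dependencies (Hadoop, I think) are marked as "provided", i.e. sbt will not pull them in and expects you to supply them in your application. You could try digging into Flink's pom.xml to see what needs to be provided, or rely on a fat jar (one that bundles all dependencies).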

Tags: scala intellij-idea playframework apache-flink


[Solution 1]:

Make sure you have deployed the jar to all Flink instances (jobmanager + all taskmanager nodes).
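
To illustrate why a jar that is not visible to a Flink process shows up as a ClassNotFoundException during deserialization: in the trace, the lookup fails inside `InstantiationUtil$ClassLoaderObjectInputStream.resolveClass`, which suggests the serialized user code is resolved through one particular class loader. The sketch below is not Flink code. It uses only the JDK, and `UserFunction`, `LoaderObjectInputStream`, and `ClassLoaderDemo` are hypothetical names made up for this example. It assumes the file is compiled as a normal Scala source, so that `UserFunction` is loaded by the application class loader.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, InputStream}
import java.io.{ObjectInputStream, ObjectOutputStream, ObjectStreamClass}
import java.net.{URL, URLClassLoader}

// Hypothetical stand-in for a user-defined class that ships in the job jar.
class UserFunction(val name: String) extends Serializable

// Resolves classes through an explicitly supplied class loader, which is the
// same general idea as the resolveClass frame in the stack trace.
class LoaderObjectInputStream(in: InputStream, loader: ClassLoader)
    extends ObjectInputStream(in) {
  override protected def resolveClass(desc: ObjectStreamClass): Class[_] =
    Class.forName(desc.getName, false, loader)
}

object ClassLoaderDemo {
  // Java-serialize an object into a byte array.
  def serialize(obj: AnyRef): Array[Byte] = {
    val buffer = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(buffer)
    try out.writeObject(obj) finally out.close()
    buffer.toByteArray
  }

  // Deserialize with the given loader; a missing class is returned as Left.
  def deserialize(bytes: Array[Byte],
                  loader: ClassLoader): Either[ClassNotFoundException, AnyRef] = {
    val in = new LoaderObjectInputStream(new ByteArrayInputStream(bytes), loader)
    try Right(in.readObject())
    catch { case e: ClassNotFoundException => Left(e) }
    finally in.close()
  }

  def main(args: Array[String]): Unit = {
    val bytes = serialize(new UserFunction("tokenizer"))

    // No URLs and a null parent: only JDK bootstrap classes are visible,
    // which mimics a process that does not have the job jar.
    val withoutJar = new URLClassLoader(Array.empty[URL], null)
    // The loader that actually defined UserFunction, i.e. "the jar is present".
    val withJar = classOf[UserFunction].getClassLoader

    println(s"without jar -> failed: ${deserialize(bytes, withoutJar).isLeft}")
    println(s"with jar    -> loaded: ${deserialize(bytes, withJar).isRight}")
  }
}
```

The same bytes deserialize or fail depending only on which class loader does the lookup, so the fix is about making the classes visible to the process that reads them rather than about changing the job code.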

[Discussion]:
