【发布时间】:2016-09-02 08:47:21
【问题描述】:
我目前正在尝试将 Flink 嵌入到 Play Framework 项目中。但是当我尝试执行 Flink 文档中的 WordCount 示例时,我收到了这个错误:
play.api.http.HttpErrorHandlerExceptions$$anon$1: Execution exception[[JobExecutionException: Cannot initialize task 'DataSink (collect())': Deserializing the OutputFormat (org.apache.flink.api.java.Utils$CollectHelper@1f158fd6) failed: Could not read the user code wrapper: io.fabrick.aperture.doctor.stream.WordCount$$anon$2$$anon$1]]
at play.api.http.HttpErrorHandlerExceptions$.throwableToUsefulException(HttpErrorHandler.scala:265) ~[play_2.11-2.4.2.jar:2.4.2]
at play.api.http.DefaultHttpErrorHandler.onServerError(HttpErrorHandler.scala:191) ~[play_2.11-2.4.2.jar:2.4.2]
at play.api.GlobalSettings$class.onError(GlobalSettings.scala:179) [play_2.11-2.4.2.jar:2.4.2]
at play.api.DefaultGlobal$.onError(GlobalSettings.scala:212) [play_2.11-2.4.2.jar:2.4.2]
at play.api.http.GlobalSettingsHttpErrorHandler.onServerError(HttpErrorHandler.scala:94) [play_2.11-2.4.2.jar:2.4.2]
at play.core.server.netty.PlayDefaultUpstreamHandler$$anonfun$9$$anonfun$apply$1.applyOrElse(PlayDefaultUpstreamHandler.scala:158) [play-netty-server_2.11-2.4.2.jar:2.4.2]
at play.core.server.netty.PlayDefaultUpstreamHandler$$anonfun$9$$anonfun$apply$1.applyOrElse(PlayDefaultUpstreamHandler.scala:155) [play-netty-server_2.11-2.4.2.jar:2.4.2]
at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36) [scala-library-2.11.7.jar:na]
at scala.util.Failure$$anonfun$recover$1.apply(Try.scala:216) [scala-library-2.11.7.jar:na]
at scala.util.Try$.apply(Try.scala:192) [scala-library-2.11.7.jar:na]
Caused by: org.apache.flink.runtime.client.JobExecutionException: Cannot initialize task 'DataSink (collect())': Deserializing the OutputFormat (org.apache.flink.api.java.Utils$CollectHelper@1f158fd6) failed: Could not read the user code wrapper: io.fabrick.aperture.doctor.stream.WordCount$$anon$2$$anon$1
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$7.apply(JobManager.scala:1012) ~[flink-runtime_2.11-1.0.0.jar:1.0.0]
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$7.apply(JobManager.scala:996) ~[flink-runtime_2.11-1.0.0.jar:1.0.0]
at scala.collection.Iterator$class.foreach(Iterator.scala:742) ~[scala-library-2.11.7.jar:na]
at scala.collection.AbstractIterator.foreach(Iterator.scala:1194) ~[scala-library-2.11.7.jar:na]
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) ~[scala-library-2.11.7.jar:na]
at scala.collection.AbstractIterable.foreach(Iterable.scala:54) ~[scala-library-2.11.7.jar:na]
at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:996) ~[flink-runtime_2.11-1.0.0.jar:1.0.0]
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:380) ~[flink-runtime_2.11-1.0.0.jar:1.0.0]
at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36) [scala-library-2.11.7.jar:na]
at org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:36) ~[flink-runtime_2.11-1.0.0.jar:1.0.0]
Caused by: java.lang.Exception: Deserializing the OutputFormat (org.apache.flink.api.java.Utils$CollectHelper@1f158fd6) failed: Could not read the user code wrapper: io.fabrick.aperture.doctor.stream.WordCount$$anon$2$$anon$1
at org.apache.flink.runtime.jobgraph.OutputFormatVertex.initializeOnMaster(OutputFormatVertex.java:63) ~[flink-runtime_2.11-1.0.0.jar:1.0.0]
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$7.apply(JobManager.scala:1008) ~[flink-runtime_2.11-1.0.0.jar:1.0.0]
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$7.apply(JobManager.scala:996) ~[flink-runtime_2.11-1.0.0.jar:1.0.0]
at scala.collection.Iterator$class.foreach(Iterator.scala:742) ~[scala-library-2.11.7.jar:na]
at scala.collection.AbstractIterator.foreach(Iterator.scala:1194) ~[scala-library-2.11.7.jar:na]
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) ~[scala-library-2.11.7.jar:na]
at scala.collection.AbstractIterable.foreach(Iterable.scala:54) ~[scala-library-2.11.7.jar:na]
at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:996) ~[flink-runtime_2.11-1.0.0.jar:1.0.0]
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:380) ~[flink-runtime_2.11-1.0.0.jar:1.0.0]
at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36) [scala-library-2.11.7.jar:na]
Caused by: org.apache.flink.runtime.operators.util.CorruptConfigurationException: Could not read the user code wrapper: io.fabrick.aperture.doctor.stream.WordCount$$anon$2$$anon$1
at org.apache.flink.runtime.operators.util.TaskConfig.getStubWrapper(TaskConfig.java:284) ~[flink-runtime_2.11-1.0.0.jar:1.0.0]
at org.apache.flink.runtime.jobgraph.OutputFormatVertex.initializeOnMaster(OutputFormatVertex.java:60) ~[flink-runtime_2.11-1.0.0.jar:1.0.0]
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$7.apply(JobManager.scala:1008) ~[flink-runtime_2.11-1.0.0.jar:1.0.0]
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$7.apply(JobManager.scala:996) ~[flink-runtime_2.11-1.0.0.jar:1.0.0]
at scala.collection.Iterator$class.foreach(Iterator.scala:742) ~[scala-library-2.11.7.jar:na]
at scala.collection.AbstractIterator.foreach(Iterator.scala:1194) ~[scala-library-2.11.7.jar:na]
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) ~[scala-library-2.11.7.jar:na]
at scala.collection.AbstractIterable.foreach(Iterable.scala:54) ~[scala-library-2.11.7.jar:na]
at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:996) ~[flink-runtime_2.11-1.0.0.jar:1.0.0]
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:380) ~[flink-runtime_2.11-1.0.0.jar:1.0.0]
Caused by: java.lang.ClassNotFoundException: io.fabrick.aperture.doctor.stream.WordCount$$anon$2$$anon$1
at java.net.URLClassLoader.findClass(URLClassLoader.java:381) ~[na:1.8.0_60]
at java.lang.ClassLoader.loadClass(ClassLoader.java:424) ~[na:1.8.0_60]
at java.lang.ClassLoader.loadClass(ClassLoader.java:357) ~[na:1.8.0_60]
at java.lang.Class.forName0(Native Method) ~[na:1.8.0_60]
at java.lang.Class.forName(Class.java:348) ~[na:1.8.0_60]
at org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:64) ~[flink-core-1.0.0.jar:1.0.0]
at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1613) ~[na:1.8.0_60]
at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1518) ~[na:1.8.0_60]
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1774) ~[na:1.8.0_60]
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351) ~[na:1.8.0_60]
我直接从 IntelliJ 启动我的项目(我不知道这是否是根本原因)。
下面是执行的代码:
object WordCount {
def test():Unit = {
// set up the execution environment
val test = ExecutionEnvironment.getExecutionEnvironment
// get input data
val text = test.fromElements("To be, or not to be,--that is the question:--",
"Whether 'tis nobler in the mind to suffer", "The slings and arrows of outrageous fortune",
"Or to take arms against a sea of troubles,")
val counts = text.flatMap { _.toLowerCase.split("\\W+") }
.map { (_, 1) }
.groupBy(0)
.sum(1)
// emit result
counts.print()
// execute program
test.execute("WordCount Example")
}
}
欢迎大家帮忙,谢谢大家!
编辑:只是添加依赖列表:
libraryDependencies ++= Seq(
"org.apache.flink" %% "flink-scala" % "1.0.0",
"org.apache.flink" %% "flink-clients" % "1.0.0",
"org.apache.flink" %% "flink-streaming-scala" % "1.0.0",
)
【问题讨论】:
-
你能展示你的 build.sbt 依赖吗?似乎找不到类,所以您可能缺少一些 jar/依赖项...
-
你好 Salem,我只是编辑我的帖子以添加依赖项。
-
据我记得,Flink 构建假定某些依赖项(我认为是 hadoop)被标记为“已提供”,即 sbt 不会将它们拉入并希望您在应用程序中提供它们.您可以尝试在 Flink 中插入 pom.xlm 以查找需要提供的内容,或者依赖 fat jar(包含所有依赖项)。
标签: scala intellij-idea playframework apache-flink