[Posted]: 2020-05-09 06:43:30
[Question]:
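I am using case classes in a Scala (2.12.8) Apache Flink (1.9.1) application. When I run the code below, I get the following exception: Caused by: java.lang.NoSuchMethodError: scala.Product.$init$(Lscala/Product;)V
Note: I have added a default constructor as suggested in "java.lang.NoSuchMethodException for init method in Scala case class", but that does not help in my case.
Here is the complete code: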
package com.zignallabs

import org.apache.flink.api.scala._

/**
 * Implements the program that reads from a Element list, Transforms it into tuple and outputs to TaskManager
 */
case class AddCount(firstP: String, count: Int) {
  def this() = this("default", 1) // No help when added default constructor as per https://stackoverflow.com/questions/51129809/java-lang-nosuchmethodexception-for-init-method-in-scala-case-class
}

object WordCount {
  def main(args: Array[String]): Unit = {
    // set up the execution environment
    val env = ExecutionEnvironment.getExecutionEnvironment
    // get input data
    val input = env.fromElements(" one", "two", "three", "four", "five", "end of test")
    // ***** Line 31 throws the exception
    // Caused by: java.lang.NoSuchMethodError: scala.Product.$init$(Lscala/Product;)V
    //   at com.zignallabs.AddCount.<init>(WordCount.scala:7)
    //   at com.zignallabs.WordCount$.$anonfun$main$1(WordCount.scala:31)
    //   at org.apache.flink.api.scala.DataSet$$anon$1.map(DataSet.scala:490)
    //   at org.apache.flink.runtime.operators.chaining.ChainedMapDriver.collect(ChainedMapDriver.java:79)
    //   at org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35)
    //   at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:196)
    //   at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
    //   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
    //   at java.lang.Thread.run(Thread.java:748)
    val transform = input.map { w => AddCount(w, 1) } // <- Throwing exception
    // execute and print result
    println(transform)
    transform.print()
    transform.printOnTaskManager(" Word")
    env.execute()
  }
}
The runtime exception is:
at com.zignallabs.AddCount.<init>(WordCount.scala:7)
at com.zignallabs.WordCount$.$anonfun$main$1(WordCount.scala:31)
at org.apache.flink.api.scala.DataSet$$anon$1.map(DataSet.scala:490)
at org.apache.flink.runtime.operators.chaining.ChainedMapDriver.collect(ChainedMapDriver.java:79)
at org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35)
at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:196)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
at java.lang.Thread.run(Thread.java:748)
I am building and running Flink locally, against a local Flink cluster of version 1.9.1.
Here is the build.sbt file:
name := "flink191KafkaScala"
version := "0.1-SNAPSHOT"
organization := "com.zignallabs"
scalaVersion := "2.12.8"
val flinkVersion = "1.9.1"
//javacOptions ++= Seq("-source", "1.7", "-target", "1.7")
val http4sVersion = "0.16.6"
resolvers ++= Seq(
  "Local Ivy" at "file:///"+Path.userHome+"/.ivy2/local",
  "Local Ivy Cache" at "file:///"+Path.userHome+"/.ivy2/cache",
  "Local Maven Repository" at "file:///"+Path.userHome+"/.m2/repository",
  "Artifactory Cache" at "https://zignal.artifactoryonline.com/zignal/zignal-repos"
)
val excludeCommonsLogging = ExclusionRule(organization = "commons-logging")
libraryDependencies ++= Seq(
  "org.apache.flink" %% "flink-scala" % flinkVersion % "provided",
  "org.apache.flink" %% "flink-streaming-scala" % flinkVersion % "provided",
  "org.apache.flink" %% "flink-clients" % "1.9.1",
  // Upgrade to flink-connector-kafka_2.11
  "org.apache.flink" %% "flink-connector-kafka-0.11" % "1.9.1",
  //"org.scalaj" %% "scalaj-http" % "2.4.2",
  "com.squareup.okhttp3" % "okhttp" % "4.2.2"
)
publishTo := Some("Artifactory Realm" at "https://zignal.artifactoryonline.com/zignal/zignal")
credentials += Credentials("Artifactory Realm", "zignal.artifactoryonline.com", "buildserver", "buildserver")
//mainClass in Compile := Some("com.zignallabs.StoryCounterTopology")
mainClass in Compile := Some("com.zignallabs.WordCount")
scalacOptions ++= Seq(
  "-feature",
  "-unchecked",
  "-deprecation",
  "-language:implicitConversions",
  "-Yresolve-term-conflict:package",
  "-language:postfixOps",
  "-target:jvm-1.8"
)
lazy val root = project.in(file(".")).configs(IntegrationTest)
[Comments]:
-
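I copied and pasted your code into IntelliJ and it ran fine for me. That suggests the problem lies elsewhere, possibly in the build environment or the project setup.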
-
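Could you please paste your dependencies here and verify the version on the cluster? In 99% of cases this error is caused by a Scala version mismatch, either between the machine and Flink or between the developer machine and the cluster.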
-
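David Anderson and Dominik Wosiński, thanks for the replies. I have added the build.sbt file to the original post. What is the best way to clean the environment?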
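One way to test the version-mismatch theory from the comments is to ask the running JVM which Scala library it actually loaded. The following is a minimal diagnostic sketch, not part of the original post: the object name `ScalaVersionCheck` and its helper methods are hypothetical, and it relies only on `scala.util.Properties` and Java reflection. It reports the Scala runtime version, the location `scala.Product` was loaded from, and whether that class declares the `$init$` method named in the stack trace (the Scala 2.11 library is expected not to have it).

```scala
// Diagnostic sketch (hypothetical helper, not from the original post):
// reports which Scala runtime is on the classpath of the JVM that runs it.
object ScalaVersionCheck {

  // Version of the scala-library that was actually loaded, e.g. "2.12.8".
  def runtimeScalaVersion: String = scala.util.Properties.versionNumberString

  // Location scala.Product was loaded from, if the JVM exposes it.
  def scalaLibraryLocation: String = {
    val source = Option(classOf[scala.Product].getProtectionDomain.getCodeSource)
    source.flatMap(s => Option(s.getLocation)).map(_.toString).getOrElse("<unknown>")
  }

  // True when the loaded scala.Product declares the $init$ method
  // that the NoSuchMethodError reports as missing.
  def productHasInit: Boolean =
    classOf[scala.Product].getDeclaredMethods.exists(_.getName == "$init$")

  def main(args: Array[String]): Unit = {
    println(s"Scala runtime version: $runtimeScalaVersion")
    println(s"scala.Product loaded from: $scalaLibraryLocation")
    println(s"scala.Product declares $$init$$: $productHasInit")
  }
}
```

Running this once from the IDE and once as a job submitted to the local cluster should show whether both environments see the same Scala version. If the cluster reports 2.11.x while build.sbt sets `scalaVersion := "2.12.8"`, that would be consistent with the mismatch described in the comments.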
Tags: scala apache-flink