【Question Title】: Apache Flink (1.9.1) runtime exception when using case classes in Scala (2.12.8)
【Posted】: 2020-05-09 06:43:30
【Question】:

I am using case classes in a Scala (2.12.8) Apache Flink (1.9.1) application. When I run the code below, I get the following exception: Caused by: java.lang.NoSuchMethodError: scala.Product.$init$(Lscala/Product;)V

Note: I have already added a default constructor as suggested in "java.lang.NoSuchMethodException for init method in Scala case class", but that did not help in my case.

Here is the full code:

package com.zignallabs
import org.apache.flink.api.scala._
/**
 * Reads from a list of elements, transforms each element into a case class instance, and outputs to the TaskManager.
 */

case class AddCount(firstP: String, count: Int) {
  def this() = this("default", 1)  // Adding a default constructor as per https://stackoverflow.com/questions/51129809/java-lang-nosuchmethodexception-for-init-method-in-scala-case-class did not help
}

object WordCount {
  def main(args: Array[String]): Unit = {
    // set up the execution environment
    val env = ExecutionEnvironment.getExecutionEnvironment
    // get input data
    val input = env.fromElements(" one", "two", "three", "four", "five", "end of test")



    // *****   Line 31 throws the exception
    // Caused by: java.lang.NoSuchMethodError: scala.Product.$init$(Lscala/Product;)V
    //  at com.zignallabs.AddCount.<init>(WordCount.scala:7)
    //  at com.zignallabs.WordCount$.$anonfun$main$1(WordCount.scala:31)
    //  at org.apache.flink.api.scala.DataSet$$anon$1.map(DataSet.scala:490)
    //  at org.apache.flink.runtime.operators.chaining.ChainedMapDriver.collect(ChainedMapDriver.java:79)
    //  at org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35)
    //  at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:196)
    //  at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
    //  at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
    //  at java.lang.Thread.run(Thread.java:748)
    val transform = input.map{w => AddCount(w, 1)}  // <- Throwing exception

    // execute and print result
    println(transform)
    transform.print()
    transform.printOnTaskManager(" Word")

    env.execute()
  }
}

The runtime exception is:

    at com.zignallabs.AddCount.<init>(WordCount.scala:7)
    at com.zignallabs.WordCount$.$anonfun$main$1(WordCount.scala:31)
    at org.apache.flink.api.scala.DataSet$$anon$1.map(DataSet.scala:490)
    at org.apache.flink.runtime.operators.chaining.ChainedMapDriver.collect(ChainedMapDriver.java:79)
    at org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35)
    at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:196)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
    at java.lang.Thread.run(Thread.java:748)

I am building and running Flink locally, using a local Flink cluster with Flink version 1.9.1.
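As a diagnostic sketch (not part of the original post): scala.Product.$init$ was introduced with the trait encoding of Scala 2.12, so it is missing whenever a 2.11 scala-library ends up on the runtime classpath. Printing the Scala library version actually seen at run time can confirm which binary is loaded:

```scala
object VersionCheck extends App {
  // Prints the scala-library version visible on the runtime classpath,
  // e.g. "version 2.12.8". A 2.11 value here would explain the missing
  // scala.Product.$init$ method.
  println(scala.util.Properties.versionString)
}
```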

Here is the build.sbt file:

name := "flink191KafkaScala"

version := "0.1-SNAPSHOT"

organization := "com.zignallabs"

scalaVersion := "2.12.8"

val flinkVersion = "1.9.1"

//javacOptions ++= Seq("-source", "1.7", "-target", "1.7")

val http4sVersion = "0.16.6"

resolvers ++= Seq(
  "Local Ivy" at "file:///"+Path.userHome+"/.ivy2/local",
  "Local Ivy Cache" at "file:///"+Path.userHome+"/.ivy2/cache",
  "Local Maven Repository" at "file:///"+Path.userHome+"/.m2/repository",
  "Artifactory Cache" at "https://zignal.artifactoryonline.com/zignal/zignal-repos"
)

val excludeCommonsLogging = ExclusionRule(organization = "commons-logging")

libraryDependencies ++= Seq(
  "org.apache.flink" %% "flink-scala" % flinkVersion % "provided",
  "org.apache.flink" %% "flink-streaming-scala" % flinkVersion % "provided",

  "org.apache.flink" %% "flink-clients" % "1.9.1",
  // Upgrade to flink-connector-kafka_2.11
  "org.apache.flink" %% "flink-connector-kafka-0.11" % "1.9.1",
  //"org.scalaj" %% "scalaj-http" % "2.4.2",
  "com.squareup.okhttp3" % "okhttp" % "4.2.2"
)

publishTo := Some("Artifactory Realm" at "https://zignal.artifactoryonline.com/zignal/zignal")

credentials += Credentials("Artifactory Realm", "zignal.artifactoryonline.com", "buildserver", "buildserver")

//mainClass in Compile := Some("com.zignallabs.StoryCounterTopology")
mainClass in Compile := Some("com.zignallabs.WordCount")

scalacOptions ++= Seq(
  "-feature",
  "-unchecked",
  "-deprecation",
  "-language:implicitConversions",
  "-Yresolve-term-conflict:package",
  "-language:postfixOps",
  "-target:jvm-1.8")


lazy val root = project.in(file(".")).configs(IntegrationTest)

【Comments】:

  • I copied/pasted your code into IntelliJ and it runs fine for me. That suggests the problem lies elsewhere: probably the build environment or the project setup.
  • Could you paste your dependencies here and verify the versions on the cluster? In 99% of cases this error is caused by a Scala version mismatch between the machine and Flink, or between the developer machine and the cluster.
  • David Anderson and Dominik Wosiński: thanks for the replies. I have added the build.sbt file to the original post. What is the best way to clean the environment?

Tags: scala apache-flink


【Solution 1】:

I can now run this application with Scala 2.12. The problem was the environment: I had to make sure there were no conflicting binaries, in particular no mix of Scala 2.11 and Scala 2.12 artifacts.
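One way to reduce the risk of such mixed Scala binaries (a sketch based on the build.sbt above, not a verified fix) is to derive every Flink artifact's Scala suffix from scalaVersion via %% and a single flinkVersion, instead of hard-coding version strings per dependency:

```scala
// build.sbt fragment: %% appends the Scala binary suffix (_2.12 here),
// so every Flink artifact is resolved against the same Scala version.
scalaVersion := "2.12.8"

val flinkVersion = "1.9.1"

libraryDependencies ++= Seq(
  "org.apache.flink" %% "flink-scala"           % flinkVersion % "provided",
  "org.apache.flink" %% "flink-streaming-scala" % flinkVersion % "provided",
  "org.apache.flink" %% "flink-clients"         % flinkVersion
)
```

The cluster side matters too: the Flink 1.9.1 distribution is published separately for Scala 2.11 and Scala 2.12, so the downloaded binary must be the scala_2.12 build for a 2.12 job jar to run against it.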

【Discussion】:

    【Solution 2】:

    If you want default arguments for a case class constructor, it is more idiomatic Scala to define them like this:

    case class AddCount ( firstP: String = "default", count: Int = 1)

    This is syntactic sugar that essentially gives you the following for free:

    case class AddCount(firstP: String, count: Int) {
      def this() = this("default", 1)

      def this(firstP: String) = this(firstP, 1)

      def this(count: Int) = this("default", count)
    }
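    A quick demonstration (a hypothetical snippet, not part of the original answer) that the default-argument form covers the same call sites, using a named argument where only count is supplied:

```scala
case class AddCount(firstP: String = "default", count: Int = 1)

object Demo extends App {
  println(AddCount())           // both defaults apply
  println(AddCount("five"))     // count defaults to 1
  println(AddCount(count = 2))  // firstP defaults to "default"
}
```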
    

    【Discussion】:
