[Question title]: Scala Flink gets java.lang.NoClassDefFoundError: scala/Product$class after using a case class for a custom DeserializationSchema
[Posted]: 2020-11-25 05:23:24
[Question]:

It works fine when using a plain class.

But after changing the class to a case class, I get a java.lang.NoClassDefFoundError: scala/Product$class error.

I'm not sure whether it's an sbt packaging problem or a code problem.

What I'm using:

  • sbt

  • Scala: 2.11.12

  • Java: 8

  • sbt assembly for packaging

package example

import java.util.Properties
import java.nio.charset.StandardCharsets
import org.apache.flink.api.scala._
import org.apache.flink.streaming.util.serialization.{DeserializationSchema, SerializationSchema}
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer, FlinkKafkaProducer}
import org.apache.flink.streaming.api.watermark.Watermark
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks
import org.apache.flink.api.common.typeinfo.TypeInformation
import Config._


case class Record(
  id: String,
  startTime: Long
)

class RecordDeSerializer extends DeserializationSchema[Record] with SerializationSchema[Record] {

  override def serialize(record: Record): Array[Byte] = {
    "123".getBytes(StandardCharsets.UTF_8)
  }
  override def deserialize(b: Array[Byte]): Record = {
    Record("1", 123)
  }
  override def isEndOfStream(record: Record): Boolean = false
  override def getProducedType: TypeInformation[Record] = {
    createTypeInformation[Record]
  }
}


object RecordConsumer {
  def main(args: Array[String]): Unit = {
    val config  : Properties = {
      val p = new Properties()
      p.setProperty("zookeeper.connect", Config.KafkaZookeeperServers)
      p.setProperty("bootstrap.servers", Config.KafkaBootstrapServers)
      p.setProperty("group.id", Config.KafkaGroupID)
      p
    }

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.enableCheckpointing(1000)

    val consumer = new FlinkKafkaConsumer[Record](
      Config.KafkaTopic,
      new RecordDeSerializer(),
      config
    )
    consumer.setStartFromEarliest()

    env.addSource(consumer).print()

    env.execute("record consumer")
  }
}

Error

2020-08-05 04:07:33,963 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Discarding checkpoint 1670 of job 4de8831901fa72790d0a9a973cc17dde.                             
java.lang.NoClassDefFoundError: scala/Product$class

...
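For context on why only the case class triggers this: every Scala case class implicitly mixes in scala.Product, and on Scala 2.11 the compiler emits a static helper class named scala/Product$class for Product's trait method bodies (Scala 2.12 compiles traits to default methods instead, so that class no longer exists). A minimal sketch illustrating the Product mixin (the Record fields mirror the question's class; object name is just for illustration):

```scala
// A case class automatically extends scala.Product and Serializable.
// If the Scala 2.11 library is missing from the fat jar at runtime
// (or a 2.12 library is on the classpath instead), loading any case
// class fails with NoClassDefFoundError: scala/Product$class.
case class Record(id: String, startTime: Long)

object ProductDemo {
  def main(args: Array[String]): Unit = {
    val r = Record("1", 123L)
    println(r.isInstanceOf[Product]) // case classes are Products
    println(r.productArity)          // number of constructor fields
    println(r.productElement(0))     // first field, via the Product API
  }
}
```

A plain class does not extend Product, which is why the non-case version never touches scala/Product$class and appears to work.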

build.sbt

My first thought was that a version might be wrong. However, everything works fine if I use a plain class.

Here is the build.sbt:

ThisBuild / resolvers ++= Seq(
    "Apache Development Snapshot Repository" at "https://repository.apache.org/content/repositories/snapshots/",
    Resolver.mavenLocal
)

name := "deedee"

version := "0.1-SNAPSHOT"

organization := "dexterlab"

ThisBuild / scalaVersion := "2.11.8"

val flinkVersion = "1.8.2"

val flinkDependencies = Seq(
    "org.apache.flink" %% "flink-scala" % flinkVersion % "provided",
    "org.apache.flink" %% "flink-streaming-scala" % flinkVersion % "provided",
    "org.apache.flink" %% "flink-streaming-java" % flinkVersion % "provided",
    "org.apache.flink" %% "flink-connector-kafka" % flinkVersion,
)

val thirdPartyDependencies = Seq(
    "com.github.nscala-time" %% "nscala-time" % "2.24.0",
    "com.typesafe.play" %% "play-json" % "2.6.14",
)


lazy val root = (project in file(".")).
  settings(
    libraryDependencies ++= flinkDependencies,
    libraryDependencies ++= thirdPartyDependencies,
    libraryDependencies += "org.scala-lang" % "scala-compiler" % scalaVersion.value,

  )

assembly / mainClass := Some("dexterlab.TelecoDataConsumer")

// make run command include the provided dependencies
Compile / run  := Defaults.runTask(Compile / fullClasspath,
                                   Compile / run / mainClass,
                                   Compile / run / runner
                                  ).evaluated


// stays inside the sbt console when we press "ctrl-c" while a Flink programme executes with "run" or "runMain"
Compile / run / fork := true
Global / cancelable := true

// exclude Scala library from assembly
assembly / assemblyOption  := (assembly / assemblyOption).value.copy(includeScala = false)

autoCompilerPlugins := true

[Question discussion]:

Tags: scala apache-flink


[Solution 1]:

It finally worked after adding this line to build.sbt:

assembly / assemblyOption := (assembly / assemblyOption).value.copy(includeScala = true)

This includes the Scala library when running sbt assembly.
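There are two ways to keep the assembly consistent, sketched below (the version numbers are assumptions taken from the question's build.sbt, not verified against any cluster): either bundle the Scala library into the fat jar as this answer does, or keep it excluded and make sure the cluster provides the same Scala binary version the job was compiled with.

```scala
// build.sbt fragment (sbt 1.x slash syntax) -- a sketch, not a drop-in file.

// Option A: bundle the Scala library into the fat jar, as in the answer.
// This is the simplest fix when the cluster's bundled Scala version is
// unknown or differs from the one the job was compiled with.
assembly / assemblyOption :=
  (assembly / assemblyOption).value.copy(includeScala = true)

// Option B (alternative): keep includeScala = false, but then the Flink
// distribution on the cluster must ship the *same* Scala binary version,
// e.g. a Flink 1.8.2 build for Scala 2.11 when compiling with:
// ThisBuild / scalaVersion := "2.11.12"
```

Option A makes the jar larger but self-contained; Option B keeps the jar slim at the cost of coupling the build to the cluster's Scala version.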

[Discussion]:
