【发布时间】:2020-11-25 05:23:24
【问题描述】:
使用泛型类时效果很好。
但是在将类更改为案例类后得到 java.lang.NoClassDefFoundError: scala/Product$class 错误。
不确定是sbt打包问题还是代码问题。
当我使用时:
-
sbt
-
斯卡拉:2.11.12
-
java: 8
-
sbt 组装到打包
package example
import java.util.Properties
import java.nio.charset.StandardCharsets
import org.apache.flink.api.scala._
import org.apache.flink.streaming.util.serialization.{DeserializationSchema, SerializationSchema}
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer, FlinkKafkaProducer}
import org.apache.flink.streaming.api.watermark.Watermark
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks
import org.apache.flink.api.common.typeinfo.TypeInformation
import Config._
case class Record(
id: String,
startTime: Long
) {}
class RecordDeSerializer extends DeserializationSchema[Record] with SerializationSchema[Record] {
override def serialize(record: Record): Array[Byte] = {
return "123".getBytes(StandardCharsets.UTF_8)
}
override def deserialize(b: Array[Byte]): Record = {
Record("1", 123)
}
override def isEndOfStream(record: Record): Boolean = false
override def getProducedType: TypeInformation[Record] = {
createTypeInformation[Record]
}
}
object RecordConsumer {
def main(args: Array[String]): Unit = {
val config : Properties = {
var p = new Properties()
p.setProperty("zookeeper.connect", Config.KafkaZookeeperServers)
p.setProperty("bootstrap.servers", Config.KafkaBootstrapServers)
p.setProperty("group.id", Config.KafkaGroupID)
p
}
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.enableCheckpointing(1000)
var consumer = new FlinkKafkaConsumer[Record](
Config.KafkaTopic,
new RecordDeSerializer(),
config
)
consumer.setStartFromEarliest()
val stream = env.addSource(consumer).print
env.execute("record consumer")
}
}
错误
2020-08-05 04:07:33,963 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Discarding checkpoint 1670 of job 4de8831901fa72790d0a9a973cc17dde.
java.lang.NoClassDefFoundError: scala/Product$class
...
build.SBT
第一个想法是版本可能不正确。 但是,如果使用普通类,一切都可以正常工作
这里是build.sbt
ThisBuild / resolvers ++= Seq(
"Apache Development Snapshot Repository" at "https://repository.apache.org/content/repositories/snapshots/",
Resolver.mavenLocal
)
name := "deedee"
version := "0.1-SNAPSHOT"
organization := "dexterlab"
ThisBuild / scalaVersion := "2.11.8"
val flinkVersion = "1.8.2"
val flinkDependencies = Seq(
"org.apache.flink" %% "flink-scala" % flinkVersion % "provided",
"org.apache.flink" %% "flink-streaming-scala" % flinkVersion % "provided",
"org.apache.flink" %% "flink-streaming-java" % flinkVersion % "provided",
"org.apache.flink" %% "flink-connector-kafka" % flinkVersion,
)
val thirdPartyDependencies = Seq(
"com.github.nscala-time" %% "nscala-time" % "2.24.0",
"com.typesafe.play" %% "play-json" % "2.6.14",
)
lazy val root = (project in file(".")).
settings(
libraryDependencies ++= flinkDependencies,
libraryDependencies ++= thirdPartyDependencies,
libraryDependencies += "org.scala-lang" % "scala-compiler" % scalaVersion.value,
)
assembly / mainClass := Some("dexterlab.TelecoDataConsumer")
// make run command include the provided dependencies
Compile / run := Defaults.runTask(Compile / fullClasspath,
Compile / run / mainClass,
Compile / run / runner
).evaluated
// stays inside the sbt console when we press "ctrl-c" while a Flink programme executes with "run" or "runMain"
Compile / run / fork := true
Global / cancelable := true
// exclude Scala library from assembly
assembly / assemblyOption := (assembly / assemblyOption).value.copy(includeScala = false)
autoCompilerPlugins := true
【问题讨论】:
-
可能你有版本冲突。尝试调查这个问题:stackoverflow.com/questions/42498035/…
-
@RayanRal 这是我的第一个想法,但是如何解释使用泛型类是否可以正常工作?
标签: scala apache-flink