【发布时间】:2017-03-31 04:36:12
【问题描述】:
我正在使用结构化流 (Spark 2.0.2) 来使用 kafka 消息。使用 scalapb,protobuf 中的消息。我收到以下错误。请帮忙..
线程“主”scala.ScalaReflectionException 中的异常:是 不是一个术语 scala.reflect.api.Symbols$SymbolApi$class.asTerm(Symbols.scala:199) 在 scala.reflect.internal.Symbols$SymbolContextApiImpl.asTerm(Symbols.scala:84) 在 org.apache.spark.sql.catalyst.ScalaReflection$class.constructParams(ScalaReflection.scala:811) 在 org.apache.spark.sql.catalyst.ScalaReflection$.constructParams(ScalaReflection.scala:39) 在 org.apache.spark.sql.catalyst.ScalaReflection$class.getConstructorParameters(ScalaReflection.scala:800) 在 org.apache.spark.sql.catalyst.ScalaReflection$.getConstructorParameters(ScalaReflection.scala:39) 在 org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor(ScalaReflection.scala:582) 在 org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor(ScalaReflection.scala:460) 在 org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$9.apply(ScalaReflection.scala:592) 在 org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$9.apply(ScalaReflection.scala:583) 在 scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:252) 在 scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:252) 在 scala.collection.immutable.List.foreach(List.scala:381) 在 scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:252) 在 scala.collection.immutable.List.flatMap(List.scala:344) 在 org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor(ScalaReflection.scala:583) 在 org.apache.spark.sql.catalyst.ScalaReflection$.serializerFor(ScalaReflection.scala:425) 在 org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$.apply(ExpressionEncoder.scala:61) 在 org.apache.spark.sql.Encoders$.product(Encoders.scala:274) 在 org.apache.spark.sql.SQLImplicits.newProductEncoder(SQLImplicits.scala:47) 在 PersonConsumer$.main(PersonConsumer.scala:33) 在 PersonConsumer.main(PersonConsumer.scala) 在 sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 在 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 在 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 在 java.lang.reflect.Method.invoke(Method.java:498) 在 com.intellij.rt.execution.application.AppMain.main(AppMain.java:147)
以下是我的代码...
object PersonConsumer {
import org.apache.spark.rdd.RDD
import com.trueaccord.scalapb.spark._
import org.apache.spark.sql.{SQLContext, SparkSession}
import com.example.protos.demo._
def main(args : Array[String]) {
def parseLine(s: String): Person =
Person.parseFrom(
org.apache.commons.codec.binary.Base64.decodeBase64(s))
val spark = SparkSession.builder.
master("local")
.appName("spark session example")
.getOrCreate()
import spark.implicits._
val ds1 = spark.readStream.format("kafka").option("kafka.bootstrap.servers","localhost:9092").option("subscribe","person").load()
val ds2 = ds1.selectExpr("CAST(value AS STRING)").as[String]
val ds3 = ds2.map(str => parseLine(str)).createOrReplaceTempView("persons")
val ds4 = spark.sqlContext.sql("select name from persons")
val query = ds4.writeStream
.outputMode("append")
.format("console")
.start()
query.awaitTermination()
}
}
【问题讨论】:
标签: scala apache-kafka spark-streaming scalapb