【发布时间】:2021-09-04 18:17:10
【问题描述】:
我尝试从 Socket TCP 获取数据以附加到我收到的数据帧 数据并将它们执行到 Seq() 但是当我使用 forEach 附加 他们到数据框有问题这是我的代码:
object CustomReceiver {
def main(args: Array[String]): Unit = {
StreamingExamples.setStreamingLogLevels()
// Create the context with a 1 second batch size
val spark: SparkSession = SparkSession.builder()
.master("local[2]")
.appName("CustomReceiver")
.getOrCreate()
val sc = spark.sparkContext
val ssc = new StreamingContext(sc, Seconds(5))
import spark.implicits._
/*formatdata line data from Socket: number1, 20210621090303, RadiusMessage, Stop, 84602496347, v241.66.85.130 */
val linesData1 = ssc.receiverStream(new CustomReceiver("localhost", 11000))
linesData1.flatMap(_.split(" ").map(_.trim))
linesData1.foreachRDD { rdd =>
rdd.foreach{ line => {
val arrraLine = line.split(",").toList
// oke arrayLine data : List(number1, 20210621090303, RadiusMessage, Stop, 84602496347, 241.66.85.130)
val testRDD = Seq(arrraLine).map(x =>(x(0), x(1), x(2), x(3), x(4)))
// oke TestRDD : testRDD :List((number1,20210621090303,RadiusMessage,Stop,84602496347))
val testDF = testRDD.toDF("cot1","cot2","cot3","cot4","cot5")
// has an Problem
testDF.show()
}
}
}
ssc.start()
ssc.awaitTermination()
}
}
这是我运行时的问题
java.lang.NullPointerException 在 org.apache.spark.sql.SQLImplicits.localSeqToDatasetHolder(SQLImplicits.scala:231) 在 Cl.CustomReceiver$.$anonfun$main$4(CustomeReceiver.scala:52) 在 Cl.CustomReceiver$.$anonfun$main$4$adapted(CustomeReceiver.scala:45) 在 scala.collection.Iterator.foreach(Iterator.scala:943) 在 scala.collection.Iterator.foreach$(Iterator.scala:943) 在 org.apache.spark.util.CompletionIterator.foreach(CompletionIterator.scala:25) 在 org.apache.spark.rdd.RDD.$anonfun$foreach$2(RDD.scala:1012) 在 org.apache.spark.rdd.RDD.$anonfun$foreach$2$adapted(RDD.scala:1012) 在 org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2236) 在 org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90) 在 org.apache.spark.scheduler.Task.run(Task.scala:131) 在 org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497) 在 org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439) 在 org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500) 在 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) 在 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) 在 java.lang.Thread.run(Thread.java:748)
【问题讨论】:
标签: scala apache-spark spark-streaming