【Posted】:2018-07-25 02:38:49
【Question】:
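I am using Kafka 0.9.0 and Spark 2.1.0. With PySpark Structured Streaming I compute results and write them to a Kafka topic, following the Spark documentation at https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#output-modes
When I run the command below
(the output mode is complete because the query aggregates the streaming data):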
(mydataframe.writeStream
.outputMode("complete")
.format("kafka")
.option("kafka.bootstrap.servers", "x.x.x.x:9092")
.option("topic", "topicname")
.option("checkpointLocation","/data/checkpoint/1")
.start())
it fails with the following error:
ERROR StreamExecution: Query [id = 0686130b-8668-48fa-bdb7-b79b63d82680, runId = b4b7494f-d8b8-416e-ae49-ad8498dfe8f2] terminated with error
org.apache.spark.sql.AnalysisException: Required attribute 'value' not found;
at org.apache.spark.sql.kafka010.KafkaWriter$$anonfun$6.apply(KafkaWriter.scala:73)
at org.apache.spark.sql.kafka010.KafkaWriter$$anonfun$6.apply(KafkaWriter.scala:73)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.sql.kafka010.KafkaWriter$.validateQuery(KafkaWriter.scala:72)
at org.apache.spark.sql.kafka010.KafkaWriter$.write(KafkaWriter.scala:88)
at org.apache.spark.sql.kafka010.KafkaSink.addBatch(KafkaSink.scala:38)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatch$1.apply$mcV$sp(StreamExecution.scala:503)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatch$1.apply(StreamExecution.scala:503)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatch$1.apply(StreamExecution.scala:503)
at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:262)
at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:46)
at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runBatch(StreamExecution.scala:502)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1$$anonfun$1.apply$mcV$sp(StreamExecution.scala:255)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1$$anonfun$1.apply(StreamExecution.scala:244)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1$$anonfun$1.apply(StreamExecution.scala:244)
at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:262)
at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:46)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1.apply$mcZ$sp(StreamExecution.scala:244)
at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:43)
at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches(StreamExecution.scala:239)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:177)
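I am not sure what attribute 'value' it expects. I need help resolving this.
The console output sink prints the correct results, so the query itself appears to work. The error occurs only when Kafka is used as the output sink.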
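
For context, the Kafka sink in Structured Streaming looks for a column named value (string or binary), plus optional key and topic columns. An aggregated DataFrame usually carries its own column names instead, which would explain why the console sink works and the Kafka sink does not. Below is a minimal sketch of one way to reshape the rows before writing. The column names word and count and the helper names kafka_select_exprs and start_kafka_query are hypothetical, since the schema of mydataframe is not shown here. It also assumes a Spark version that ships the Kafka sink and the SQL to_json function (2.2 or later, as far as I know).

```python
def kafka_select_exprs(value_cols, key_col=None):
    """Build selectExpr strings that produce the columns the Kafka sink reads.

    value_cols: columns to pack into one JSON string named 'value'.
    key_col:    optional column to cast to a string and use as the message key.
    """
    if not value_cols:
        raise ValueError("at least one column is needed to build 'value'")
    # Backticks keep names such as `count` from being parsed as functions.
    quoted = ", ".join("`{}`".format(c) for c in value_cols)
    exprs = []
    if key_col is not None:
        exprs.append("CAST(`{}` AS STRING) AS key".format(key_col))
    exprs.append("to_json(struct({})) AS value".format(quoted))
    return exprs


def start_kafka_query(df, servers, topic, checkpoint, value_cols, key_col=None):
    """Project a streaming DataFrame to key/value and start the Kafka sink.

    df is expected to be a streaming DataFrame such as mydataframe above.
    """
    return (df.selectExpr(*kafka_select_exprs(value_cols, key_col))
              .writeStream
              .outputMode("complete")
              .format("kafka")
              .option("kafka.bootstrap.servers", servers)
              .option("topic", topic)
              .option("checkpointLocation", checkpoint)
              .start())


# Hypothetical columns; replace with the real columns of the aggregation.
print(kafka_select_exprs(["word", "count"], key_col="word"))
```

With the question's settings this would be called as start_kafka_query(mydataframe, "x.x.x.x:9092", "topicname", "/data/checkpoint/1", ["word", "count"], key_col="word"). Packing the row into JSON is only one choice; any expression that yields a single string or binary column named value should satisfy the same check.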
【Comments】:
Tags: apache-spark pyspark apache-kafka spark-structured-streaming