【问题标题】:PySpark structured streaming output sink as Kafka giving errorPySpark 结构化流输出接收器作为 Kafka 给出错误
【发布时间】:2018-07-25 02:38:49
【问题描述】:

使用 Kafka 0.9.0 和 Spark 2.1.0 - 我正在使用 PySpark 结构化流计算结果并将其输出到 Kafka 主题上。我指的是相同的 Spark 文档 https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#output-modes

现在当我运行命令时

(输出模式完成,因为它正在聚合流数据。)

(mydataframe.writeStream
    .outputMode("complete")
    .format("kafka")
    .option("kafka.bootstrap.servers", "x.x.x.x:9092")
    .option("topic", "topicname")
    .option("checkpointLocation","/data/checkpoint/1")
    .start())

它给了我如下错误

 ERROR StreamExecution: Query [id = 0686130b-8668-48fa-bdb7-b79b63d82680, runId = b4b7494f-d8b8-416e-ae49-ad8498dfe8f2] terminated with error
org.apache.spark.sql.AnalysisException: Required attribute 'value' not found;
    at org.apache.spark.sql.kafka010.KafkaWriter$$anonfun$6.apply(KafkaWriter.scala:73)
    at org.apache.spark.sql.kafka010.KafkaWriter$$anonfun$6.apply(KafkaWriter.scala:73)
    at scala.Option.getOrElse(Option.scala:121)
    at org.apache.spark.sql.kafka010.KafkaWriter$.validateQuery(KafkaWriter.scala:72)
    at org.apache.spark.sql.kafka010.KafkaWriter$.write(KafkaWriter.scala:88)
    at org.apache.spark.sql.kafka010.KafkaSink.addBatch(KafkaSink.scala:38)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatch$1.apply$mcV$sp(StreamExecution.scala:503)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatch$1.apply(StreamExecution.scala:503)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatch$1.apply(StreamExecution.scala:503)
    at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:262)
    at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:46)
    at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runBatch(StreamExecution.scala:502)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1$$anonfun$1.apply$mcV$sp(StreamExecution.scala:255)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1$$anonfun$1.apply(StreamExecution.scala:244)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1$$anonfun$1.apply(StreamExecution.scala:244)
    at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:262)
    at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:46)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1.apply$mcZ$sp(StreamExecution.scala:244)
    at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:43)
    at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches(StreamExecution.scala:239)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:177)**

不确定它期望什么属性值。需要帮助来解决这个问题。

控制台输出接收器在控制台上产生正确的输出,因此代码似乎可以正常工作。仅当使用 kafka 作为输出接收器时才会导致此问题

【问题讨论】:

    标签: apache-spark pyspark apache-kafka spark-structured-streaming


    【解决方案1】:

    不确定它期望什么属性值。需要帮助来解决这个问题。

    您的 myDataFrame 需要一个列 valueStringTypeBinaryType),其中包含您要发送到 Kafka 的有效负载(消息)。

    目前您正在尝试写入 Kafka,但没有描述要写入的数据。

    获取此类列的一种方法是使用.withColumnRenamed 重命名现有列。如果要编写多个列,通常最好创建一个包含数据帧的 JSON 表示的列,可以使用to_json sql.function 获得。 But beware of .toJSON!

    【讨论】:

      【解决方案2】:

      Spark 2.1.0 不支持将 Kafka 作为输出接收器。根据documentation,它已在 2.2.0 中引入。

      另请参阅this answer,它链接到介绍该功能的提交,并提供了替代解决方案,以及此JIRA,它在 2.2.1 中添加了文档。

      【讨论】:

      • 不过,这只是您原始问题的后续错误,您提到的错误消息只能通过在数据框中使用正确的命名法来解决。
      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2020-08-18
      • 1970-01-01
      • 1970-01-01
      • 2022-01-13
      • 2021-03-04
      • 2023-03-25
      • 1970-01-01
      相关资源
      最近更新 更多