【问题标题】:How to display a streaming DataFrame (as show fails with AnalysisException)?如何显示流数据帧(显示失败并出现 AnalysisException)?
【发布时间】:2017-12-18 22:24:37
【问题描述】:

所以我有一些数据流在 Kafka 主题中,我正在获取这些流数据并将其放入 DataFrame。我想在 DataFrame 中显示数据:

import os
from kafka import KafkaProducer
from pyspark.sql import SparkSession, DataFrame
import time
from datetime import datetime, timedelta

os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.2.0,org.apache.spark:spark-streaming-kafka-0-8_2.11:2.2.0 pyspark-shell'

topic_name = "my-topic"
kafka_broker = "localhost:9092"

producer = KafkaProducer(bootstrap_servers = kafka_broker)
spark = SparkSession.builder.getOrCreate()
terminate = datetime.now() + timedelta(seconds=30)

while datetime.now() < terminate:
    producer.send(topic = topic_name, value = str(datetime.now()).encode('utf-8'))
    time.sleep(1)

readDF = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", kafka_broker) \
    .option("subscribe", topic_name) \
    .load()
readDF = readDF.selectExpr("CAST(key AS STRING)","CAST(value AS STRING)")

readDF.writeStream.format("console").start()
readDF.show()

producer.close()

但我不断收到此错误:

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/spark/spark/python/pyspark/sql/utils.py", line 63, in deco
    return f(*a, **kw)
  File "/home/spark/spark/python/lib/py4j-0.10.4-src.zip/py4j/protocol.py", line 319, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o30.showString.
: org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();;
kafka
    at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.org$apache$spark$sql$catalyst$analysis$UnsupportedOperationChecker$$throwError(UnsupportedOperationChecker.scala:297)
    at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$$anonfun$checkForBatch$1.apply(UnsupportedOperationChecker.scala:36)
    at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$$anonfun$checkForBatch$1.apply(UnsupportedOperationChecker.scala:34)
    at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:127)
...
Traceback (most recent call last):
      File "test2.py", line 30, in <module>
        readDF.show()
      File "/home/spark/spark/python/pyspark/sql/dataframe.py", line 336, in show
        print(self._jdf.showString(n, 20))
      File "/home/spark/spark/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py", line 1133, in __call__
      File "/home/spark/spark/python/pyspark/sql/utils.py", line 69, in deco
        raise AnalysisException(s.split(': ', 1)[1], stackTrace)
    pyspark.sql.utils.AnalysisException: 'Queries with streaming sources must be executed with writeStream.start();;\nkafka'

我不明白为什么会发生异常,我在show() 之前调用了writeStream.start()。我试图摆脱selectExpr(),但这并没有什么不同。有谁知道如何显示流来源的 DataFrame?我正在使用 Python 3.6.1、Kafka 0.10.2.1 和 Spark 2.2.0

【问题讨论】:

    标签: apache-spark pyspark apache-kafka spark-structured-streaming


    【解决方案1】:

    Streaming DataFrame 不支持show() 方法。当您调用start() 方法时,它将启动一个后台线程将输入数据流式传输到接收器,并且由于您使用的是 ConsoleSink,它会将数据输出到控制台。您无需致电show()

    去掉readDF.show(),然后加上一个sleep,就可以在控制台看到数据了,比如

    query = readDF.writeStream.format("console").start()
    import time
    time.sleep(10) # sleep 10 seconds
    query.stop()
    

    您还需要将startingOffsets 设置为earliest,否则,Kafka 源将只是从最新的偏移量开始,并且在您的情况下不会获取任何内容。

    readDF = spark \
        .readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", kafka_broker) \
        .option("startingOffsets", "earliest") \
        .option("subscribe", topic_name) \
        .load()
    

    【讨论】:

    • 我第一次运行它时,我得到了java.lang.InterruptedException,表格显示后显示WARN Shell: Interrupted while joining on: Thread[Thread-80,5,main],尽管我再次运行它并没有出现。知道是什么原因造成的吗?
    • 这应该只是一个警告。当query.stop()被调用时,会向流线程发送中断信号,可能会抛出InterruptedException。它不应该使您的代码失败。
    • 我明白了,最后一个问题。我注意到该查询甚至会打印出我在写入之前拥有的 kafka 主题中的行。这不是问题,尽管因此我试图获取在我的数据框中找到的行数。我尝试通过执行q2 = readDF.count().writeStream.format("console").start() 来遵循此答案,但我得到了与以前相同的 AnalysisException。关于如何做到这一点的任何想法?
    • 其实我想我明白了。我制作了另一个类似于 readDF 的 DataFrame,只是我用read 替换了readStream。这让我可以使用count() 方法。
    【解决方案2】:

    Streaming DataFrame 不直接支持 show() 方法,但是有一种方法可以通过让后台线程休眠一段时间并在内存接收器中创建的临时表上使用 show() 函数来查看数据.我可以帮助 pyspark 使用 show() 方法。

    参考我的回答here

    【讨论】:

    • 如果一个问题的答案是指向另一个答案的链接,这可能表明该问题应该被标记为重复。
    猜你喜欢
    • 2022-01-08
    • 2017-12-14
    • 2015-12-31
    • 1970-01-01
    • 2021-12-10
    • 2020-02-08
    • 2011-05-21
    • 2012-05-04
    • 2019-07-21
    相关资源
    最近更新 更多