【问题标题】:How to see a particular metric in Spark Structured Streaming with Python如何使用 Python 在 Spark Structured Streaming 中查看特定指标
【发布时间】:2022-01-20 23:41:04
【问题描述】:

我对 Spark 和 Python 非常陌生。我正在尝试查看 Spark Structured Streaming 中的任何指标(例如,processedRowsPerSecond),但我不知道该怎么做。

我在“Structured Streaming Programming Guide”中读到,使用 print(query.lastProgress) 您可以直接获取活动查询的当前状态和指标,但如果我编写它,我只能获取 @987654323 @ 一次。我的代码的最后一部分如下:

query = windowedCountsDF\
    .writeStream\
    .outputMode('update')\
    .option("truncate", "false") \
    .format('console') \
    .queryName("numbers") \
    .start()

print(query.lastProgress)

query.awaitTermination()

任何关于如何做到这一点的想法都将受到高度赞赏。

【问题讨论】:

    标签: python apache-spark pyspark spark-structured-streaming


    【解决方案1】:

    尝试:

    while query.isActive:
        print("\n")
        print(query.status)
        print(query.lastProgress)
        time.sleep(30)
    

    query.awaitTermination() 阻止 query.lastProgress

    【讨论】:

    • 正如目前所写,您的答案尚不清楚。请edit 添加其他详细信息,以帮助其他人了解这如何解决所提出的问题。你可以找到更多关于如何写好答案的信息in the help center
    • 谢谢亚历克斯。只有一个问题:如果我想访问 lastProgress 中的特定指标(我认为 lastProgress 是一本字典),我必须做什么?
    【解决方案2】:

    这实际上取决于您想对该指标做什么。您的问题是您正在调用query.awaitTermination(),它会阻止任何其他活动。如果您想收集指标,那么您需要实现自己的等待循环,而不是调用query.awaitTermination(),如下所示:

    query = ...
    
    while not query.exception():
      if query.lastProgress:
        print(query.lastProgress) # do something with your data
      time.sleep(10) # wait 10 seconds..
    

    【讨论】:

    • 谢谢亚历克斯。我已经实现了您的代码,但收到以下错误消息:文件“numbers.py”,第 42 行,在 recentProgress = query.recentProgress() TypeError: 'list' object is not callable 21/12/ 18 14:07:32 ERROR streaming.MicroBatchExecution:查询编号 [id = ee2bc067-90c4-4d91-9c5b-dc00388f68b4,runId = 619509e1-4e81-4a78-a553-d75d6c0521a7] 以错误 java.lang.IllegalStateException 终止:无法调用方法在停止的 SparkContext 上。
    • 对不起,这是代码中的错误,只是从文档中复制了一段,忘记了它是类的成员,而不是函数。最好使用lastProgress函数。
    • 对不起,Alex,但它仍然不起作用。我收到此错误消息:21/12/18 15:56:07 ERROR streaming.MicroBatchExecution: Query numbers [id = 0edfdfa9-5cb0-4373-99b7-1a931d6883e0, runId = 83002c13-1260-4317-8043-a025820a514f] 终止于错误 java.lang.IllegalStateException:无法在停止的 SparkContext 上调用方法。
    猜你喜欢
    • 2018-08-09
    • 2020-01-05
    • 2021-06-03
    • 2020-10-29
    • 1970-01-01
    • 2020-03-19
    • 2020-09-12
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多