【问题标题】:Spark Structured Streaming - Log the internal progress of a querySpark Structured Streaming - 记录查询的内部进度
【发布时间】:2019-09-09 16:30:12
【问题描述】:

让我们假设以下设置:我有一个事件流。我想要一些特定的事件来触发一个动作。具体情况可能是:客户订单流,如果订单满足某些条件,我想向客户发送通知/短信。同时,我想跟踪我处理消息的速度并监控哪个订单满足哪个条件。

对于通知,我使用由几个操作组成的 Spark Structured Streaming 代码:

df_orders = spark.readStream.format("eventhubs").options(**conf).load()

(df_orders
.filter(col('sms_consent') == True)
.filter(col('order_price') > 1000)
.dropDuplicates(['order_id', 'customer_id'])
.writeStream
.format('eventhubs')
.options(**conf)
.start()
)

现在我想构建一个“监控/报告”解决方案,它将为每个传入的订单导出以下数据:


+----------+-----------------------+-----------------------+-----------------------+--------------------------+----------------------+
| order_id |  filtered_sms_consent |  filtered_order_price |  time_messageReceived |  time_processingFinished |  time_sentToEventHub |
+----------+-----------------------+-----------------------+-----------------------+--------------------------+----------------------+
|        1 |  True                 |  None                 |  9:40:00              |  9:41:00                 |  None                |
|        2 |  False                |  False                |  9:41:00              |  9:42:00                 |  9:42:21             |
|        3 |  False                |  True                 |  9:43:00              |  9:45:00                 |  None                |
+----------+-----------------------+-----------------------+-----------------------+--------------------------+----------------------+


(形状无关紧要 - 表格可以转为更“类似日志”的结构...)

我的实验:

首先,我考虑过使用 Spark 监听器 (StreamingQueryListener),因为监听器似乎能够记录查询状态、平均处理时间等内容。 .但是我找不到任何解决方案来匹配某些事件(order_id)与来自查询侦听器的数据。

接下来,我写了一个单独的查询用于监控,同时保留查询以供实际的逻辑执行。问题是,由于这是两个单独的查询,每个查询都是独立执行的。因此,时间戳已关闭。我设法使用foreachBatch() approach 将它们绑定在一起。然而,这确实遇到了dropDuplicates 的问题(必须将查询分成两部分),并且感觉非常“沉重”(它大大减慢了执行速度)。

梦想:

我想要的是这样的:

(df_orders
.log('order_id {}: Processing started at {time}'.format(col('order_id'), time.now()) 
.filter(col('sms_consent') == True)
.log('order_id {}: filtered on sms_consent'.format(col('order_id'))
.filter(col('order_price' > 1000)
.log('order_id {}: filtered on sms_price'.format(col('order_id'))
...
)

或者默认情况下在 spark 日志中包含这些信息,并且可以提取它。

这是如何实现的?

【问题讨论】:

    标签: apache-spark pyspark spark-structured-streaming azure-monitoring


    【解决方案1】:
    1. 您可以创建 UDF 以将日志发送到所需的存储,并在流式传输期间调用它并从每个工作人员发送数据。它可能很慢。
    2. 您可以创建用于记录到标准 Spark 日志的 UDF。对于查看日志,您需要从所有节点收集日志。我使用 logstash 从所有节点收集本地日志,并使用 kibana 作为仪表板。
    3. 如果您需要日志时间序列数据,您可以使用 spark 指标系统 https://spark.apache.org/docs/latest/monitoring.html#metricshttps://github.com/groupon/spark-metrics 自定义指标。这将允许您 在流式传输期间创建 UDF 并发送自定义指标。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2020-03-19
      • 2021-03-06
      • 2021-02-12
      • 2020-09-12
      • 2021-06-03
      • 2021-04-13
      • 2022-01-05
      相关资源
      最近更新 更多