【发布时间】:2019-09-09 16:30:12
【问题描述】:
让我们假设以下设置:我有一个事件流。我想要一些特定的事件来触发一个动作。具体情况可能是:客户订单流,如果订单满足某些条件,我想向客户发送通知/短信。同时,我想跟踪我处理消息的速度并监控哪个订单满足哪个条件。
对于通知,我使用由几个操作组成的 Spark Structured Streaming 代码:
df_orders = spark.readStream.format("eventhubs").options(**conf).load()
(df_orders
.filter(col('sms_consent') == True)
.filter(col('order_price') > 1000)
.dropDuplicates(['order_id', 'customer_id'])
.writeStream
.format('eventhubs')
.options(**conf)
.start()
)
现在我想构建一个“监控/报告”解决方案,它将为每个传入的订单导出以下数据:
+----------+-----------------------+-----------------------+-----------------------+--------------------------+----------------------+
| order_id | filtered_sms_consent | filtered_order_price | time_messageReceived | time_processingFinished | time_sentToEventHub |
+----------+-----------------------+-----------------------+-----------------------+--------------------------+----------------------+
| 1 | True | None | 9:40:00 | 9:41:00 | None |
| 2 | False | False | 9:41:00 | 9:42:00 | 9:42:21 |
| 3 | False | True | 9:43:00 | 9:45:00 | None |
+----------+-----------------------+-----------------------+-----------------------+--------------------------+----------------------+
(形状无关紧要 - 表格可以转为更“类似日志”的结构...)
我的实验:
首先,我考虑过使用 Spark 监听器 (StreamingQueryListener),因为监听器似乎能够记录查询状态、平均处理时间等内容。 .但是我找不到任何解决方案来匹配某些事件(order_id)与来自查询侦听器的数据。
接下来,我写了一个单独的查询用于监控,同时保留查询以供实际的逻辑执行。问题是,由于这是两个单独的查询,每个查询都是独立执行的。因此,时间戳已关闭。我设法使用foreachBatch() approach 将它们绑定在一起。然而,这确实遇到了dropDuplicates 的问题(必须将查询分成两部分),并且感觉非常“沉重”(它大大减慢了执行速度)。
梦想:
我想要的是这样的:
(df_orders
.log('order_id {}: Processing started at {time}'.format(col('order_id'), time.now())
.filter(col('sms_consent') == True)
.log('order_id {}: filtered on sms_consent'.format(col('order_id'))
.filter(col('order_price' > 1000)
.log('order_id {}: filtered on sms_price'.format(col('order_id'))
...
)
或者默认情况下在 spark 日志中包含这些信息,并且可以提取它。
这是如何实现的?
【问题讨论】:
标签: apache-spark pyspark spark-structured-streaming azure-monitoring