【问题标题】:Grouping sensor data overtime with Spark Structured Streaming使用 Spark Structured Streaming 对传感器数据进行超时分组
【发布时间】:2021-05-20 04:53:34
【问题描述】:

我们让传感器每天多次启动并随机运行。来自传感器的数据被发送到 Kafka 主题,并由 Spark 结构化流 API 使用并存储到 Delta Lake。现在我们必须识别每个传感器的会话并将其存储在由 device_id 和 sensor_id 分区的不同 Delta Lake 表中。

我尝试使用带有水印的 Spark Structured Streaming,但效果不佳。

stream2 = spark.readStream.format('delta')
             .load('<FIRST_DELTA_LAKE_TABLE>')
             .select('device_id', 'json', 'time')
             .withWatermark('timestamp', '10 minutes')
             .groupBy('device_id').agg(F.min('time').alias('min_time'), F.max('time').alias('max_time')))
             .writeStream
             .format("delta")
stream2.start("<SESSIONS_TABLE>")

我们的想法是让第二个表从传入数据中识别会话并保存每个会话和设备的开始时间和结束时间。流作业运行,没有任何内容写入 Sessions 增量表。

我们将不胜感激。

【问题讨论】:

  • 实际上他们在 2 个独立的流媒体作业中。第一个流式作业只是将原始数据写入 Delta 表,这是从该 Delta 表读取数据的第二个流式作业。

标签: databricks azure-databricks spark-structured-streaming spark-streaming-kafka


【解决方案1】:

默认情况下,当您编写流时,它默认使用append 模式(请参阅doc)。而在这种模式下,当你使用水印时,只有越过水印后才会输出数据,所以至少要延迟10分钟才能开始看到输出中的数据。

但我认为主要问题是您正在运行“全局”聚合,没有定义窗口或类似的东西。通常人们使用flatMapGroupWithState进行会话检测,类似于下面的blog post

【讨论】:

  • 我正在使用 pyspark。 python 支持 flatMapGroupWithState 吗?
  • 啊,没注意到那是 Python……不,它只在 Java/Scala 中可用
  • 我在批处理模式下通过使用 group by 和延迟时间戳解决了这个问题。有没有办法找到火花流设备的连续消息之间的时间差?
  • 你也可以按窗口分区,然后在那里搜索...但是如果传感器随时可能发送数据,那么窗口大小可能不是最好的方法
猜你喜欢
  • 2021-02-12
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2018-09-06
  • 1970-01-01
  • 2020-03-19
  • 2013-06-21
  • 1970-01-01
相关资源
最近更新 更多