【发布时间】:2021-05-20 04:53:34
【问题描述】:
我们让传感器每天多次启动并随机运行。来自传感器的数据被发送到 Kafka 主题,并由 Spark 结构化流 API 使用并存储到 Delta Lake。现在我们必须识别每个传感器的会话并将其存储在由 device_id 和 sensor_id 分区的不同 Delta Lake 表中。
我尝试使用带有水印的 Spark Structured Streaming,但效果不佳。
stream2 = spark.readStream.format('delta')
.load('<FIRST_DELTA_LAKE_TABLE>')
.select('device_id', 'json', 'time')
.withWatermark('timestamp', '10 minutes')
.groupBy('device_id').agg(F.min('time').alias('min_time'), F.max('time').alias('max_time')))
.writeStream
.format("delta")
stream2.start("<SESSIONS_TABLE>")
我们的想法是让第二个表从传入数据中识别会话并保存每个会话和设备的开始时间和结束时间。流作业运行,没有任何内容写入 Sessions 增量表。
我们将不胜感激。
【问题讨论】:
-
实际上他们在 2 个独立的流媒体作业中。第一个流式作业只是将原始数据写入 Delta 表,这是从该 Delta 表读取数据的第二个流式作业。
标签: databricks azure-databricks spark-structured-streaming spark-streaming-kafka