【发布时间】:2018-03-14 01:10:43
【问题描述】:
我正在使用Spark structured streaming 处理从Kafka 读取的记录。这是我想要实现的目标:
(a) 每条记录都是Tuple2 类型的(Timestamp, DeviceId)。
(b) 我创建了一个静态的Dataset[DeviceId],其中包含预期会在Kafka 流中看到的所有有效设备 ID(DeviceId 类型)的集合。
(c) 我需要写一个Spark structured streaming 查询
(i) Groups records by their timestamp into 5-minute windows
(ii) For each window, get the list of valid device IDs that were **not** seen in that window
例如,假设所有有效设备 ID 的列表为[A,B,C,D,E],并且某个 5 分钟窗口中的 kafka 记录包含设备 ID [A,B,E]。然后,对于该窗口,我要查找的未查看设备 ID 列表是 [C,D]。
问题
- 如何在 Spark 结构化流中编写此查询?我尝试使用
Dataset公开的except()和join()方法。然而,他们都抛出了一个运行时异常,抱怨streaming Dataset不支持这些操作。
这是我的代码的 sn-p:
val validDeviceIds: Dataset[(DeviceId, Long)] = spark.createDataset[DeviceId](listOfAllDeviceIds.map(id => (id, 0L)))
case class KafkaRecord(timestamp: TimestampType, deviceId: DeviceId)
// kafkaRecs is the data stream from Kafka - type is Dataset[KafkaRecord]
val deviceIdsSeen = kafkaRecs
.withWatermark("timestamp", "5 minutes")
.groupBy(window($"timestamp", "5 minutes", "5 minutes"), $"deviceId")
.count()
.map(row => (row.getLong(0), 1L))
.as[(Long, Long)]
val unseenIds = deviceIdsSeen.join(validDeviceIds, Seq("_1"), "right_outer")
.filter(row => row.isNullAt(1))
.map(row => row.getLong(0))
最后一条语句抛出以下异常:
Caused by: org.apache.spark.sql.AnalysisException: Right outer join with a streaming DataFrame/Dataset on the left is not supported;;
提前致谢。
【问题讨论】:
-
静态数据集有多大?
-
可能有几万个设备ID(例如:10K - 50K)
-
在 spark 中使用 leftAnti 或 rightAnti 加入
标签: scala apache-spark apache-spark-sql apache-spark-dataset spark-structured-streaming