【问题标题】:Spark structured streaming - join static dataset with streaming datasetSpark结构化流 - 将静态数据集与流数据集连接
【发布时间】:2018-03-14 01:10:43
【问题描述】:

我正在使用Spark structured streaming 处理从Kafka 读取的记录。这是我想要实现的目标:

(a) 每条记录都是Tuple2 类型的(Timestamp, DeviceId)

(b) 我创建了一个静态的Dataset[DeviceId],其中包含预期会在Kafka 流中看到的所有有效设备 ID(DeviceId 类型)的集合。

(c) 我需要写一个Spark structured streaming 查询

 (i) Groups records by their timestamp into 5-minute windows
 (ii) For each window, get the list of valid device IDs that were **not** seen in that window

例如,假设所有有效设备 ID 的列表为[A,B,C,D,E],并且某个 5 分钟窗口中的 kafka 记录包含设备 ID [A,B,E]。然后,对于该窗口,我要查找的未查看设备 ID 列表是 [C,D]

问题

  1. 如何在 Spark 结构化流中编写此查询?我尝试使用Dataset 公开的except()join() 方法。然而,他们都抛出了一个运行时异常,抱怨streaming Dataset 不支持这些操作。

这是我的代码的 sn-p:

val validDeviceIds: Dataset[(DeviceId, Long)] = spark.createDataset[DeviceId](listOfAllDeviceIds.map(id => (id, 0L))) 

case class KafkaRecord(timestamp: TimestampType, deviceId: DeviceId)

// kafkaRecs is the data stream from Kafka - type is Dataset[KafkaRecord]
val deviceIdsSeen = kafkaRecs
     .withWatermark("timestamp", "5 minutes")
     .groupBy(window($"timestamp", "5 minutes", "5 minutes"), $"deviceId")
     .count()
     .map(row => (row.getLong(0), 1L))
     .as[(Long, Long)]

val unseenIds = deviceIdsSeen.join(validDeviceIds, Seq("_1"), "right_outer")
     .filter(row => row.isNullAt(1))
     .map(row => row.getLong(0))

最后一条语句抛出以下异常:

Caused by: org.apache.spark.sql.AnalysisException: Right outer join with a streaming DataFrame/Dataset on the left is not supported;;

提前致谢。

【问题讨论】:

  • 静态数据集有多大?
  • 可能有几万个设备ID(例如:10K - 50K)
  • 在 spark 中使用 leftAnti 或 rightAnti 加入

标签: scala apache-spark apache-spark-sql apache-spark-dataset spark-structured-streaming


【解决方案1】:

外部连接与另一侧的流数据集 are just not supported:

  • 有条件地支持流数据集和静态数据集之间的外连接。
    • 不支持与流数据集的完全外部联接
    • 不支持与右侧流数据集的左外连接
    • 不支持与左侧流数据集的右外连接

如果其他Dataset比较小,可以使用Map或者类似的结构broadcast,在UserDefinedFunction里面引用。

val map: Broadcast[Map[T, U]] = ???
val lookup = udf((x: T) => map.value.get(x))

df.withColumn("foo", lookup($"_1"))

【讨论】:

    【解决方案2】:

    spark 结构化流中join operations 的情况如下:流DataFrames 可以与static DataFrames 结合,因此进一步创建新的streaming DataFrames。但是在streamingstatic Datasets 之间的outer joins有条件地支持的,而right/left joinsstreaming Dataset 通常不支持结构化流。结果,您遇到了AnalysisException,这是在您尝试将静态数据集与流数据集连接时抛出的。作为我的话的证明,您可以查看 spark 的源代码,在此 line 抛出异常表示不支持您尝试的操作。

    我尝试使用静态DataFramesstream of DataFrames 进行连接操作。

    val streamingDf = sparkSession
        .readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", "127.0.0.1:9092")
        .option("subscribe", "structured_topic")
        .load()
    
    val lines = spark.readStream
          .format("socket")
          .option("host", "localhost")
          .option("port", 9999)
          .load()
    
    val staticDf = Seq((1507831462 , 100)).toDF("Timestamp", "DeviceId")
    
    //Inner Join
    streamingDf.join(staticDf, "Timestamp")
    line.join(staticDf, "Timestamp")
    
    //Left Join
    streamingDf.join(staticDf, "Timestamp", "left_join")
    line.join(staticDf, "Timestamp", "left_join")
    

    如您所见,除了使用来自Kafka 的数据外,我还从通过nc (netcat) 启动的套接字读取数据,它显着简化了您在测试流应用程序时的生活。 这种方法对我来说都很好,Kafkasocket 作为数据源。

    希望有所帮助。

    【讨论】:

    • 感谢您的回复。但是,我认为这不能解决我的用例。我需要找到存在于静态数据集中但不存在于流数据集中的设备 ID。如果我对左侧的流数据集进行左连接,我不会得到,对吧?除了连接之外,还有其他方法可以在 Spark 结构化流中实现此逻辑吗?再次感谢。
    • 根据 Spark 规范 - 您可以使用 structured streamingstatic dataframe 进行左外连接,但不能使用 dataset,尝试将 dataframe 转换为 dataset 和 moke 连接操作 - @ 987654323@
    猜你喜欢
    • 2018-12-16
    • 2018-09-15
    • 2018-04-19
    • 2018-09-25
    • 2018-07-30
    • 2019-03-30
    相关资源
    最近更新 更多