【问题标题】:Multi line processing Spark structured streaming多行处理 Spark 结构化流
【发布时间】:2019-12-26 20:19:53
【问题描述】:

我想使用 Spark 结构化流处理多行数据集。示例数据集如下所示

{"reqID":"id3", "time":1577085247000, "type":"start"}                                 
{"reqID":"id3", "time":1577085250000, "type":"sysstart"}                                 
{"reqID":"id3", "time":1577085256000, "type":"sysend"}                                 
{"reqID":"id3", "time":1577085260000, "type":"end"}                                 
{"reqID":"id4", "time":1577085263000, "type":"start"}                                 
{"reqID":"id4", "time":1577085266000, "type":"sysstart"}                                 
{"reqID":"id4", "time":1577085269000, "type":"sysend"}                                 
{"reqID":"id4", "time":1577085278000, "type":"end"}

我想基于reqID执行end_time(time for type end) - start_time(type for type start)之类的操作。

我已尝试按reqID 进行分组,我能够在聚合期间合并事件,但合并的事件在ArrayType 中,我无法执行所需的操作。

作为另一种方法,我尝试过旋转,但它不适用于流式传输。它仅适用于批处理。

这种情况的解决方案是什么?

【问题讨论】:

    标签: apache-spark spark-structured-streaming


    【解决方案1】:

    您可以使用 where 和 join 条件,选择您想要的片段,加入它们并根据需要进行操作:

    df.where($"type" === "start")
      .drop("type")
      .withColumnRenamed("time", "startTime")
      .join(df.where($"type" === "end")
              .drop("type")
              .withColumnRenamed("time", "endTime"), "reqID")
      .withColumn("result", $"endTime" - $"startTime")
    

    输出

    +-----+-------------+-------------+------+
    |reqID|    startTime|      endTime|result|
    +-----+-------------+-------------+------+
    |  id3|1577085247000|1577085260000| 13000|
    |  id4|1577085263000|1577085278000| 15000|
    +-----+-------------+-------------+------+
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2019-08-24
      • 1970-01-01
      • 2019-12-10
      • 2019-10-03
      • 2021-12-20
      • 1970-01-01
      • 2022-08-12
      • 2018-02-17
      相关资源
      最近更新 更多