【问题标题】:How to do stateless aggregations in spark using Structured Streaming 2.3.0 without using flatMapsGroupWithState?如何在不使用 flatMapsGroupWithState 的情况下使用 Structured Streaming 2.3.0 在 spark 中进行无状态聚合?
【发布时间】:2018-05-05 12:46:40
【问题描述】:

如何在不使用 flatMapsGroupWithState 或 Dstream API 的情况下使用 Structured Streaming 2.3.0 在 spark 中进行无状态聚合?寻找一种更具声明性的方式

例子:

select count(*) from some_view

我希望输出只计算每批中可用的任何记录,但不计算前一批的汇总

【问题讨论】:

    标签: apache-spark apache-spark-sql spark-structured-streaming


    【解决方案1】:

    要在 spark 中使用 Structured Streaming 2.3.0 而不使用 flatMapsGroupWithState 或 Dstream API 进行无状态聚合,您可以使用以下代码-

    import spark.implicits._
    
    def countValues = (_: String, it: Iterator[(String, String)]) => it.length
    
    val query =
      dataStream
        .select(lit("a").as("newKey"), col("value"))
        .as[(String, String)]
        .groupByKey { case(newKey, _) => newKey }
        .mapGroups[Int](countValues)
        .writeStream
        .format("console")
        .start()
    

    我们正在做的是-

    1. 我们在datastream - newKey 中添加了一列。我们这样做是为了可以使用groupByKey 对其进行groupBy。我使用了文字字符串"a",但你可以使用任何东西。此外,您需要从datastream 的可用列中选择任何列。我为此选择了value 列,您可以选择任何人。
    2. 我们创建了一个映射函数 - countValues,通过编写 it.length 来计算 groupByKey 函数聚合的值。

    因此,通过这种方式,我们可以计算每批中可用的任何记录,但不是从前一批中汇总的记录。

    希望对你有帮助!

    【讨论】:

    • 我正在寻找问题中所述的声明方式,因此我尝试使用原始 sql 字符串解决问题,这隐含意味着没有映射函数,除非它们可以用作原始 SQL 的一部分!
    • @user1870400 我不熟悉任何声明方式。
    • 如果你选择文字“a”,整个流不会变成一个组吗?
    • @user1870400 是的。
    • 有没有办法在 mapGroups 中创建一个静态数据框?鉴于 mapGroups 给出了行的迭代器。我只想使用该迭代器并填充一个静态数据框。这可能吗?
    猜你喜欢
    • 1970-01-01
    • 2021-02-12
    • 2017-04-22
    • 2012-01-26
    • 2022-12-10
    • 2023-01-20
    • 2020-02-15
    • 2020-12-28
    • 1970-01-01
    相关资源
    最近更新 更多