【问题标题】:Apply a custom function to a spark dataframe group将自定义函数应用于 spark 数据框组
【发布时间】:2017-01-28 17:40:24
【问题描述】:

我有一个非常大的包含这些列的时间序列数据表:

  • 时间戳
  • 车牌
  • UberRide#
  • 速度

LicensePlate/UberRide 数据的每个集合都应考虑整个数据集进行处理。换句话说,我不需要逐行处理数据,而是按 (LicensePlate/UberRide) 分组的所有行一起处理。

我计划将 spark 与 dataframe api 一起使用,但我对如何对 spark 分组数据帧执行自定义计算感到困惑。

我需要做的是:

  1. 获取所有数据
  2. 按某些列分组
  3. Foreach 火花数据帧组应用 f(x)。为每个组返回一个自定义对象
  4. 通过应用 g(x) 并返回单个自定义对象来获取结果

如何执行第 3 步和第 4 步?关于我应该使用哪个 spark API(dataframe、dataset、rdd、也许是 pandas...)的任何提示?

整个工作流程如下:

【问题讨论】:

  • Pandas 不是 Spark 的一部分,您可以使用 DataFrame 但您将使用 have to do it in Scalaadd Python wrapper,RDD 应该可以正常工作。
  • 不能直接使用Spark吗?我正在使用 Spark 1.6.2
  • 如果你指的是 PySpark,那么就像我说的那样 - RDD 应该可以正常工作。
  • 在向新手提问之前,我会进一步调查究竟如何使用 rdds... :-)
  • @NischalHp : df.rdd.keyBy(lambda x: (x['key1'], x['key2'])) \ .groupByKey() \ .map(lambda groupped_data: my_map_fn(分组数据))

标签: apache-spark dataframe group-by dataset pyspark


【解决方案1】:
  • 虽然 Spark 提供了一些与 Pandas 集成的方法,但它并没有使 Pandas 成为分布式的。因此,无论您在 Spark 中使用 Pandas 做什么都是简单的本地操作(在转换中使用时对驱动程序或执行程序)。

    如果您正在寻找具有类似 Pandas API 的分布式系统,您应该查看dask

  • You can define User Defined Aggregate functions or Aggregators 处理分组 Datasets 但 API 的这一部分只能在 Scala 中直接访问。创建一个时write a Python wrapper 并不难。
  • RDD API 提供了许多函数,可用于分组执行操作,从低级 repartition / repartitionAndSortWithinPartitions 开始,以许多 *byKey 方法结束(combineByKeygroupByKey、@ 987654334@等)。

    哪一个适用于您的情况取决于您要应用的函数的属性(它是否具有关联性和可交换性,是否可以在流上工作,是否需要特定的顺序)。

    最通用但效率低的方法可以总结如下:

    h(rdd.keyBy(f).groupByKey().mapValues(g).collect())
    

    其中f 从值映射到keyg 对应于每组聚合,h 是最终合并。大多数时候你可以做得比这更好,所以它应该只作为最后的手段。

  • 相对复杂的逻辑可以用DataFrames/Spark SQL和window functions来表达。

  • 另见Applying UDFs on GroupedData in PySpark (with functioning python example)

【讨论】:

    【解决方案2】:

    您正在寻找的东西自 Spark 2.3 以来就存在:Pandas 矢量化 UDF。它允许对 DataFrame 进行分组并使用 pandas 应用自定义转换,分布在每个组中:

    df.groupBy("groupColumn").apply(myCustomPandasTransformation)
    

    它很容易使用,所以我会输入a link to Databricks' presentation of pandas UDF

    但是,我还不知道在 Scala 中进行分组转换的实用方法,因此欢迎提供任何其他建议。

    编辑:在 Scala 中,您可以使用 Dataset 的 groupByKey + mapGroups/flatMapGroups 实现与 Spark 早期版本相同的功能。

    【讨论】:

      猜你喜欢
      • 2023-03-06
      • 2018-03-03
      • 2020-10-28
      • 1970-01-01
      • 2021-09-20
      • 1970-01-01
      • 2018-12-20
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多