【问题标题】:How to simultaneously group/apply two Spark DataFrames?如何同时分组/应用两个 Spark DataFrame?
【发布时间】:2019-03-16 03:16:42
【问题描述】:

/* 我认为我的问题与语言无关,但如果重要的话,我会使用 PySpark。 */

情况

我目前有两个 Spark DataFrame:

一个人每分钟心率的每分钟数据(每人每天 1440 行):

 | Person |       date |  time | heartrate |
 |--------+------------+-------+-----------|
 |      1 | 2018-01-01 | 00:00 |        70 |
 |      1 | 2018-01-01 | 00:01 |        72 |
 |    ... |        ... |   ... |       ... |
 |      4 | 2018-10-03 | 11:32 |       123 |
 |    ... |        ... |   ... |       ... |

另外一个 DataFrame 包含每日数据(每人每天 1 行),每日元数据,包括 days 的聚类结果,即人 Y 的第 X 天属于哪个聚类:

| Person |       date | cluster | max_heartrate  |
|--------+------------+---------+----------------|
|      1 | 2018-01-01 |       1 |            180 |
|      1 | 2018-01-02 |       4 |            166 |
|    ... |        ... |     ... |            ... |
|      4 | 2018-10-03 |       1 |            147 |
|    ... |        ... |     ... |            ... |

(请注意,聚类是每个人单独进行的,因此人 1 的聚类 1 与人 2 的聚类 1 无关。)

目标

我现在想计算每个集群和每个人的平均心率,也就是说,每个人都有不同的平均值。如果我有三个集群,我正在寻找这个 DF:

| Person | cluster | mean_heartrate |
|--------+---------+----------------|
| 1      | 1       | 123            |
| 1      | 2       | 89             |
| 1      | 3       | 81             |
| 2      | 1       | 80             |
| ...    | ...     | ...            |

我怎样才能最好地做到这一点?从概念上讲,我想将每个人的这些 两个 DataFrame 分组,并将两个 DF 块发送到一个应用函数中。在那里(即每人),我每天对每日 DF 进行分组和聚合,然后加入每日 DF 的集群 ID,然后计算每个集群的平均值。

但是分组/应用多个 DF 不起作用,对吧?

想法

我有两个想法,但不确定哪个(如果有的话)有意义:

  1. 在分组之前将每日 DF 加入每分钟 DF,这将导致高度冗余的数据(即每分钟复制的集群 ID)。在我的“真实”应用程序中,我可能也会有每个人的数据(例如身高/体重),这将是一个完全恒定的列,即浪费更多的内存。也许这是唯一/最好/公认的方法?

  2. 在应用之前,将 DF 转换为可以容纳复杂结构的 DF,例如喜欢

.

| Person | dataframe  | key              | column    | value |
|--------+------------+------------------+-----------+-------|
|      1 | heartrates | 2018-01-01 00:00 | heartrate |    70 |
|      1 | heartrates | 2018-01-01 00:01 | heartrate |    72 |
|    ... | ...        | ...              | ...       |   ... |
|      1 | clusters   | 2018-01-01       | cluster   |     1 |
|    ... | ...        | ...              | ...       |   ... |

甚至可能

| Person |   JSON |
|--------+--------|
|      1 | { ...} |
|      2 | { ...} |
| ...    | ...    |

这里的最佳做法是什么?

【问题讨论】:

  • 当您说“我现在想计算集群 1 中每个人的平均心率”时,您的意思是要计算集群 1 中所有人的平均心率吗?看来您已经在集群 1 中获得了每个人的平均心率。此外,您是否有理由需要弄乱您的第一个 DataFrame 来计算您想要的内容?看来您需要的所有信息都已经在 DataFrame 2 中了。
  • 我更新了我的问题来澄清一下。我的意思是每个人的每个集群的平均心率,即我希望每个人都有不同的平均值。在这个玩具示例中,您是对的,信息完全包含在第二个 DF 中。我已将列从 mean_heartrate 更改为 max_heartrate,因为在“真实”应用程序中,信息也不包含在第二个 DF 中,它们只是一些额外的元列。

标签: apache-spark pyspark apache-spark-sql pyspark-sql


【解决方案1】:

但是分组/应用多个 DF 不起作用,对吧?

不,AFAIK 这不适用于 pyspark 和 pandas。


  1. 在分组前加入每日 DF 到每分钟 DF...

在我看来,这是要走的路。您不需要合并所有冗余列,只需合并 groupby 操作所需的列。没有办法避免 groupby 列的冗余,因为 groupby 操作需要它们。

在 pandas 中,可以专门提供一个额外的 groupby-column 作为 pandas 系列,但它需要与要分组的数据框具有完全相同的形状。但是,为了创建 groupby-column,无论如何您都需要合并。


  1. 在应用之前,将 DF 转换为可以容纳复杂结构的 DF

在性能和内存方面,我不会采用此解决方案,除非您有多个必需的 groupby 操作,这些操作将从更复杂的数据结构中受益。实际上,您首先需要付出一些努力来实际创建数据结构。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2021-09-25
    • 2015-06-14
    • 1970-01-01
    • 2017-06-08
    • 1970-01-01
    • 2016-12-05
    • 1970-01-01
    • 2013-03-03
    相关资源
    最近更新 更多