【问题标题】:Pyspark - Selecting Distinct Values in Column after groupby and orderByPyspark - 在 groupby 和 orderBy 之后在列中选择不同的值
【发布时间】:2018-06-09 08:27:58
【问题描述】:

所以我的桌子是这样的:

+-------------------+-------+----------+------------+
|            trip_id|line_id|  ef_ar_ts|     station|
+-------------------+-------+----------+------------+
|80:06____:17401:000|  17401|         0|Schaffhausen|
|80:06____:17402:000|  17402|1505278458|Schaffhausen|
|80:06____:17403:000|  17403|         0|Schaffhausen|
|80:06____:17406:000|  17406|1505282110|Schaffhausen|
|80:06____:17409:000|  17409|         0|Schaffhausen|
|80:06____:17410:000|  17410|1505285757|Schaffhausen|
|80:06____:17411:000|  17411|         0|Schaffhausen|
|80:06____:17416:000|  17416|1505292890|Schaffhausen|
|80:06____:17417:000|  17417|         0|Schaffhausen|
|80:06____:17418:000|  17418|1505296501|Schaffhausen|
|80:06____:17419:000|  17419|         0|Schaffhausen|
|80:06____:17420:000|  17420|1505300253|Schaffhausen|
|80:06____:17421:000|  17421|         0|Schaffhausen|
|80:06____:17422:000|  17422|1505303814|Schaffhausen|
|80:06____:17423:000|  17423|         0|Schaffhausen|
|80:06____:17425:000|  17425|         0|Schaffhausen|
|80:06____:17426:000|  17426|1505307355|Schaffhausen|
|80:06____:17427:000|  17427|         0|Schaffhausen|
|80:06____:17428:000|  17428|1505310983|Schaffhausen|
|80:06____:17429:000|  17429|         0|Schaffhausen|
+-------------------+-------+----------+------------+

这是一个火车数据集,我想做的是:

按火车的line_id分组,这样我的所有车站和他们的线路一起;在每个组中按 (ef_ar_ts) 排序;然后按顺序获取 station 的 SET:每个 line_id 一个列表。这样,我将订购我的车站并重建整条线路。

到目前为止我尝试的是这样的:

from pyspark.sql.functions import udf
@functions.udf
def keepline(df):
    """Keep lines splitted;"""
    firstline = data1.first().trip_id

dftemp = df.where(data1.trip_id==firstline)

data1 = data1.fillna({'ef_ar_ts':0})

dftemp = dftemp.orderBy('ef_ar_ts')



return mylist

data2 = data1.select('*').groupby(data1.line_id).agg(udfmyfunc)

有什么帮助吗?提前谢谢!

【问题讨论】:

  • 感谢您输入正确的格式,我在这里有点新。你碰巧知道这个问题的解决方案吗?

标签: python pyspark pyspark-sql


【解决方案1】:

我们可以按 line_id 进行分组,收集 ef_ar_tsstation 列并使用 UDF 对这两个集合进行排序。希望这会有所帮助。

由于您的数据框在站中具有相同的值,因此我添加了两行带有 dummystation 的行以供参考,

+-------------------+-------+----------+-------------+
|            trip_id|line_id|  ef_ar_ts|      station|
+-------------------+-------+----------+-------------+
|80:06____:17401:000|  17401|         0| Schaffhausen|
|80:06____:17402:000|  17402|1505278458| Schaffhausen|
|80:06____:17403:000|  17403|         0| Schaffhausen|
......................................................
......................................................
|80:06____:17427:000|  17427|         0| Schaffhausen|
|80:06____:17428:000|  17428|1505310983| Schaffhausen|
|80:06____:17429:000|  17429|         0| Schaffhausen|
|80:06____:17429:000|  17401|1505278478|dummystation1|
|80:06____:17429:000|  17429|1505307355|dummystation2|
+-------------------+-------+----------+-------------+

## group and collect for each line id ##
df1 = df.groupby('line_id').agg(F.collect_list('ef_ar_ts').alias('ef_ar_ts'),F.collect_list('station').alias('station'))

+-------+---------------+-----------------------------+
|line_id|ef_ar_ts       |station                      |
+-------+---------------+-----------------------------+
|17419  |[0]            |[Schaffhausen]               |
|17420  |[1505300253]   |[Schaffhausen]               |
|17403  |[0]            |[Schaffhausen]               |
|17406  |[1505282110]   |[Schaffhausen]               |
|17428  |[1505310983]   |[Schaffhausen]               |
|17421  |[0]            |[Schaffhausen]               |
|17427  |[0]            |[Schaffhausen]               |
|17411  |[0]            |[Schaffhausen]               |
|17416  |[1505292890]   |[Schaffhausen]               |
|17429  |[0, 1505307355]|[Schaffhausen, dummystation2]|
|17401  |[0, 1505278478]|[Schaffhausen, dummystation1]|
|17423  |[0]            |[Schaffhausen]               |
|17417  |[0]            |[Schaffhausen]               |
|17402  |[1505278458]   |[Schaffhausen]               |
|17418  |[1505296501]   |[Schaffhausen]               |
|17425  |[0]            |[Schaffhausen]               |
|17409  |[0]            |[Schaffhausen]               |
|17422  |[1505303814]   |[Schaffhausen]               |
|17426  |[1505307355]   |[Schaffhausen]               |
|17410  |[1505285757]   |[Schaffhausen]               |
+-------+---------------+-----------------------------+

## an UDF for merge both collections and sort them ##
from operator import itemgetter
udf1 = F.udf(lambda x,y : [st[1] for st in sorted(zip(x,y),key=itemgetter(0))])
df1.select('line_id',udf1('ef_ar_ts','station').alias('stations')).show(truncate=False)

+-------+-----------------------------+
|line_id|stations                     |
+-------+-----------------------------+
|17419  |[Schaffhausen]               |
|17420  |[Schaffhausen]               |
|17403  |[Schaffhausen]               |
|17406  |[Schaffhausen]               |
|17428  |[Schaffhausen]               |
|17421  |[Schaffhausen]               |
|17427  |[Schaffhausen]               |
|17411  |[Schaffhausen]               |
|17416  |[Schaffhausen]               |
|17429  |[Schaffhausen, dummystation2]|
|17401  |[Schaffhausen, dummystation1]|
|17423  |[Schaffhausen]               |
|17417  |[Schaffhausen]               |
|17402  |[Schaffhausen]               |
|17418  |[Schaffhausen]               |
|17425  |[Schaffhausen]               |
|17409  |[Schaffhausen]               |
|17422  |[Schaffhausen]               |
|17426  |[Schaffhausen]               |
|17410  |[Schaffhausen]               |
+-------+-----------------------------+

【讨论】:

  • 我还没有检查它是否适用于我的完整数据集。但它似乎是当场的。感激不尽!
猜你喜欢
  • 1970-01-01
  • 2020-06-29
  • 1970-01-01
  • 1970-01-01
  • 2018-09-09
  • 2020-12-19
  • 1970-01-01
  • 2016-02-04
  • 2018-07-28
相关资源
最近更新 更多