【发布时间】:2018-06-09 08:27:58
【问题描述】:
所以我的桌子是这样的:
+-------------------+-------+----------+------------+
| trip_id|line_id| ef_ar_ts| station|
+-------------------+-------+----------+------------+
|80:06____:17401:000| 17401| 0|Schaffhausen|
|80:06____:17402:000| 17402|1505278458|Schaffhausen|
|80:06____:17403:000| 17403| 0|Schaffhausen|
|80:06____:17406:000| 17406|1505282110|Schaffhausen|
|80:06____:17409:000| 17409| 0|Schaffhausen|
|80:06____:17410:000| 17410|1505285757|Schaffhausen|
|80:06____:17411:000| 17411| 0|Schaffhausen|
|80:06____:17416:000| 17416|1505292890|Schaffhausen|
|80:06____:17417:000| 17417| 0|Schaffhausen|
|80:06____:17418:000| 17418|1505296501|Schaffhausen|
|80:06____:17419:000| 17419| 0|Schaffhausen|
|80:06____:17420:000| 17420|1505300253|Schaffhausen|
|80:06____:17421:000| 17421| 0|Schaffhausen|
|80:06____:17422:000| 17422|1505303814|Schaffhausen|
|80:06____:17423:000| 17423| 0|Schaffhausen|
|80:06____:17425:000| 17425| 0|Schaffhausen|
|80:06____:17426:000| 17426|1505307355|Schaffhausen|
|80:06____:17427:000| 17427| 0|Schaffhausen|
|80:06____:17428:000| 17428|1505310983|Schaffhausen|
|80:06____:17429:000| 17429| 0|Schaffhausen|
+-------------------+-------+----------+------------+
这是一个火车数据集,我想做的是:
按火车的line_id分组,这样我的所有车站和他们的线路一起;在每个组中按 (ef_ar_ts) 排序;然后按顺序获取 station 的 SET:每个 line_id 一个列表。这样,我将订购我的车站并重建整条线路。
到目前为止我尝试的是这样的:
from pyspark.sql.functions import udf
@functions.udf
def keepline(df):
"""Keep lines splitted;"""
firstline = data1.first().trip_id
dftemp = df.where(data1.trip_id==firstline)
data1 = data1.fillna({'ef_ar_ts':0})
dftemp = dftemp.orderBy('ef_ar_ts')
return mylist
data2 = data1.select('*').groupby(data1.line_id).agg(udfmyfunc)
有什么帮助吗?提前谢谢!
【问题讨论】:
-
感谢您输入正确的格式,我在这里有点新。你碰巧知道这个问题的解决方案吗?
标签: python pyspark pyspark-sql