【发布时间】:2019-01-21 00:23:14
【问题描述】:
我是 pyspark 的新手,到目前为止,当您习惯使用 pandas 等库时,很难理解它的工作方式。但这似乎是大数据的必经之路。
对于我目前的 ETL 工作,我有以下要素:
这是我的rdd:
[
[
('SMSG', 'BKT'), ('SQNR', '00000004'), ('STNQ', '06'), ('TRNN', '000001'), ('SMSG', 'BKS'), ('SQNR', '00000005'), ('STNQ', '24'), ('DAIS', '171231'), ('TRNN', '000001'), ....
],
[
('SMSG', 'BKT'), ('SQNR', '00000024'), ('STNQ', '06'), ('TRNN', '000002'), ('NRID', ' '), ('TREC', '020'), ('TRNN', '000002'), ('NRID', ' '), ('TACN', '001'), ('CARF', ' '), ...
],
...
]
行数据是一个固定大小的文本文件。
我现在要做的是对列表的每个单元格进行分组。
最终结果应该是:
[
[
('SMSG_1', 'BKT'),('SMSG_2','BKS'),('SQNR_1', '00000004'),('SQNR_2', '00000005'),('STNQ_1','06'),('STNQ_2','24'),('TRNN', '000001'),()('DAIS', '171231'),...
],
[
('SMSG', 'BKT'),('SQNR', '00000024'),('STNQ','06'),('TRNN', '000002'),('NRID', ' '), ('TREC', '020'), ('TACN', '001'), ('CARF', ' '),...
],
...
]
基本上规则如下:
1- 如果键相同且值也相同,则删除重复项。
2- 如果键相同而值不同,重命名列并添加后缀为“_Number”,其中 Number 可以替换为该键的迭代次数。
我的代码开始如下:
def addBKT():
...
def prepareTrans():
...
if __name__ == '__main__':
input_folder = '/Users/admin/Documents/Training/FR20180101HOT'
rdd = sc.wholeTextFiles(input_folder).map(lambda x: x[1].split("BKT"))
rdd = rdd.flatMap(prepareTrans).map(addBKT).map(lambda x: x.split("\n")).map(hot_to_flat_file_v2)
print(rdd.take(1))
打印件给了我(如前所述)以下元组列表。我只取了 1 个子列表,但完整的 rdd 有大约 2000 个元组子列表:
[
[
('SMSG', 'BKT'), ('SQNR', '00000004'), ('STNQ', '06'), ('TRNN', '000001'), ('SMSG', 'BKS'), ('SQNR', '00000005'), ('STNQ', '24'), ('DAIS', '171231'), ('TRNN', '000001'), ....
]
]
我首先尝试将嵌套列表减少如下:
rdd = rdd.flatMap(lambda x:x).reduceByKey(list)
我期待结果是一个没有重复的新列表列表,对于具有不同值的元组,将它们全部分组在同一个键下。但是,我无法做到这一点。
作为第二步,我计划将具有多个值的元组转换为新的元组对,就像我在分组元组中获得的值一样:即 ('Key', ['Value1', 'Value2']) 变为 ( 'Key_1', 'Value1'),('Key_2', 'Value2')
最后,所有这些转换的输出是将最终的 RDD 转换为 DataFrame 并以 parquet 格式存储。
我真的希望过去有人做过类似的事情。我花了很多时间尝试这样做,但我无法做到,也无法在网上找到任何示例。
感谢您的帮助。
【问题讨论】:
标签: group-by pyspark rdd reduce key-pair