【问题标题】:Aggregate JSON files in Spark RDD在 Spark RDD 中聚合 JSON 文件
【发布时间】:2020-08-01 05:32:22
【问题描述】:

我有一系列类似这样的文件:

[
 {
  'id':1,
  'transactions': [
   {
    'date': '2019-01-01',
    'amount': 50.50
   },
   {
    'date': '2019-01-02',
    'amount': 10.20
   },
  ]
 },
 {
  'id':2,
  'transactions': [
   {
    'date': '2019-01-01',
    'amount': 10.20
   },
   {
    'date': '2019-01-02',
    'amount': 0.50
   },
  ]
 }
]

我使用以下代码将这些文件加载​​到 Spark

users= spark.read.option("multiline", "true").json(file_location)

结果是一个包含两列 idtransactions 的 SparkData 帧,其中 transactions 是一个 StructType。

我希望能够为每个用户“映射”transactions 以聚合它们。

目前我正在使用 rdd 和一个看起来像这样的函数:

users.rdd.map(lambda a: summarize_transactions(a.transactions))

summary 函数可以有两种类型:
a) 将对象列表转换为 Pandas Dataframe 进行汇总。
b) 遍历对象列表以对其进行总结。

但是我发现a.transactionspyspark.sql.types.Row 的列表。而不是实际的字典。

1) 这是实现我的目标的最佳方式吗?
2) 如何将 Spark Rows 列表变成字典的原始列表?

【问题讨论】:

    标签: json apache-spark pyspark rdd


    【解决方案1】:

    我找到了解决自己问题的方法:

    第 1 步:将数据加载为文本文件:
    step1= sc.textFile(file_location)

    第 2 步:读取为 JSON 和平面图

    import json
    step2 = step1.map(lambda a: json.loads(a)).flatMap(lambda a: a)
    

    第 3 步:减少键映射

    setp3 = (
     step2
     .map(lambda line: [line['id'], line['transactions']])
     .reduceByKey(lambda a, b: a + b)
     .mapValues(lambda a: summarize_transactions(a))
    )
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2021-04-07
      • 2017-07-11
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2023-03-18
      • 2015-07-14
      • 1970-01-01
      相关资源
      最近更新 更多