【问题标题】:Pyspark | map JSON rdd and apply broadcast派斯帕克 |映射 JSON rdd 并应用广播
【发布时间】:2023-04-02 10:44:01
【问题描述】:

在 pyspark 中,如何将具有 JSON 的输入 RDD 转换为以下指定的输出,同时将广播变量应用于值列表?

输入

[{'id': 1, 'title': "Foo", 'items': ['a','b','c']}, {'id': 2, 'title': "Bar", 'items': ['a','b','d']}]

广播变量

[('a': 5), ('b': 12), ('c': 42), ('d': 29)]

期望的输出

[(1, 'Foo', [5, 12, 42]), (2, 'Bar', [5, 12, 29])]

【问题讨论】:

    标签: pyspark rdd


    【解决方案1】:

    编辑:最初我的印象是传递给map 函数的函数会自动广播,但在阅读了一些文档后我不再确定这一点。

    无论如何,您都可以定义广播变量:

    bv = [('a', 5), ('b', 12), ('c', 42), ('d', 29)]
    
    # turn into a dictionary
    bv = dict(bv)
    broadcastVar = sc.broadcast(bv)
    print(broadcastVar.value)
    #{'a': 5, 'c': 42, 'b': 12, 'd': 29}
    

    现在是available on all machines as a read-only variable。您可以使用broascastVar.value 访问字典:

    例如:

    import json
    
    rdd = sc.parallelize(
        [
            '{"id": 1, "title": "Foo", "items": ["a","b","c"]}',
            '{"id": 2, "title": "Bar", "items": ["a","b","d"]}'
        ]
    )
    
    def myMapper(row):
        # define the order of the values for your output
        key_order = ["id", "title", "items"]
    
        # load the json string into a dict
        d = json.loads(row)
    
        # replace the items using the broadcast variable dict
        d["items"] = [broadcastVar.value.get(item) for item in d["items"]]
    
        # return the values in order
        return tuple(d[k] for k in key_order)
    
    print(rdd.map(myMapper).collect())
    #[(1, u'Foo', [5, 12, 42]), (2, u'Bar', [5, 12, 29])]
    

    【讨论】:

      猜你喜欢
      • 2021-07-25
      • 1970-01-01
      • 2018-07-30
      • 2017-01-15
      • 1970-01-01
      • 2021-07-07
      • 1970-01-01
      • 1970-01-01
      • 2016-04-11
      相关资源
      最近更新 更多