【问题标题】:How to return a dictionary in parallel processing in spark?如何在火花中并行处理返回字典?
【发布时间】:2016-02-06 15:45:13
【问题描述】:

我有一个要处理的对象数组:Objects,我有一个函数,它接收字典和一个对象,并返回同一个字典,修改后:

new_dict = modify_object_dict(object_dict, object)

modify_object_dict 执行以下操作:

  • 向字典中添加一个键,即处理对象的名称

  • 创建一个字典作为该键的值(字典中的字典),其中包含添加和删除的元素。

  • 例如,对象可能是一个文件:object_dict['file_name']=sub_dictionary,子字典可能包含sub_dictionary['file_attribute']=attribute

modify_object_dict 填充这些子字典,如上所示,结果是一个包含子字典的字典。

请注意,子词典不相互交互。即一个对象的字典不与另一个对象的字典交互。

我希望使用 spark 并行处理这些对象:

object_dict = {}   # dictionary is initially empty
RDD = (sc.parallelize(Objects)
   .map(lambda object: modify_object_dict(object_dict, object))

这是正确的方法吗?如果不是,那么返回每次调用映射函数时修改的字典的正确方法是什么?

【问题讨论】:

  • 不幸的是,您无法修改驱动程序中存在的变量,因为任务是在执行程序中操作的。 modify_object_dict() 是做什么的?也许,您可以在任务中应用您的处理,然后利用collectAsMap()

标签: python dictionary apache-spark lambda rdd


【解决方案1】:

返回每次调用映射函数时都会修改的字典的正确方法是什么?

简短的回答是没有。由于每个分区都是单独处理的,因此无法创建具有读/写访问权限的共享对象。 Spark 仅支持两种类型的共享变量,累加器和广播,分别具有只写和只读访问权限。

长答案取决于modify_object_dict 内部到底发生了什么。如果您使用的操作是关联的和可交换的并且可以基于键执行(每个对象都可以映射到特定键上的操作),您可以使用aggregateByKey 的一些变体。也可以使用mapPartitions 在本地对数据进行分区和处理。

如果modify_object_dict 不符合上述标准,那么 Spark 在这里很可能不是一个好的选择。可以将状态推送到外部系统,但通常没有意义,除非 Spark 用于繁重的工作并且您推送到外部的只是最终结果。

此外,您不应该使用 map 来获得副作用。这种情况下正确的方法通常是foreach。这里还有一个更微妙的问题。不能保证map(或foreach)对于每个元素只会执行一次。这意味着您执行的每个操作都必须是幂等的。

编辑

根据您的描述,您似乎可以尝试以下方法:

  • 首先让我们创建RDD 一个虚拟类:

    class Foobar(object):
        def __init__(self, name, x=None, y=None, z=None):
            self.name = name
            self.x = x
            self.y = y
            self.z = z
    

    还有一个对象的 RDD:

    objects = sc.parallelize([
        {"name": "foo", "x": 1}, {"name": "foo", "y": 3},
        {"name": "bar", "z": 4}
    ]).map(lambda x: Foobar(**x))
    
  • 接下来让我们将其转换为PairwiseRDD,名称为键,对象为值。如果对象很大,您可以只提取感兴趣的字段并将它们用作值。我假设每个对象都有name 属性。

    pairs = objects.map(lambda obj: (obj.name, obj))
    
  • groupByKey 和转换值:

    rdd = pairs.groupByKey().mapValues(lambda iter: ...)
    

    aggregateByKey(推荐):

    def seq_op(obj_dict, obj):
        # equivalent to modify_object_dict
        # Lets assume it is as simple as this
        obj_dict.update((k, getattr(obj, k)) for k in ("x", "y", "z"))
        return obj_dict
    
    def comb_op(obj_dict_1, obj_dict_2):
        # lets it is a simple union
        obj_dict_1.update(obj_dict_2)
        return obj_dict_1
    
    dicts = pairs.aggregateByKey({}, seq_op, comb_op)
    
  • 此时你有一个 RDD 对 (name, dict)。它可用于进一步处理,或者如果您确实需要收集为地图的本地结构:

    dicts.collectAsMap()
    ## {'bar': {'x': None, 'y': None, 'z': 4},
    ##     'foo': {'x': None, 'y': 3, 'z': None}}
    

【讨论】:

  • 最后一段是因为分区复制吗?
  • @RohanAletty 不,这只是弹性问题。如果执行器丢失并且集群管理器可以从该任务中恢复,则可以在另一台机器上重新安排。如果数据没有被缓存,那么显然它将在每次访问后代 rdds 时执行,依此类推。您拥有的唯一保证是动作或转换的输出是正确的。
  • 在这种情况下 **x 是什么意思? (我知道 x**y 意味着将 x 提高到 y 次方)
  • @J.Bend 它被称为字典解包操作符。例如,它会解压缩字典以方便合并。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2016-06-23
  • 1970-01-01
  • 2019-01-11
  • 1970-01-01
  • 1970-01-01
  • 2015-07-01
相关资源
最近更新 更多