【问题标题】:Filter an rdd depending on values of a second rdd根据第二个 rdd 的值过滤 rdd
【发布时间】:2020-12-13 02:08:18
【问题描述】:

我有两个 rdd,我想用另一个的值过滤一个。

每个rdd的几个实例如下:

rdd1 = [((address1, date1),1), ((address5, date2),1), ((address1, date2),1), ((address2,date3),1)]
rdd2 = [(address1,1), (address1,1), (address2, 1), (address1, 1)]

期望的输出是:

joined_rdd = [((address1, date1),1),((address1, date2),1),((address2,date3),1)]

所以基本上我想将元组保留在 rdd1 中,如果该元组中的地址值存在于 rdd2 中。

【问题讨论】:

    标签: python apache-spark pyspark rdd


    【解决方案1】:

    加入并丢弃 rdd2 中的所有内容:

    rdd1 = sc.parallelize([(('address1', 'date1'),1), (('address5', 'date2'),1), (('address1', 'date2'),1), (('address2','date3'),1)])
    rdd2 = sc.parallelize([('address1',1), ('address1',1), ('address2', 1), ('address1', 1)])
    
    result_rdd = (rdd1.keyBy(lambda x: x[0][0])
                      .join(rdd2.map(lambda x: x[0])
                                .keyBy(lambda x: x)
                                .distinct())
                      .map(lambda x: x[1][0]))
    
    result_rdd.collect()
    [(('address2', 'date3'), 1), (('address1', 'date1'), 1), (('address1', 'date2'), 1)]
    

    【讨论】:

      猜你喜欢
      • 2021-04-10
      • 1970-01-01
      • 2015-01-17
      • 2017-06-17
      • 2018-09-11
      • 2017-06-10
      • 2016-12-16
      • 2014-11-20
      • 2015-06-15
      相关资源
      最近更新 更多