【发布时间】:2021-10-30 02:43:37
【问题描述】:
我得到了一个rdd。例子: test = sc.parallelize([(1,0), (2,0), (3,0)])
我需要获取笛卡尔积并删除具有重复条目的结果元组对。 在这个玩具示例中,这些将是 ((1, 0), (1, 0)), ((2, 0), (2, 0)), ((3, 0), (3, 0))。
我可以得到笛卡尔积如下:注意收集和打印语句仅用于 疑难解答。
def compute_cartesian(rdd):
result1 = sc.parallelize(sorted(rdd.cartesian(rdd).collect()))
print(type(result1))
print(result1.collect())
我在这个阶段的类型和输出是正确的:
<class 'pyspark.rdd.RDD'>
[((1, 0), (1, 0)), ((1, 0), (2, 0)), ((1, 0), (3, 0)), ((2, 0), (1, 0)), ((2, 0), (2, 0)), ((2, 0), (3, 0)), ((3, 0), (1, 0)), ((3, 0), (2, 0)), ((3, 0), (3, 0))]
但现在我需要删除三对具有重复条目的元组。
到目前为止尝试过:
- .distinct() 这会运行,但不会产生正确的结果 rdd。
- .dropDuplicates() 不会运行。我认为这是 .dropDuplicates() 的错误用法。
- 手动功能:
没有 RDD,这项任务很容易。
# Remove duplicates
for elem in result:
if elem[0] == elem[1]:
result.remove(elem)
print(result)
print("After: ", len(result))
这是我编写的一个函数,它删除重复的元组对,然后吐出结果 len 以便我可以进行完整性检查。
我只是不确定如何直接在 RDD 上执行操作,在这种情况下,删除由笛卡尔积产生的任何重复元组对,并返回一个 RDD。
是的,我可以 .collect() 它,执行操作,然后将其重新键入为 RDD,但这违背了目的。假设这是数十亿对。我需要对 rdd 执行操作并返回一个 rdd。
【问题讨论】:
-
rdd.cartesian(rdd).filter(lambda x: x[0] != x[1])怎么样? -
注意我不会称那些“重复对”,而是“对角对”或“重复对”。这就是
distinct和dropDuplicates在这里不合适的原因:它们会删除重复的对,但这不是您想要的。 -
为什么你写的for循环由于rdd而不起作用?只是好奇会发生什么
-
@Stef,您的解决方案有效,感谢您解释 distinct 和 dropDuplicates 失败的原因。如果您重新发布作为答案,我会认为它是正确且有效的。
-
@dmscs rdd 可能与列表不同,但在迭代列表时删除列表元素效果不佳:How to remove list elements in a for loop in python?
标签: python-3.x apache-spark pyspark rdd