【问题标题】:Join two RDDs on custom function - SPARK在自定义函数上加入两个 RDD - SPARK
【发布时间】:2017-04-07 09:11:58
【问题描述】:

是否可以在自定义函数上加入 Spark 中的两个 RDD? 我有两个以字符串为键的大 RDD。我想加入他们,而不是使用经典的 Join 而是一个自定义函数,例如:

def my_func(a,b):
    return Lev.distance(a,b) < 2

result_rdd = rdd1.join(rdd2, my_func)

如果不可能,是否有任何替代方案可以继续利用 Spark 集群的优势? 我写了这样的东西,但 pyspark 将无法在我的小集群上分发工作。

def custom_join(rdd1, rdd2, my_func):
    a = rdd1.sortByKey().collect()
    b = rdd2.sortByKey().collect()
    i = 0
    j = 0
    res = []
    while i < len(a) and j < len(b):
        if my_func(a[i][0],b[j][0]):
            res += [((a[i][0],b[j][0]),(a[i][1],b[j][1]))]
            i+=1
            j+=1
        elif a[i][0] < b[j][0]:
            i+=1
        else:
            j+=1

    return sc.parallelize(res)

提前致谢(抱歉我的英语是意大利人)

【问题讨论】:

    标签: python join apache-spark pyspark cluster-computing


    【解决方案1】:

    您可以使用笛卡尔,然后根据条件进行过滤。

    from pyspark.sql import SparkSession
    spark = SparkSession.builder.getOrCreate()
    sc = spark.sparkContext
    x = sc.parallelize([("a", 1), ("b", 4)])
    y = sc.parallelize([("a", 2), ("b", 3)])
    
    def customFunc(x):
        # You may use any condition here
        return x[0][0] ==x[1][0]
    
    print(x.join(y).collect()) # normal join
    # replicating join with cartesian
    print(x.cartesian(y).filter(customFunc).flatMap(lambda x:x).groupByKey().mapValues(tuple).collect())
    

    输出:

    [('b', (4, 3)), ('a', (1, 2))]
    [('a', (1, 2)), ('b', (4, 3))]
    

    【讨论】:

    • 谢谢,但我认为笛卡尔积与联接相比效率很低。我正在处理一个包含大约 2M 条目的数据库。
    • 可以使用dataframe api吗?
    • 数据帧与集群计算兼容吗?
    • 是的。但即使使用 dataframe api,它也会导致笛卡尔积。对不起stackoverflow.com/questions/32952080/…
    猜你喜欢
    • 2016-01-24
    • 2016-09-07
    • 2017-06-27
    • 2019-01-01
    • 2018-11-24
    • 2015-08-07
    • 1970-01-01
    • 2017-11-15
    • 1970-01-01
    相关资源
    最近更新 更多