【Question Title】: Hierarchical Agglomerative clustering for Spark
【Posted】: 2020-11-30 08:07:43
【Question Description】:

I am working on a project using Spark and Scala, and I am looking for a hierarchical clustering algorithm similar to scipy.cluster.hierarchy.fcluster or sklearn.cluster.AgglomerativeClustering that can handle large amounts of data.

MLlib for Spark implements bisecting k-means, which takes the number of clusters as an input. Unfortunately, in my case I do not know the number of clusters, and I would prefer to use a distance threshold as the input parameter instead, as is possible in the two Python implementations above.
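For reference, this is the single-machine behavior the question describes: scipy's fcluster cuts the dendrogram at a distance threshold, so the number of clusters falls out of the threshold rather than being fixed up front. A minimal example:

```python
import numpy as np
from scipy.cluster.hierarchy import linkage, fcluster

points = np.array([[0.0, 0.0], [0.1, 0.0], [5.0, 5.0], [5.1, 5.0]])
Z = linkage(points, method="complete")
# cut the dendrogram wherever the merge distance exceeds 1.0;
# no cluster count k is supplied anywhere
labels = fcluster(Z, t=1.0, criterion="distance")
# the two tight pairs end up in two separate clusters
```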

If anyone knows the answer, I would be very grateful.

【Question Discussion】:

    Tags: scala apache-spark hierarchical-clustering


    【Solution 1】:

    I ran into the same problem, and after searching high and low I found no answer, so I will post what I did here in the hope that it helps someone else, and maybe someone will build on it.

    The basic idea of what I did is to use bisecting K-means recursively, continuing to split clusters in half until all points in a cluster are within a specified distance of the centroid. I was using GPS data, so I have some extra machinery to deal with that.

    The first step is to create a model that halves the data. I used bisecting K-means, but I think this would work with any pyspark clustering method, as long as you can get the distance to the centroid.

    import pyspark.sql.functions as f
    from pyspark import SparkContext, SQLContext
    from pyspark.ml.clustering import BisectingKMeans
    from pyspark.ml.linalg import Vectors
    from pyspark.ml.feature import VectorAssembler
    from pyspark.sql.types import FloatType
    from pyspark.sql.window import Window


    bkm = BisectingKMeans().setK(2).setSeed(1)
    assembler = VectorAssembler(inputCols=['lat','long'], outputCol="features")
    adf = assembler.transform(locAggDf)  # locAggDf contains my location info
    model = bkm.fit(adf)
    # predictions will have the original data plus a "prediction" col which assigns a cluster number
    predictions = model.transform(adf)
    predictions.persist()
    

    Next comes our recursive function. The idea here is that we specify a distance from the centroid, and if any point in a cluster is farther away than that distance, we cut the cluster in half. When a cluster is tight enough to meet the condition, I add it to a result array that I use to build the final clustering.

    def bisectToDist(model, predictions, bkm, precision, result=[]):
        centers = model.clusterCenters()
        # row[0] is the predicted cluster num, row[1] is unit, row[2] is point lat, row[3] is point long
        # centers[row[0]] is the lat/long of the center; centers[row[0]][0] = lat, centers[row[0]][1] = long
        distUdf = f.udf(
            lambda row: getDistWrapper((centers[row[0]][0], centers[row[0]][1], row[1]), (row[2], row[3], row[1])),
            FloatType())  # getDistWrapper is how I calculate the distance between lat/long points, but you can use any distance metric
        predictions = predictions.withColumn('dist', distUdf(
            f.struct(predictions.prediction, predictions.encodedPrecisionUnit, predictions.lat, predictions.long)))

        # create a df of all rows that were in clusters that had a point outside of the threshold
        toBig = predictions.join(
            predictions.groupby('prediction').agg({"dist": "max"}).filter(f.col('max(dist)') > precision).select(
                'prediction'), ['prediction'], 'leftsemi')


        # this could probably be improved
        # get all cluster numbers that were too big
        listids = toBig.select("prediction").distinct().rdd.flatMap(lambda x: x).collect()

        # if all data points are within the specified distance of the centroid, we can return the clustering
        if len(listids) == 0:
            return predictions

        # assuming a binary split, k must be 2
        # if one of the two clusters was small enough, there will be no recursive call for that cluster;
        # we must save it and return it at this depth; the cluster that was too big is cut in half in the loop below
        if len(listids) == 1:
            ok = predictions.join(
                predictions.groupby('prediction').agg({"dist": "max"}).filter(
                    f.col('max(dist)') <= precision).select(
                    'prediction'), ['prediction'], 'leftsemi')


        for clusterId in listids:
            # get all of the pieces that were too big
            part = toBig.filter(toBig.prediction == clusterId)

            # we now need to refit on this subset of the data
            assembler = VectorAssembler(inputCols=['lat', 'long'], outputCol="features")
            adf = assembler.transform(part.drop('prediction').drop('features').drop('dist'))
            model = bkm.fit(adf)
            # predictions now holds the new subclustering and we are ready for recursion
            predictions = model.transform(adf)
            result.append(bisectToDist(model, predictions, bkm, precision, result=result))

        # return anything that was given and already good

        if len(listids) == 1:
            return ok
    

    Finally we can call the function and build the resulting dataframe

    result = []
    # precision is your distance threshold from the centroid
    bisectToDist(model, predictions, bkm, precision, result=result)
    # drop any Nones; these can happen in recursive (non-top-level) calls
    result = [r for r in result if r]


    r = result[0]
    r = r.withColumn('subIdx', f.lit(0))
    result = result[1:]
    idx = 1
    for r1 in result:
        r1 = r1.withColumn('subIdx', f.lit(idx))
        r = r.unionByName(r1)
        idx = idx + 1

    # each subclustering numbers its clusters 0 or 1; to renumber them 0..n I added the following
    r = r.withColumn('delta', r.subIdx * 100 + r.prediction)
    r = r.withColumn('delta', r.delta - f.lag(r.delta, 1).over(Window.orderBy("delta"))).fillna(0)
    r = r.withColumn('ddelta', f.when(r.delta != 0, 1).otherwise(0))
    r = r.withColumn('spacialLocNum', f.sum('ddelta').over(Window.orderBy(['subIdx', 'prediction'])))
    # spacialLocNum is the final clustering
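The lag/cumulative-sum renumbering in the last four lines is effectively a dense rank over the (subIdx, prediction) pairs. A plain-Python sketch of the same re-indexing, on hypothetical sample data:

```python
# (subIdx, prediction) pairs for each row, as produced by the union above
rows = [(0, 0), (0, 0), (0, 1), (1, 0), (1, 1), (1, 1)]

# dense-rank the distinct pairs in sorted order -> consecutive ids 0..n
ids = {pair: i for i, pair in enumerate(sorted(set(rows)))}
labels = [ids[p] for p in rows]
# labels == [0, 0, 1, 2, 3, 3]
```

In Spark the same result could likely be obtained with `f.dense_rank().over(Window.orderBy('subIdx', 'prediction')) - 1`, which would avoid the intermediate delta columns.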
    

    Admittedly this is fairly complex and slow, but it does get the job done. Hope this helps!
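Stripped of the Spark machinery, the recursive idea above can be sketched in plain Python/NumPy. This is a sketch only: plain Euclidean distance stands in for the GPS metric, and a minimal Lloyd's 2-means stands in for MLlib's BisectingKMeans:

```python
import numpy as np

def bisect_to_dist(points, threshold, rng=None):
    """Recursively 2-means-split `points` until every point lies within
    `threshold` of its cluster centroid. Returns a list of arrays."""
    rng = np.random.default_rng(0) if rng is None else rng
    centroid = points.mean(axis=0)
    if len(points) < 2 or np.linalg.norm(points - centroid, axis=1).max() <= threshold:
        return [points]  # cluster is tight enough (or cannot be split further)
    # minimal Lloyd's algorithm with k=2 on this subset
    centers = points[rng.choice(len(points), size=2, replace=False)].copy()
    for _ in range(20):
        dists = np.linalg.norm(points[:, None, :] - centers[None, :, :], axis=2)
        assign = dists.argmin(axis=1)
        for k in (0, 1):
            if (assign == k).any():
                centers[k] = points[assign == k].mean(axis=0)
    # guard against a degenerate split that would not shrink the problem
    if (assign == 0).all() or (assign == 1).all():
        mid = len(points) // 2
        return (bisect_to_dist(points[:mid], threshold, rng) +
                bisect_to_dist(points[mid:], threshold, rng))
    return (bisect_to_dist(points[assign == 0], threshold, rng) +
            bisect_to_dist(points[assign == 1], threshold, rng))

# three tight pairs, far apart: each returned cluster satisfies the threshold
pts = np.array([[0, 0], [0.1, 0], [10, 10], [10.1, 10], [20, 0], [20.1, 0]], float)
clusters = bisect_to_dist(pts, threshold=1.0)
```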

    【Discussion】:

    • At the end there is an error: NameError: name 'self' is not defined (self.bisectToDist(model, predictions, bkm, result=result))