【问题标题】:Spark: Find pairs having at least n common attributes?Spark:找到至少有 n 个共同属性的对?
【发布时间】:2017-08-02 04:10:17
【问题描述】:

我有一个由(sensor_id, timestamp, data) 组成的数据集(sensor_id 是物联网设备的 id,时间戳是 UNIX 时间,数据是它们当时输出的 MD5 哈希)。表上没有主键,但每一行都是唯一的。

我需要找到所有 sensor_ids s1s2 对,这样这两个传感器至少有 n (n=50) 条目 (timestamp, data) 在它们之间是相同的,即在 n他们在不同的场合以相同的时间戳发出相同的数据。

为了了解数据的大小,我有 10B 行和约 50M 不同的sensor_ids,我相信大约有约 500 万对传感器 ID 在同一时间戳至少发出 50 次相同的数据。

在 Spark 中执行此操作的最佳方法是什么?我尝试了各种方法(group-by (timestamp, data) 和/或自加入),但它们的复杂性非常昂贵。

【问题讨论】:

  • 您能否添加一些示例数据、尝试的代码和预期的输出。

标签: algorithm apache-spark apache-spark-sql spark-streaming spark-dataframe


【解决方案1】:

这是一个从 Spark 中抽象出来的伪代码。您可以先对数据集进行排序:

select id, timestamp, data order by timestamp, data, id

示例 10 行:

s1,100,a  #1
s2,100,a  #2
s3,100,a  #3
s4,100,b  #4
s1,101,a  #5
s3,101,b  #6
s4,101,b  #7
s2,101,a  #8
s3,102,b  #9
s4,102,b  #10

现在从上到下迭代,只要时间戳和数据与上一个条目相同,就构建一个条目列表。

在我们的示例中,第 1-3 行形成了这样一个列表,因此我们已经看到了一些潜在的对:

s1, s2
s1, s3
s2, s3

第 4 行只是一个带有 (100,b) 的条目,我们可以跳过它。 第5行只有一个(101,a)的条目,我们可以跳过它。

第 6 行和第 7 行是新的一对:

s3, s4

#9 和 #10 也是一对

把它们放在一起可以很容易地数对:

s1, s2
s1, s3
s2, s3
s3, s4
s3, s4

这种方法的好处是,如果你可以对文件进行排序,你可以将排序后的数据集分割成多个更小的块(这些块应该在组边界上分割 - 即#1,2,3应该在一个块中) ,计算对,并作为最后一步加入最终结果。

我希望这会有所帮助。

【讨论】:

    【解决方案2】:

    如果我的理解是正确的,那么我可以通过使用下面的简单代码来实现这一点,

    test("Spark: Find pairs having atleast n common attributes"){
    /**
      * s1,1210283218710,34
        s1,1210283218730,24
        s1,1210283218750,84
        s1,1210283218780,54
        s2,1210283218710,34
        s2,1210283218730,24
        s2,1210283218750,84
        s2,1210283218780,54
        s3,1210283218730,24
        s3,1210283218750,84
        s3,1210283218780,54
      */
      val duplicateSensors = sc.textFile("sensor_data")
      .map(line => line.split(",")).map(ar=>((ar(1),ar(2)),ar(0) )) // (ts,val),sid
      .aggregateByKey(List.empty[String])(_ :+_,_:::_)// grouped(ts,val)(List(n sid))
      .flatMapValues(l => l.sorted.combinations(2))// (ts,val)(List(2 sid combination))
      .map(_._2).countByValue() // List(s1, s3) -> 3, List(s2, s3) -> 3, List(s1, s2) -> 4 (2sensors, no of common entries)
       // Now Do the filter .... grater than 50
      duplicateSensors.foreach(println)
    }
    

    你会得到具有共同属性的对。

    【讨论】:

    • 这不适用于我提到的数据集的大小。特别是这个 O(s^2),其中 s 是传感器的数量。
    • @m69,我很抱歉。我是新来的。删除了图像。添加代码sn-p。谢谢!
    • @Richard ,修复它。
    • 谢谢@RBanerjee!
    【解决方案3】:

    这就是我的做法。

    首先,生成一些假数据:

    #!/usr/bin/env python3
    import random
    
    fout = open('test_data.csv','w')
    
    i=0
    for x in range(100000):
      if i>=1000000:
        break
      for y in range(random.randint(0,100)):
        i         = i + 1
        timestamp = x
        sensor_id = random.randint(0,50)
        data      = random.randint(0,1000)
        fout.write("{} {} {}\n".format(timestamp,sensor_id,data))
    

    现在,您可以按如下方式处理数据。

    如果让行数为N,唯一时间戳的数量为T,每个时间戳的预期传感器数量为S em>,那么每个操作的复杂度和cmets中一样

    import itertools
    
    #Turn a set into a list of all unique unordered pairs in the set, without
    #including self-pairs
    def Pairs(x):
      temp = []
      x    = list(x)
      for i in range(len(x)):
        for j in range(i+1,len(x)):
          temp.append((x[i],x[j]))
      return temp
    
    #Load data
    #O(N) time to load data
    fin        = sc.textFile("file:///z/test_data.csv")
    #Split data at spaces, keep only the timestamp and sensorid portions
    #O(N) time to split each line of data
    lines      = fin.map(lambda line: line.split(" ")[0:2])
    #Convert each line into a timestamp-set pair, where the set contains the sensor
    #O(N) time to make each line into a timestamp-hashset pair
    monosets   = lines.map(lambda line: (line[0],set(line[1])))
    #Combine sets by timestamp to produce a list of timestamps and all sensors at
    #each timestamp
    #O(TS) time to place each line into a hash table of size O(T) where each 
    #entry in the hashtable is a hashset combining 
    timegroups = sets.reduceByKey(lambda a,b: a | b)
    #Convert sets at each timestamp into a list of all pairs of sensors that took
    #data at that timestamp
    #O(T S^2) time to do all pairs for each timestamp
    shared     = timegroups.flatMap(lambda tg: PairsWithoutSelf(tg[1]))
    #Associate each sensor pair with a value one
    #O(T S^2) time
    monoshared = shared.map(lambda x: (x,1))
    #Sum by sensor pair
    #O(T S^2) time
    paircounts = monoshared.reduceByKey(lambda a,b: a+b)
    #Filter by high hitters
    #O(<S^2) time
    good       = paircounts.filter(lambda x: x[1]>5)
    #Display results
    good.count()
    

    时间复杂度有点不稳定,因为我正在研究这个答案有点晚,但至少应该可以看到瓶颈。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2014-11-28
      • 2021-11-17
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2019-04-03
      相关资源
      最近更新 更多