【问题标题】:Spark - convert string IDs to unique integer IDsSpark - 将字符串 ID 转换为唯一的整数 ID
【发布时间】:2015-03-21 18:00:06
【问题描述】:

我有一个如下所示的数据集,其中每个用户和产品 ID 都是一个字符串:

userA, productX
userA, productX
userB, productY

拥有约 280 万种产品和 3 亿用户;大约 21 亿个用户-产品关联。

我的最终目标是在此数据集上运行 Spark 协同过滤 (ALS)。由于它需要用户和产品的 int 键,所以我的第一步是为每个用户和产品分配一个唯一的 int,并转换上面的数据集,以便用户和产品由 int 表示。

这是我迄今为止尝试过的:

val rawInputData = sc.textFile(params.inputPath)
  .filter { line => !(line contains "\\N") }
  .map { line =>
      val parts = line.split("\t")
      (parts(0), parts(1))  // user, product
    }

// find all unique users and assign them IDs
val idx1map = rawInputData.map(_._1).distinct().zipWithUniqueId().cache()

// find all unique products and assign IDs
val idx2map = rawInputData.map(_._2).distinct().zipWithUniqueId().cache()

idx1map.map{ case (id, idx) => id + "\t" + idx.toString
}.saveAsTextFile(params.idx1Out)
idx2map.map{ case (id, idx) => id + "\t" + idx.toString
}.saveAsTextFile(params.idx2Out)

// join with user ID map:
// convert from (userStr, productStr) to (productStr, userIntId)
val rev = rawInputData.cogroup(idx1map).flatMap{
  case (id1, (id2s, idx1s)) =>
    val idx1 = idx1s.head
    id2s.map { (_, idx1)
    }
}

// join with product ID map:
// convert from (productStr, userIntId) to (userIntId, productIntId)
val converted = rev.cogroup(idx2map).flatMap{
  case (id2, (idx1s, idx2s)) =>
    val idx2 = idx2s.head
    idx1s.map{ (_, idx2)
    }
}

// save output
val convertedInts = converted.map{
  case (a,b) => a.toInt.toString + "\t" + b.toInt.toString
}
convertedInts.saveAsTextFile(params.outputPath)

当我尝试在我的集群(40 个执行程序,每个执行程序,每个 5 GB RAM)上运行它时,它能够很好地生成 idx1map 和 idx2map 文件,但它会因内存不足错误而失败,并且在 cogroup 之后的第一个 flatMap 处获取失败.我以前对 Spark 做的不多,所以我想知道是否有更好的方法来实现这一点;我不知道这项工作的哪些步骤会很昂贵。当然,cogroup 需要在整个网络中对整个数据集进行洗牌;但是这样的事情是什么意思?

FetchFailed(BlockManagerId(25, ip-***.ec2.internal, 48690), shuffleId=2, mapId=87, reduceId=25)

我不只是使用散列函数的原因是我最终希望在更大的数据集上运行它(大约 10 亿个产品、10 亿个用户、350 亿个关联),以及Int 键冲突会变得非常大。在这种规模的数据集上运行 ALS 是否接近可行?

【问题讨论】:

    标签: apache-spark


    【解决方案1】:

    我看起来您实际上是在收集所有用户列表,只是为了再次拆分它们。尝试只使用 join 而不是 cogroup,这在我看来更像你想要的。例如:

    import org.apache.spark.SparkContext._
    // Create some fake data
    val data = sc.parallelize(Seq(("userA", "productA"),("userA", "productB"),("userB", "productB")))
    val userId = sc.parallelize(Seq(("userA",1),("userB",2)))
    val productId = sc.parallelize(Seq(("productA",1),("productB",2)))
    
    // Replace userName with ID's
    val userReplaced = data.join(userId).map{case (_,(prod,user)) => (prod,user)}
    // Replace product names with ID's
    val bothReplaced = userReplaced.join(productId).map{case (_,(user,prod)) => (user,prod)}
    
    // Check results:
    bothReplaced.collect()) // Array((1,1), (1,2), (2,2))
    

    请对它的表现发表评论。

    (我不知道FetchFailed(...) 是什么意思)

    【讨论】:

      【解决方案2】:

      我的平台版本:CDH:5.7,Spark:1.6.0/StandAlone;

      我的测试数据大小:31815167 所有数据; 31562704 个不同的用户字符串,4140276 个不同的产品字符串。

      1. 第一个想法:

      我的第一个想法是使用 collectAsMap 操作,然后使用地图想法将用户/产品字符串更改为 int 。驱动内存高达 12G 时,我得到 OOM 或 GC 开销异常(该异常受驱动内存限制)。

      但是这个想法只能用在小数据量上,数据量越大,就需要更大的驱动内存。

      1. 第二个想法: 第二个想法是使用连接方法,正如 Tobber 所提议的那样。这是一些测试结果: 作业设置:

        • 驱动:2G,2cpu;
        • 执行器:(8G , 4 cpu) * 7;

      我按照以下步骤操作:

      • 1) 找到唯一的用户字符串和 zipWithIndexes;
      • 2) 加入原始数据;
      • 3) 保存编码数据;

      这项工作大约需要 10 分钟才能完成。

      【讨论】:

        猜你喜欢
        • 2017-12-01
        • 1970-01-01
        • 2010-12-20
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2021-10-28
        • 1970-01-01
        相关资源
        最近更新 更多