【问题标题】:Partial/Full-match value in one RDD to values in another RDD一个 RDD 中的部分/完全匹配值与另一个 RDD 中的值
【发布时间】:2018-02-05 07:26:12
【问题描述】:

我有两个 RDD,其中第一个 RDD 有以下形式的记录

RDD1 = (1, 2017-2-13,"ABX-3354 gsfette"
        2, 2017-3-18,"TYET-3423 asdsad"
        3, 2017-2-09,"TYET-3423 rewriu"
        4, 2017-2-13,"ABX-3354 42324"
        5, 2017-4-01,"TYET-3423 aerr")

第二个 RDD 的记录格式为

RDD2 = ('mfr1',"ABX-3354")
       ('mfr2',"TYET-3423")

我需要找到 RDD1 中的所有记录,这些记录对于 RDD2 中的每个值都具有完全匹配/部分匹配,匹配 RDD1 的第 3 列到 RDD2 的第 2 列并获取计数

对于这个例子,最终结果是:

ABX-3354  2
TYET-3423 3

最好的方法是什么?

【问题讨论】:

  • 从 RDD1 创建新列,提取第三列的第一部分。然后,按该列分组并仅在 RDD2 中过滤。
  • 但这将使得有必要在继续之前从 RDD1 收集所有元素。这是流环境中的一个难题,假设您的 RDD 可以包含大量数据。也许在这里使用稳定的布隆过滤器会有所帮助,如本文所述:webdocs.cs.ualberta.ca/~drafiei/papers/DupDet06Sigmod.pdf

标签: scala apache-spark apache-spark-sql pattern-matching


【解决方案1】:

这里是你如何得到结果

val RDD1 = spark.sparkContext.parallelize(Seq(
  (1, "2017-2-13", "ABX-3354 gsfette"),
  (2, "2017-3-18", "TYET-3423 asdsad"),
  (3, "2017-2-09", "TYET-3423 rewriu"),
  (4, "2017-2-13", "ABX-3354 42324"),
  (5, "2017-4-01", "TYET-3423 aerr")
))

val RDD2 = spark.sparkContext.parallelize(Seq(
  ("mfr1","ABX-3354"),
  ("mfr2","TYET-3423")
))

RDD1.map(r =>{
  (r._3.split(" ")(0), (r._1, r._2, r._3))
})
  .join(RDD2.map(r => (r._2, r._1)))
  .groupBy(_._1)
  .map(r => (r._1, r._2.toSeq.size))
  .foreach(println)

输出:

(TYET-3423,3)
(ABX-3354,2)

希望这会有所帮助!

【讨论】:

  • 我发现加入 2 个 RDD 非常非常慢。拥有一个 300mb 的 RDD 和另一个 1MB 大小的 RDD 需要几分钟才能加入、分组并最终打印回终端
【解决方案2】:

我正在发布几个使用 Spark SQL 的解决方案,并且更专注于对给定文本中的搜索字符串进行准确的模式匹配

1:使用 CrossJoin

import spark.implicits._

val df1 = Seq(
  (1, "2017-2-13", "ABX-3354 gsfette"),
  (2, "2017-3-18", "TYET-3423 asdsad"),
  (3, "2017-2-09", "TYET-3423 rewriu"),
  (4, "2017-2-13", "ABX-335442324"), //changed from "ABX-3354 42324"
  (5, "2017-4-01", "aerrTYET-3423") //changed from "TYET-3423 aerr"
).toDF("id", "dt", "txt")

val df2 = Seq(
  ("mfr1", "ABX-3354"),
  ("mfr2", "TYET-3423")
).toDF("col1", "key")

//match function for filter
def matcher(row: Row): Boolean = row.getAs[String]("txt")
  .contains(row.getAs[String]("key"))

val join = df1.crossJoin(df2)

import org.apache.spark.sql.functions.count

val result = join.filter(matcher _)
  .groupBy("key")
  .agg(count("txt").as("count"))

2:使用广播变量

import spark.implicits._

val df1 = Seq(
  (1, "2017-2-13", "ABX-3354 gsfette"),
  (2, "2017-3-18", "TYET-3423 asdsad"),
  (3, "2017-2-09", "TYET-3423 rewriu"),
  (4, "2017-2-13", "ABX-3354 42324"),
  (5, "2017-4-01", "aerrTYET-3423"),
  (6, "2017-4-01", "aerrYET-3423")
).toDF("id", "dt", "pattern")

//small dataset to broadcast
val df2 = Seq(
  ("mfr1", "ABX-3354"),
  ("mfr2", "TYET-3423")
).map(_._2) // considering only 2 values in pair

//Lookup to use in UDF
val lookup = spark.sparkContext.broadcast(df2)

//Udf
import org.apache.spark.sql.functions._
val matcher = udf((txt: String) => {
  val matches: Seq[String] = lookup.value.filter(txt.contains(_))
  if (matches.size > 0) matches.head else null
})

val result = df1.withColumn("match", matcher($"pattern"))
  .filter($"match".isNotNull) // not interested in non matching records
  .groupBy("match")
  .agg(count("pattern").as("count"))

两种解决方案的输出相同

result.show()

+---------+-----+
|      key|count|
+---------+-----+
|TYET-3423|    3|
| ABX-3354|    2|
+---------+-----+

【讨论】:

  • 嗯...看起来我无法使用 crossJoin,因为我正在使用 Spark 1.6
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2016-08-23
  • 2016-12-16
  • 1970-01-01
  • 1970-01-01
  • 2016-12-23
  • 2016-03-21
  • 1970-01-01
相关资源
最近更新 更多