【问题标题】:Dynamic Set Algebra on SparkSpark 上的动态集合代数
【发布时间】:2016-11-11 11:36:50
【问题描述】:

考虑以下问题。给定:

  1. 集合集合
  2. 动态接收的布尔表达式

返回结果集。

Spark 是否有任何有效的算法或库来解决这个普遍问题?

这是一个玩具示例,从概念上说明问题:

val X  = Set("A1", "A2", "A3", "A4")
val Y  = Set("A2", "A4", "A5")

val collection = Set(X, Y)
val expression = "X and Y"

我正在寻找一种实现通用solve_expression 的方法,以便在上面的示例中:

output = solve_expression(expression, collection)

结果:

Set("A2", "A5")

我正在处理包含数百万个项目的集合,以及以字符串形式出现的布尔表达式。重要的是表达式中的每个原子(例如上面的“X”和“Y”)都是集合。表达式和集合是动态的(操作不能被硬编码,因为我们将它们作为输入接收并且我们事先不知道它们是什么)。

我对问题的表述很灵活。实际集合可以是 Set 类型,例如保存字符串(例如“A1”、“A2”),编码为二进制向量,或任何其他使其适合 Spark。

Spark 是否有任何库来解析求解集合上的一般布尔表达式?

【问题讨论】:

  • 使用X.union(Y)有什么问题?或者您想要堆外解决方案?
  • 为什么投反对票?您介意详细说明吗?
  • 感谢@ipoteka 表达式是动态的(不能提前硬编码)。

标签: scala apache-spark set pyspark boolean-expression


【解决方案1】:

好的。假设您想在 Spark 中执行此操作。此外,由于这些是巨大的集合,假设它们还没有在内存中,它们都在一个文件中 - 文件中的每一行表示集合中的一个条目。

我们将使用RDDs 表示集合 - Spark 存储数据的标准方式。

使用这个解析器(改编自here

import scala.util.parsing.combinator.JavaTokenParsers
import org.apache.spark.rdd.RDD

case class Query[T](setMap: Map[String, RDD[T]]) extends JavaTokenParsers {
  private lazy val expr: Parser[RDD[T]]
    = term ~ rep("union" ~ term) ^^ { case f1 ~ fs => (f1 /: fs)(_ union _._2) }
  private lazy val term: Parser[RDD[T]]
    = fact ~ rep("inter" ~ fact) ^^ { case f1 ~ fs => (f1 /: fs)(_ intersection _._2) }
  private lazy val fact: Parser[RDD[T]]
    = vari | ("(" ~ expr ~ ")" ^^ { case "(" ~ exp ~ ")" => exp })
  private lazy val vari: Parser[RDD[T]]
    = setMap.keysIterator.map(Parser(_)).reduceLeft(_ | _) ^^ setMap

  def apply(expression: String) = this.parseAll(expr, expression).get.distinct
}

在将上述内容粘贴到 shell 后,观察以下 spark-shell 交互(为简洁起见,我省略了一些回复):

> val x = sc.textFile("X.txt").cache \\ contains "1\n2\n3\n4\n5"
> val y = sc.textFile("Y.txt").cache \\ contains "3\n4\n5\n6\n7"
> val z = sc.textFile("Z.txt").cache \\ contains "3\n9\n\10"
> val sets = Map("x" -> x, "y" -> y, "z" -> z)
> val query = Query[Int](sets)

现在,我可以使用不同的表达式调用查询。请注意,这里我使用collect 来触发评估(因此我们可以看到集合内的内容),但如果集合真的很大,您通常只需保持RDD 原样(并将其保存到磁盘) .

> query("a union b").collect
res: Array[Int] = Array("1", "2", "3", "4", "5", "6", "7")
> query("a inter b").collect
res: Array[Int] = Array("3", "4", "5")
> query("a inter b union ((a inter b) union a)").collect
res: Array[Int] = Array("1", "2", "3", "4", "5")
> query("c union a inter b").collect
res: Array[Int] = Array("3", "4", "5", "9", "10")
> query("(c union a) inter b").collect
res: Array[Int] = Array("3", "4", "5")

虽然我没有费心去实现它,但是set差异应该是一行加法(非常类似于unioninter)。我认为集合补集是个坏主意......它们并不总是有意义(空集的补集是什么,你如何表示它?)。

【讨论】:

  • 虽然我同意应该使用 Parser Combinator,但这看起来很奇怪。它会产生数万亿的rdds,对吗?但在某种程度上,您应该切换到 scala 集合集。
  • @ipoteka 我不确定我理解你所说的“数万亿个 rdds”是什么意思——它会产生与表达式中的操作数一样多的 RDD,所以只有少数几个。是的,如果他的集合足够小,当然 OP 应该使用 scala 集合 - 但问题明确询问 Spark...
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2015-03-02
  • 1970-01-01
  • 2016-05-02
  • 1970-01-01
相关资源
最近更新 更多