【问题标题】:Check if a set of a field values is mapped against single value of another field in dataframe检查一组字段值是否映射到数据帧中另一个字段的单个值
【发布时间】:2019-12-26 15:13:17
【问题描述】:

考虑以下包含商店和书籍的数据框:

+-----------+------+-------+
| storename | book | price |
+-----------+------+-------+
| S1        | B11  | 10$   |  <<
| S2        | B11  | 11$   |
| S1        | B15  | 29$   |  <<
| S2        | B10  | 25$   |
| S2        | B16  | 30$   |
| S1        | B09  | 21$   |  <
| S3        | B15  | 22$   |
+-----------+------+-------+

假设我们需要找到有两本书的商店,即B11B15。在这里,答案是S1,因为它存储了两本书。

一种方法是使用以下命令找到拥有书籍B11 的商店与拥有书籍B15 的商店的交集:

val df_select = df.filter($"book" === "B11").select("storename")
.join(df.filter($"book" === "B15").select("storename"), Seq("storename"), "inner")

其中包含两者的商店名称。

但我想要一张桌子

+-----------+------+-------+
| storename | book | price |
+-----------+------+-------+
| S1        | B11  | 10$   |  <<
| S1        | B15  | 29$   |  <<
| S1        | B09  | 21$   |  <
+-----------+------+-------+

其中包含与该履行商店相关的所有记录。请注意,B09 不会被忽略。 (用例:用户可以在 same 商店中浏览一些 other 书籍)

我们可以通过将上述结果与原始数据框进行另一个交集来做到这一点:

df_select.join(df, Seq("storename"), "inner") 

但是,我看到step 1可扩展性和可读性问题,因为如果书籍数量超过 2,我必须继续将一个数据帧连接到另一个数据帧。要做的事情很多,那就是也容易出错。有没有更优雅的方式来做同样的事情?比如:

val storewise = Window.partitionBy("storename")
df.filter($"book".contains{"B11", "B15"}.over(storewise))

【问题讨论】:

  • S2 两本书都有,为什么会被过滤掉?如果您使用 spark 2.4+,则尝试执行 array_except 并按结果数组的大小进行过滤。
  • 啊……我打错了。现在更正..
  • spark 2.4+能用吗?
  • Spark 版本为 2.4.0
  • 感谢@vdep 和@jxc。添加两个概念,即你们建议的collect_setarray_except,我能够解决这个问题。 :)

标签: scala dataframe apache-spark filter


【解决方案1】:

使用array_except 函数找到了一个简单的解决方案。

  1. 将所需的字段值集作为数组添加到新列中,req_books
  2. 添加一列all_books,用于存储使用Window 存储在商店中的所有书籍。
  3. 使用以上两列查找商店是否缺少任何所需的书籍,如果缺少任何内容,则将其过滤掉。
  4. 删除创建的多余列。

代码

val df1 = df.withColumn("req_books", array(lit("B11"), lit("B15")))
            .withColumn("all_books", collect_set('book).over(Window.partitionBy('storename)))

df1.withColumn("missing_books", array_except('req_books, 'all_books))
   .filter(size('missing_books) === 0)
   .drop('missing_book).drop('all_books).drop('req_books).show

【讨论】:

  • 感谢@Saurav Sahu 分享解决方案!!
【解决方案2】:

使用Window函数创建所有值的数组并检查它是否包含所有必要的值。

val bookList = List("B11", "B15")  //list of books to search

def arrayContainsMultiple(bookList: Seq[String]) = udf((allBooks: WrappedArray[String]) => allBooks.intersect(bookList).sorted.equals(bookList.sorted))

val filteredDF = input
  .withColumn("allBooks", collect_set($"books").over(Window.partitionBy($"storename")))
  .filter(arrayContainsMultiple(bookList)($"allBooks"))
  .drop($"allBooks")

【讨论】:

  • 适用于列表中的 2 个元素,但 对于列表中的 2 个以上元素失败,错误为 org.apache.spark.sql.AnalysisException: cannot resolve 'array_contains (...)' 由于数据类型不匹配:函数 array_contains 的输入应该是数组,后跟具有相同元素类型的值,但它是 [array, boolean]。
  • 我在本地查过。不幸的是不起作用。命令和你的一样。它适用于 2 个元素列表,但是一旦我将另一个元素添加到同一个列表中,它就会失败。
  • 您可以在更改超过 2 个元素后粘贴代码吗?
  • 编辑了答案。我之前尝试避免使用UDFs,但我发现没有适合您需求的内置函数
猜你喜欢
  • 2015-07-24
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2021-11-06
  • 1970-01-01
  • 2021-08-01
相关资源
最近更新 更多