【问题标题】:Concatenate two dataframes in pyspark by substring search通过子字符串搜索连接 pyspark 中的两个数据帧
【发布时间】:2021-11-06 07:40:10
【问题描述】:
我有两个具有以下结构的数据框:
数据框 A:
| Current Accession |
String |
| A_1 |
AAAABBBC |
| A_2 |
AAACR |
此数据框包含 100 万个字符串。
数据框 B:
| Accession |
String |
| C_34 |
RRRQAAAABBBC
|
| C_35 |
RAAAABBBC
|
| C_36 |
WWWWAAACR
|
我想通过将数据框 A 中的子字符串查看到数据框 B 中来获得最终数据框,并使用找到的新加入创建一个新列,结果应如下所示:
| Current Accession |
String |
Mapped Accession |
| A_1 |
AAAABBBC |
[C_34,C_35] |
| A_2 |
AAACR |
[C_36] |
我已经探索过加入 pyspark,但它需要完全匹配。这不适用于子字符串匹配。
【问题讨论】:
标签:
python
dataframe
apache-spark
pyspark
【解决方案1】:
Column.contains可以用:
from pyspark.sql import functions as F
dfA = ...
dfB = ...
dfA.join(dfB, on=dfB["String"].contains(dfA["String"])) \
.groupBy("CurrentAccession").agg(
F.first(dfA["String"]),
F.collect_list("Accession")
).show()
输出:
+----------------+-------------+-----------------------+
|CurrentAccession|first(String)|collect_list(Accession)|
+----------------+-------------+-----------------------+
| A_1| AAAABBBC| [C_34, C_35]|
| A_2| AAACR| [C_36]|
+----------------+-------------+-----------------------+
但是,使用contains 作为连接条件有一个缺点:Spark 执行交叉连接:
dfA.join(dfB, on=dfB["String"].contains(dfA["String"])).explain()
表演
== Physical Plan ==
CartesianProduct Contains(String#71, String#67)
:- *(1) Filter isnotnull(String#67)
: +- *(1) Scan ExistingRDD[CurrentAccession#66,String#67]
+- *(2) Filter isnotnull(String#71)
+- *(2) Scan ExistingRDD[Accession#70,String#71]