【发布时间】:2016-03-22 00:43:18
【问题描述】:
使用 Spark SQL,我有两个数据帧,它们是从一个创建的,例如:
df = sqlContext.createDataFrame(...);
df1 = df.filter("value = 'abc'"); //[path, value]
df2 = df.filter("value = 'qwe'"); //[path, value]
我想过滤 df1,如果它的“路径”的一部分是 df2 中的任何路径。 因此,如果 df1 具有路径为“a/b/c/d/e”的行,我会发现 df2 中的行是否为路径为“a/b/c”的行。 在 SQL 中应该是这样的
SELECT * FROM df1 WHERE udf(path) IN (SELECT path FROM df2)
其中 udf 是用户定义的函数,可缩短 df1 的原始路径。 幼稚的解决方案是使用 JOIN 然后过滤结果,但速度很慢,因为 df1 和 df2 各有超过 1000 万行。
我也尝试了以下代码,但首先我必须从 df2 创建广播变量
static Broadcast<DataFrame> bdf;
bdf = sc.broadcast(df2); //variable 'sc' is JavaSparkContext
sqlContext.createDataFrame(df1.javaRDD().filter(
new Function<Row, Boolean>(){
@Override
public Boolean call(Row row) throws Exception {
String foo = shortenPath(row.getString(0));
return bdf.value().filter("path = '"+foo+"'").count()>0;
}
}
), myClass.class)
我遇到的问题是,在评估返回/执行 df2 过滤时,Spark 卡住了。
我想知道如何使用两个数据框来做到这一点。 我真的很想避免加入。有什么想法吗?
编辑>>
在我的原始代码中,df1 具有别名“first”和 df2“second”。此连接不是笛卡尔连接,也不使用广播。
df1 = df1.as("first");
df2 = df2.as("second");
df1.join(df2, df1.col("first.path").
lt(df2.col("second.path"))
, "left_outer").
filter("isPrefix(first.path, second.path)").
na().drop("any");
isPrefix 是 udf
UDF2 isPrefix = new UDF2<String, String, Boolean>() {
@Override
public Boolean call(String p, String s) throws Exception {
//return true if (p.length()+4==s.length()) and s.contains(p)
}};
shortenPath - 剪切路径中的最后两个字符
UDF1 shortenPath = new UDF1<String, String>() {
@Override
public String call(String s) throws Exception {
String[] foo = s.split("/");
String result = "";
for (int i = 0; i < foo.length-2; i++) {
result += foo[i];
if(i<foo.length-3) result+="/";
}
return result;
}
};
记录示例。路径是唯一的。
a/a/a/b/c abc
a/a/a qwe
a/b/c/d/e abc
a/b/c qwe
a/b/b/k foo
a/b/f/a bar
...
所以 df1 由以下组成
a/a/a/b/c abc
a/b/c/d/e abc
...
和 df2 组成
a/a/a qwe
a/b/c qwe
...
【问题讨论】:
-
问题已编辑。顺便说一句,UNION 对我来说也很有意义。但 Spark 不支持嵌套查询,例如“SELECT path FROM blabla WHERE value LIKE 'abc' AND parent(path) IN (SELECT path FROM blabla WHERE value LIKE 'qwe')”。使用 DataFrame api 也不支持。
-
你试过Filter pattern吗?当然,您需要调整给定的示例,但我认为这可能是答案
标签: java apache-spark dataframe apache-spark-sql spark-dataframe