【发布时间】:2021-09-23 22:36:13
【问题描述】:
我正在尝试将 UDF 与结构的输入类型数组一起使用。例如,假设我有以下数据结构。这将全部来自表格中的单个列,来自单个行。
[
{
"id": { "value": "23tsdag"},
"parser": { }
"probability: 1
},
{
"id": { "value": "ysadoghues"},
"parser": { }
"probability: .98
},
{
"id": { "value": "ds8galiusgh4"},
"parser": { }
"probability: .7
},
...
...
...
{
"id": { "value": "28sh32ds"},
"parser": { }
"probability: .3
}
]
对于我的 JAVA UDF,我想将其作为 Seq<Row> 读入(因为根据 Spark SQL UDF with complex input parameter 它说“... struct 类型被转换为 o.a.s.sql.Row ... 数据将被暴露如Seq[Row])"。)
因此,这是我的 JAVA 代码:
public class MyUdf implements UDF1<Seq<Row>, String> {
public String call(Seq<Row> sequence) throws Exception {
...
...
...
return "Some String";
}
}
如何测试这段代码?具体来说,我一直在尝试从文件中读取 json,将其转换为 Dataset<Row>,将其转换为 List<Row>,然后将其转换为 Seq<Row>,然后将其作为参数传递给我的 UDF,如下所示:
@Test
public void testMyUdf() throws Exception {
sqlCtx.udf().registerJava("my_udf", MyUdf.class, DataTypes.StringType);
String filePath = "sample_1.json";
Dataset<Row> ds = spark.read().option("multiline", "true").json(filePath);
List<Row> list = ds.collectAsList();
Seq<Row> sequence = JavaConverters.collectionAsScalaIterableConverter(list).asScala().toSeq();
sqlCtx.sql( "select my_udf(" + sequence + ")").show();
...
...
assertEquals(...)
}
但是,当我这样做时,我不断收到如下错误:
org.apache.spark.sql.catalyst.parser.ParseException:
mismatched input '(' expecting {')', ','}(line 1, pos 52)
== SQL ==
select my_udf(Stream([[ABC/42gadsgy5wsdga==],.....
--------------------^^^
我做错了吗?我整天都被困在这上面,任何指示/提示/帮助将不胜感激。谢谢。
我这样做的全部目的是为了让我的 UDF 可以接收 Seq<Row>,如 Spark SQL UDF with complex input parameter 中所述。这甚至是正确的方法吗?
我希望通过使用 Rows 而不是使用特定类来尽可能通用(因为输入内容可能大不相同)
【问题讨论】:
标签: java sql scala apache-spark user-defined-functions