【问题标题】:(JAVA) Spark SQL UDF with complex input parameter(JAVA) 具有复杂输入参数的 Spark SQL UDF
【发布时间】:2021-09-23 22:36:13
【问题描述】:

我正在尝试将 UDF 与结构的输入类型数组一起使用。例如,假设我有以下数据结构。这将全部来自表格中的单个列,来自单个行。

[
  {
    "id": { "value": "23tsdag"},
    "parser": { }
    "probability: 1
  },
  {
    "id": { "value": "ysadoghues"},
    "parser": { }
    "probability: .98
  },
  {
    "id": { "value": "ds8galiusgh4"},
    "parser": { }
    "probability: .7
  },
  ...
  ...
  ...
  {
    "id": { "value": "28sh32ds"},
    "parser": { }
    "probability: .3
  }
]

对于我的 JAVA UDF,我想将其作为 Seq<Row> 读入(因为根据 Spark SQL UDF with complex input parameter 它说“... struct 类型被转换为 o.a.s.sql.Row ... 数据将被暴露如Seq[Row])"。)

因此,这是我的 JAVA 代码:

public class MyUdf implements UDF1<Seq<Row>, String> {

    public String call(Seq<Row> sequence) throws Exception {
        ...
        ...
        ...
        return "Some String";
    }
}

如何测试这段代码?具体来说,我一直在尝试从文件中读取 json,将其转换为 Dataset&lt;Row&gt;,将其转换为 List&lt;Row&gt;,然后将其转换为 Seq&lt;Row&gt;,然后将其作为参数传递给我的 UDF,如下所示:

    @Test
    public void testMyUdf() throws Exception {
        sqlCtx.udf().registerJava("my_udf", MyUdf.class, DataTypes.StringType);
        String filePath = "sample_1.json";
        Dataset<Row> ds = spark.read().option("multiline", "true").json(filePath);
        List<Row> list = ds.collectAsList();
        Seq<Row> sequence = JavaConverters.collectionAsScalaIterableConverter(list).asScala().toSeq();
        sqlCtx.sql( "select my_udf(" + sequence + ")").show();

        ...
        ...
        assertEquals(...)
    }

但是,当我这样做时,我不断收到如下错误:

org.apache.spark.sql.catalyst.parser.ParseException: 
mismatched input '(' expecting {')', ','}(line 1, pos 52)

== SQL ==
select my_udf(Stream([[ABC/42gadsgy5wsdga==],.....
--------------------^^^

我做错了吗?我整天都被困在这上面,任何指示/提示/帮助将不胜感激。谢谢。

我这样做的全部目的是为了让我的 UDF 可以接收 Seq&lt;Row&gt;,如 Spark SQL UDF with complex input parameter 中所述。这甚至是正确的方法吗?

我希望通过使用 Rows 而不是使用特定类来尽可能通用(因为输入内容可能大不相同)

【问题讨论】:

    标签: java sql scala apache-spark user-defined-functions


    【解决方案1】:

    注册 udf 后,您可以在 spark sql 表达式中使用它。您的数据集应该是可查询的。为此,您可以创建数据集的临时视图。

    如果以下内容适合您,请告诉我:

        @Test
        public void testMyUdf() throws Exception {
            sqlCtx.udf().registerJava("my_udf", MyUdf.class, DataTypes.StringType);
            String filePath = "sample_1.json";
            Dataset<Row> ds = spark.read().option("multiline", "true").json(filePath);
           
            ds.createOrReplaceTempView("my_dataset")
    
            ds.printSchema(); //this line may be helpful to determine which columns are available
            //I am assuming investments is a column in your dataset/key in each json record of your original dataset
            sqlCtx.sql( "select my_udf(struct(*)) from my_dataset").show();
    
    
        }
    

    【讨论】:

    • 很抱歉,我刚刚用一个更好的例子更新了我的问题。我可以从上面的更新示例中获得一些帮助吗?
    • 我收到“函数 hydra_top_parse_complex_pretty_print 的参数数量无效。预期:1;找到:3;第 1 行 pos 7”
    • 我在这里发现了与您的更新类似的问题:stackoverflow.com/questions/31816975/…。您可能必须先将您的行转换为结构。我希望这会有所帮助
    • 谢谢,在 Java 中有没有办法做到这一点?
    • 我再次更新了答案。我将不得不在本地演示以提供更详细的答案
    猜你喜欢
    • 2016-11-19
    • 2019-05-21
    • 1970-01-01
    • 2017-07-15
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多