【问题标题】:How to find common elements among two array columns?如何在两个数组列之间找到共同元素?
【发布时间】:2026-01-28 00:55:01
【问题描述】:

我有两个逗号分隔的字符串列(sourceAuthorstargetAuthors)。

val df = Seq(
  ("Author1,Author2,Author3","Author2,Author3,Author1")
).toDF("source","target")

我想添加另一列nCommonAuthors,其中包含共同作者的数量。

我尝试过这样做:

def myUDF = udf { (s1: String, s2: String) =>
  s1.split(",")
  s2.split(",")
  s1.intersect(s2).length
}
val newDF = myDF.withColumn("nCommonAuthors", myUDF($"source", $"target"))

我收到以下错误:

线程“main”java.lang.UnsupportedOperationException 中的异常:不支持 Unit 类型的架构

知道为什么会出现此错误吗?如何找到两列的共同元素?

【问题讨论】:

    标签: scala apache-spark apache-spark-sql


    【解决方案1】:

    除非我误解了您的问题,否则有一些标准函数可以帮助您(因此您不必编写 UDF),即 splitarray_intersect

    给定以下数据集:

    val df = Seq(("Author1,Author2,Author3","Author2,Author3"))
      .toDF("source","target")
    scala> df.show(false)
    +-----------------------+---------------+
    |source                 |target         |
    +-----------------------+---------------+
    |Author1,Author2,Author3|Author2,Author3|
    +-----------------------+---------------+
    

    您可以编写以下结构化查询:

    val intersect = array_intersect(split('source, ","), split('target, ","))
    val solution = df.select(intersect as "common_elements")
    scala> solution.show(false)
    +------------------+
    |common_elements   |
    +------------------+
    |[Author2, Author3]|
    +------------------+
    

    【讨论】:

      【解决方案2】:

      根据 SCouto 的回答,我为您提供适合我的完整解决方案:

        def myUDF: UserDefinedFunction = udf(
      (s1: String, s2: String) => {
        val splitted1 = s1.split(",")
        val splitted2 = s2.split(",")
        splitted1.intersect(splitted2).length
      })
      
        val spark = SparkSession.builder().master("local").getOrCreate()
      
        import spark.implicits._
      
        val df = Seq(("Author1,Author2,Author3","Author2,Author3,Author1")).toDF("source","target")
      
        df.show(false)
      
      +-----------------------+-----------------------+
      |source                 |target                 |
      +-----------------------+-----------------------+
      |Author1,Author2,Author3|Author2,Author3,Author1|
      +-----------------------+-----------------------+
      
        val newDF: DataFrame = df.withColumn("nCommonAuthors", myUDF('source,'target))
      
        newDF.show(false)
      
      +-----------------------+-----------------------+--------------+
      |source                 |target                 |nCommonAuthors|
      +-----------------------+-----------------------+--------------+
      |Author1,Author2,Author3|Author2,Author3,Author1|3             |
      +-----------------------+-----------------------+--------------+
      

      【讨论】:

        【解决方案3】:

        那个错误意味着你的 udf 正在返回单元(根本没有返回,因为 void un Java)

        试试这个。您在原始 s1 和 S2 上应用相交,而不是在拆分后的上应用。

        def myUDF = udf((s1: String, s2: String) =>{

          val splitted1 = s1.split(",")
        
        
          val splitted2= s2.split(",")
        
        
        splitted1.intersect(splitted2).length
        

        } )

        【讨论】:

          最近更新 更多