【问题标题】:Spark Dataset API - joinSpark 数据集 API - 加入
【发布时间】:2016-07-27 12:54:05
【问题描述】:

我正在尝试使用 Spark Dataset API,但在执行简单连接时遇到了一些问题。

假设我有两个带有字段的数据集:date | value,那么对于DataFrame,我的联接将如下所示:

val dfA : DataFrame
val dfB : DataFrame

dfA.join(dfB, dfB("date") === dfA("date") )

但是对于Dataset.joinWith 方法,但同样的方法不起作用:

val dfA : Dataset
val dfB : Dataset

dfA.joinWith(dfB, ? )

.joinWith 需要的参数是什么?

【问题讨论】:

    标签: scala apache-spark apache-spark-sql apache-spark-dataset


    【解决方案1】:

    要使用joinWith,您首先必须创建一个DataSet,很可能是其中两个。要创建DataSet,您需要创建一个与您的架构匹配的案例类并调用DataFrame.as[T],其中T 是您的案例类。所以:

    case class KeyValue(key: Int, value: String)
    val df = Seq((1,"asdf"),(2,"34234")).toDF("key", "value")
    val ds = df.as[KeyValue]
    // org.apache.spark.sql.Dataset[KeyValue] = [key: int, value: string]
    

    您也可以跳过案例类并使用元组:

    val tupDs = df.as[(Int,String)]
    // org.apache.spark.sql.Dataset[(Int, String)] = [_1: int, _2: string]
    

    如果你有另一个案例类/DF,像这样说:

    case class Nums(key: Int, num1: Double, num2: Long)
    val df2 = Seq((1,7.7,101L),(2,1.2,10L)).toDF("key","num1","num2")
    val ds2 = df2.as[Nums]
    // org.apache.spark.sql.Dataset[Nums] = [key: int, num1: double, num2: bigint]
    

    那么,joinjoinWith 的语法虽然相似,但结果却不同:

    df.join(df2, df.col("key") === df2.col("key")).show
    // +---+-----+---+----+----+
    // |key|value|key|num1|num2|
    // +---+-----+---+----+----+
    // |  1| asdf|  1| 7.7| 101|
    // |  2|34234|  2| 1.2|  10|
    // +---+-----+---+----+----+
    
    ds.joinWith(ds2, df.col("key") === df2.col("key")).show
    // +---------+-----------+
    // |       _1|         _2|
    // +---------+-----------+
    // | [1,asdf]|[1,7.7,101]|
    // |[2,34234]| [2,1.2,10]|
    // +---------+-----------+
    

    如您所见,joinWith 将对象原封不动地保留为元组的一部分,而join 将列扁平化为单个命名空间。 (在上述情况下会出现问题,因为列名“key”是重复的。)

    奇怪的是,我必须使用df.col("key")df2.col("key") 来创建加入dsds2 的条件——如果你只在任一侧使用col("key") 它就不起作用,而@987654340 @ 不存在。然而,使用原始的 df.col("key") 就可以了。

    【讨论】:

    • 详细解释。只是一种困惑。有没有更好的方法来编写类型化的连接条件。例如df.col("key") 我们可以有一些更安全的类型,可以在编译时解决“key”的正确性。
    • 我完全同意,基于这种语法创建数据集没有用,那么好处在哪里?我无法克服没有打字替代品的事实......真可惜!
    【解决方案2】:

    来自https://docs.cloud.databricks.com/docs/latest/databricks_guide/05%20Spark/1%20Intro%20Datasets.html

    看起来你可以这样做

    dfA.as("A").joinWith(dfB.as("B"), $"A.date" === $"B.date" )
    

    【讨论】:

      【解决方案3】:

      对于上面的例子,你可以试试下面的:

      为您的输出定义一个案例类

      case class JoinOutput(key:Int, value:String, num1:Double, num2:Long) 
      

      Seq("key")连接两个数据集,这将帮助您避免输出中出现两个重复的键列,这也有助于在下一步中应用案例类或获取数据

      val joined = ds.join(ds2, Seq("key")).as[JoinOutput]
      // res27: org.apache.spark.sql.Dataset[JoinOutput] = [key: int, value: string ... 2 more fields]
      

      结果将是平坦的:

      joined.show
      
      +---+-----+----+----+
      |key|value|num1|num2|
      +---+-----+----+----+
      |  1| asdf| 7.7| 101|
      |  2|34234| 1.2|  10|
      +---+-----+----+----+
      

      【讨论】:

      • 你没有具体回答这个问题,但是 Seq("key") 提示帮助了我
      • 你没有回答如何使用.joinWith,而且.join实际上是一个无类型转换,在这种情况下你没有从Dataset的类型安全中受益
      猜你喜欢
      • 2018-12-19
      • 1970-01-01
      • 1970-01-01
      • 2017-07-16
      • 2020-08-21
      • 1970-01-01
      • 2018-12-19
      • 2021-12-30
      • 1970-01-01
      相关资源
      最近更新 更多