【Question Title】: Cannot merge two DataFrames in Scala Spark
【Posted】: 2017-08-05 00:11:28
【Question】:

I have been trying to append one DataFrame to another DF in Scala. In this case, the append simply adds a new column of the same size to the existing one - no key matching is involved. Both DataFrames have the same shape (only 5 rows and 1 column).

scala> val coefficients = lrModel.coefficients.toArray.toSeq.toDF("coefficients")
coefficients: org.apache.spark.sql.DataFrame = [coefficients: double]

scala> coefficients.show()
+--------------------+
|        coefficients|
+--------------------+
|   -59525.0697785032|
|   6957.836000531959|
|   314.2998010755629|
|-0.37884289844065666|
|  -1758.154438149325|
+--------------------+
scala> val tvalues = trainingSummary.tValues.toArray.drop(1).toSeq.toDF("t-values")
tvalues: org.apache.spark.sql.DataFrame = [t-values: double]

scala> tvalues.show()
+-------------------+
|           t-values|
+-------------------+
| 1.8267249911295418|
| 100.35507390273406|
| -8.768588605222108|
|-0.4656738230173362|
|  10.48091833711012|
+-------------------+

The join() function runs, and I can even get the schema, but when I try to show all values of the new DF I get this error:

scala> val outputModelDF1 = coefficients.join(tvalues)
outputModelDF1: org.apache.spark.sql.DataFrame = [coefficients: double, t-values: double]
scala> outputModelDF1.printSchema()
root
 |-- coefficients: double (nullable = false)
 |-- t-values: double (nullable = false)

scala> outputModelDF1.show()
org.apache.spark.sql.AnalysisException: Detected cartesian product for INNER join between logical plans
Project [value#359 AS coefficients#361]
+- LocalRelation [value#359]
and
Project [value#368 AS t-values#370]
+- LocalRelation [value#368]
Join condition is missing or trivial.
Use the CROSS JOIN syntax to allow cartesian products between these relations.;
  at org.apache.spark.sql.catalyst.optimizer.CheckCartesianProducts$$anonfun$apply$20.applyOrElse(Optimizer.scala:1080)
  at org.apache.spark.sql.catalyst.optimizer.CheckCartesianProducts$$anonfun$apply$20.applyOrElse(Optimizer.scala:1077)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:267)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:267)
  at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
  at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:266)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:306)
  at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
  at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:304)
  at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:272)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:306)
  at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
  at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:304)
  at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:272)
  at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:256)
  at org.apache.spark.sql.catalyst.optimizer.CheckCartesianProducts.apply(Optimizer.scala:1077)
  at org.apache.spark.sql.catalyst.optimizer.CheckCartesianProducts.apply(Optimizer.scala:1062)
  at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:85)
  at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:82)
  at scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:57)
  at scala.collection.IndexedSeqOptimized$class.foldLeft(IndexedSeqOptimized.scala:66)
  at scala.collection.mutable.WrappedArray.foldLeft(WrappedArray.scala:35)
  at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:82)
  at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:74)
  at scala.collection.immutable.List.foreach(List.scala:381)
  at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:74)
  at org.apache.spark.sql.execution.QueryExecution.optimizedPlan$lzycompute(QueryExecution.scala:78)
  at org.apache.spark.sql.execution.QueryExecution.optimizedPlan(QueryExecution.scala:78)
  at org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:84)
  at org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:80)
  at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:89)
  at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:89)
  at org.apache.spark.sql.Dataset.withAction(Dataset.scala:2832)
  at org.apache.spark.sql.Dataset.head(Dataset.scala:2153)
  at org.apache.spark.sql.Dataset.take(Dataset.scala:2366)
  at org.apache.spark.sql.Dataset.showString(Dataset.scala:245)
  at org.apache.spark.sql.Dataset.show(Dataset.scala:644)
  at org.apache.spark.sql.Dataset.show(Dataset.scala:603)
  at org.apache.spark.sql.Dataset.show(Dataset.scala:612)
  ... 52 elided

Any idea how to deal with this and how to simply merge these two DFs together?

Update 1

I should spell out the output format I am trying to achieve. Please see below:

    +--------------------+--------------------+
    |        coefficients|            t-values|
    +--------------------+--------------------+
    |   -59525.0697785032|  1.8267249911295418|
    |   6957.836000531959|  100.35507390273406|
    |   314.2998010755629|  -8.768588605222108|
    |-0.37884289844065666| -0.4656738230173362|
    |  -1758.154438149325|   10.48091833711012|
    +--------------------+--------------------+

Update 2

Unfortunately, the following approach using withColumn() does not work, because withColumn() can only reference columns of the DataFrame it is called on, not columns from another DataFrame:

scala> val outputModelDF1 = coefficients.withColumn("t-values", tvalues("t-values"))
org.apache.spark.sql.AnalysisException: resolved attribute(s) t-values#119 missing from coefficients#113 in operator !Project [coefficients#113, t-values#119 AS t-values#130];;
!Project [coefficients#113, t-values#119 AS t-values#130]
+- Project [value#111 AS coefficients#113]
   +- LocalRelation [value#111]

  at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.failAnalysis(CheckAnalysis.scala:39)
  at org.apache.spark.sql.catalyst.analysis.Analyzer.failAnalysis(Analyzer.scala:91)
  at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:347)
  at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:78)
  at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:127)
  at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.checkAnalysis(CheckAnalysis.scala:78)
  at org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:91)
  at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:52)
  at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:66)
  at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$withPlan(Dataset.scala:2872)
  at org.apache.spark.sql.Dataset.select(Dataset.scala:1153)
  at org.apache.spark.sql.Dataset.withColumn(Dataset.scala:1908)
  ... 52 elided

【Comments】:

  • You are doing an SQL cross join, not appending the two columns together
  • @cricket_007 Yes, I know, that is clear from the error message, but I do not want a crossJoin. Please see the update above for the desired output.
  • The withColumn function
  • @cricket_007 Thanks, that is a good idea. Leo C shows a working example below.

Tags: scala apache-spark merge


【Solution 1】:

One approach is to use monotonicallyIncreasingId to create a key column in each DataFrame for the join:

// when running outside spark-shell, these imports are needed for toDF and monotonicallyIncreasingId
import spark.implicits._
import org.apache.spark.sql.functions.monotonicallyIncreasingId

val df1 = Seq(
  (-59525.0697785032), (6957.836000531959), (314.2998010755629), (-0.37884289844065666), (-1758.154438149325)
).toDF("coefficients")

val df2 = Seq(
  (1.8267249911295418), (100.35507390273406), (-8.768588605222108), (-0.4656738230173362), (10.48091833711012)
).toDF("t-values")

val df1R = df1.withColumn("rowid", monotonicallyIncreasingId)
val df2R = df2.withColumn("rowid", monotonicallyIncreasingId)

val dfJoined = df1R.join(df2R, Seq("rowid"))

dfJoined.show
+-----+--------------------+-------------------+
|rowid|        coefficients|           t-values|
+-----+--------------------+-------------------+
|    0|   -59525.0697785032| 1.8267249911295418|
|    1|   6957.836000531959| 100.35507390273406|
|    2|   314.2998010755629| -8.768588605222108|
|    3|-0.37884289844065666|-0.4656738230173362|
|    4|  -1758.154438149325|  10.48091833711012|
+-----+--------------------+-------------------+
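
If the helper rowid column is not wanted in the final result, it can be dropped afterwards:

// remove the temporary key column to match the desired output
val outputDF = dfJoined.drop("rowid")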

【Discussion】:

  • Thanks, this works. I have accepted your solution. However, I am still hoping there is a better way to do this - I find creating 2 extra DFs rather inefficient. Do you know whether Scala has anything similar to R's cbind() function?
  • @simtim, unfortunately I am not aware of any equivalent of R's cbind in Spark with Scala. You could consider converting the DataFrames to RDDs, doing a zip like df1.rdd zip df2.rdd, and converting the result back to a DataFrame (see the sketch after this discussion).
  • I would prefer withColumn() in this case over converting the DataFrames to RDDs and back. Granted, the DFs in my example are tiny - I am basically cleaning up the output of some ML algorithms - but I wonder how efficient withColumn() or the zip approach would be on large DFs. Surely it must be slow and memory-hungry.
  • You are right, withColumn is not particularly cheap. In your case it would be best if the ML computation (where applicable) kept a row-identity column in the resulting DataFrame.
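
For reference, here is a minimal sketch of the RDD zip approach mentioned above, assuming both DataFrames have the same number of rows and the same partitioning (a requirement of RDD.zip), and reusing df1/df2 from the solution:

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{DoubleType, StructField, StructType}

// zip the two single-column RDDs row by row; zip requires both RDDs to have
// the same number of partitions and the same number of elements per partition
val zippedRDD = df1.rdd.zip(df2.rdd).map {
  case (Row(c: Double), Row(t: Double)) => Row(c, t)
}

// rebuild a DataFrame from the zipped rows with an explicit schema
val schema = StructType(Seq(
  StructField("coefficients", DoubleType, nullable = false),
  StructField("t-values", DoubleType, nullable = false)
))
val zippedDF = spark.createDataFrame(zippedRDD, schema)

zippedDF.show()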