【问题标题】:How to merge 2 dataframe in Spark (Scala)?如何在 Spark(Scala)中合并 2 个数据框?
【发布时间】:2019-02-05 13:55:11
【问题描述】:

我是 Spark 框架的新手,需要一些帮助!

假设第一个DataFrame (df1) 存储了用户访问呼叫中心的时间。

+---------+-------------------+
|USER_NAME|       REQUEST_DATE|
+---------+-------------------+
|     Mark|2018-02-20 00:00:00|
|     Alex|2018-03-01 00:00:00|
|      Bob|2018-03-01 00:00:00|
|     Mark|2018-07-01 00:00:00|
|     Kate|2018-07-01 00:00:00|
+---------+-------------------+

第二个 DataFrame 存储有关一个人是否是组织成员的信息。 OUT 表示用户已离开组织。 IN 表示用户已来到组织。 START_DATEEND_DATE表示对应进程的开始和结束。

例如,您可以看到Alex2018-01-01 00:00:00 离开了组织,这个过程在2018-02-01 00:00:00 结束。您会注意到,一位用户可以在不同的时间以Mark 的身份进出组织。

+---------+---------------------+---------------------+--------+
|NAME     | START_DATE          | END_DATE            | STATUS |
+---------+---------------------+---------------------+--------+
|     Alex| 2018-01-01 00:00:00 | 2018-02-01 00:00:00 | OUT    |
|      Bob| 2018-02-01 00:00:00 | 2018-02-05 00:00:00 | IN     |
|     Mark| 2018-02-01 00:00:00 | 2018-03-01 00:00:00 | IN     |
|     Mark| 2018-05-01 00:00:00 | 2018-08-01 00:00:00 | OUT    |
|    Meggy| 2018-02-01 00:00:00 | 2018-02-01 00:00:00 | OUT    |
+----------+--------------------+---------------------+--------+

我试图在决赛中获得这样的 DataFrame。它必须包含来自第一个 DataFrame 的所有记录以及一列,该列指示 Person 在请求时 (REQUEST_DATE) 是否是组织的成员。

+---------+-------------------+----------------+
|USER_NAME|       REQUEST_DATE| USER_STATUS    |
+---------+-------------------+----------------+
|     Mark|2018-02-20 00:00:00| Our user       |
|     Alex|2018-03-01 00:00:00| Not our user   |
|      Bob|2018-03-01 00:00:00| Our user       |
|     Mark|2018-07-01 00:00:00| Our user       |
|     Kate|2018-07-01 00:00:00| No Information |
+---------+-------------------+----------------+

我尝试了下一个代码,但在 finalDF 中出现错误:

org.apache.spark.SparkException: Task not serializable

在最终结果中我还需要日期时间。现在在lastRowByRequestId我只有没有时间的约会。

代码

val df1 = Seq(
    ("Mark", "2018-02-20 00:00:00"),
    ("Alex", "2018-03-01 00:00:00"),
    ("Bob", "2018-03-01 00:00:00"),
    ("Mark", "2018-07-01 00:00:00"),
    ("Kate", "2018-07-01 00:00:00")
).toDF("USER_NAME", "REQUEST_DATE")

df1.show()

val df2 = Seq(
    ("Alex", "2018-01-01 00:00:00", "2018-02-01 00:00:00", "OUT"),
    ("Bob", "2018-02-01 00:00:00", "2018-02-05 00:00:00", "IN"),
    ("Mark", "2018-02-01 00:00:00", "2018-03-01 00:00:00", "IN"),
    ("Mark", "2018-05-01 00:00:00", "2018-08-01 00:00:00", "OUT"),
    ("Meggy", "2018-02-01 00:00:00", "2018-02-01 00:00:00", "OUT")
).toDF("NAME", "START_DATE", "END_DATE", "STATUS")

df2.show()

import org.apache.spark.sql.Dataset
import org.apache.spark.sql.functions._

case class UserAndRequest(
                           USER_NAME:String,
                           REQUEST_DATE:java.sql.Date,
                           START_DATE:java.sql.Date,
                           END_DATE:java.sql.Date,
                           STATUS:String,
                           REQUEST_ID:Long
                         )

val joined : Dataset[UserAndRequest] = df1.withColumn("REQUEST_ID", monotonically_increasing_id).
  join(df2,$"USER_NAME" === $"NAME", "left").
  as[UserAndRequest]

val lastRowByRequestId = joined.
  groupByKey(_.REQUEST_ID).
  reduceGroups( (x,y) =>
    if (x.REQUEST_DATE.getTime > x.END_DATE.getTime && x.END_DATE.getTime > y.END_DATE.getTime) x else y
  ).map(_._2)

def logic(status: String): String = {
  if (status == "IN") "Our user"
  else if (status == "OUT") "not our user"
  else "No Information"
}

val logicUDF = udf(logic _)

val finalDF = lastRowByRequestId.withColumn("USER_STATUS",logicUDF($"REQUEST_DATE"))

【问题讨论】:

    标签: java scala apache-spark dataframe


    【解决方案1】:

    我检查了您的代码并运行它。它适用于次要更新。我用 STATUS 替换了 REQUEST_DATE。另外,请注意:Spark 未序列化任务大多数情况发生在您不使用案例类时,但从 Spark 2.x 开始,案例类会在 Spark 任务中自动编码。

    val finalDF = lastRowByRequestId.withColumn("USER_STATUS",logicUDF($"STATUS"))
    

    下面是输出

    +---------+------------+----------+----------+------+----------+--------------+
    |USER_NAME|REQUEST_DATE|START_DATE|  END_DATE|STATUS|REQUEST_ID|   USER_STATUS|
    +---------+------------+----------+----------+------+----------+--------------+
    |     Mark|  2018-02-20|2018-02-01|2018-03-01|    IN|         0|      Our user|
    |     Alex|  2018-03-01|2018-01-01|2018-02-01|   OUT|         1|  not our user|
    |     Mark|  2018-07-01|2018-02-01|2018-03-01|    IN|         3|      Our user|
    |      Bob|  2018-03-01|2018-02-01|2018-02-05|    IN|         2|      Our user|
    |     Kate|  2018-07-01|      null|      null|  null|         4|No Information|
    +---------+------------+----------+----------+------+----------+--------------+
    

    【讨论】:

    • 您好!事实上我使用case class。这就是为什么我有点困惑。此代码引发此类错误:org.apache.spark.SparkException: Task not serializable。也许我需要替换case class 并在其他地方初始化?还有最终结果如何有日期和时间。正如你现在所看到的,我只有没有时间的约会。你有什么想法吗?
    • 好吧,我将案例类中的代码从DATE_TIME: java.sql.Date 更改为DATE_TIME: java.sql.Timestamp。之后,我看到日期和时间。现在只有问题:Task not serializable。我该如何解决?
    • 在另一个类中删除并调用它
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2018-10-19
    • 2018-04-19
    • 1970-01-01
    • 2018-12-07
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多