【问题标题】:Join two dataframe based on a condition using Spark / Java使用 Spark / Java 根据条件加入两个数据框
【发布时间】:2020-10-28 11:47:56
【问题描述】:

我在 spark 上有 3 个数据帧:dataframe1、dataframe2 和 dataframe3。

我想根据条件将 dataframe1 与其他 dataframe 连接起来。

我使用以下代码:

Dataset <Row> df= dataframe1.filter(when(col("diffDate").lt(3888),dataframe1.join(dataframe2,
            dataframe2.col("id_device").equalTo(dataframe1.col("id_device")).
            and(dataframe2.col("id_vehicule").equalTo(dataframe1.col("id_vehicule"))).
            and(dataframe2.col("tracking_time").lt(dataframe1.col("tracking_time")))).orderBy(dataframe2.col("tracking_time").desc())).
                   otherwise(dataframe1.join(dataframe3,
                   dataframe3.col("id_device").equalTo(dataframe1.col("id_device")).
                           and(dataframe3.col("id_vehicule").equalTo(dataframe1.col("id_vehicule"))).
                           and(dataframe3.col("tracking_time").lt(dataframe1.col("tracking_time")))).orderBy(dataframe3.col("tracking_time").desc())));

但我得到了这个例外

Exception in thread "main" java.lang.RuntimeException: Unsupported literal type class org.apache.spark.sql.Dataset

编辑

输入数据帧:

数据框1

+-----------+-------------+-------------+-------------+
| diffDate  |id_device    |id_vehicule  |tracking_time|
+-----------+-------------+-------------+-------------+
|222        |1            |5            |2020-05-30   |          
|4700       |8            |9            |2019-03-01   |
+-----------+-------------+-------------+-------------+

数据框2

+-----------+-------------+-------------+-------------+
|id_device  |id_vehicule  |tracking_time|longitude    |
+-----------+-------------+-------------+-------------+
|1          |5            |2020-05-12   | 33.21111    |       
|8          |9            |2019-03-01   |20.2222      |
+-----------+-------------+-------------+-------------+

数据框3

+-----------+-------------+-------------+-------------+
|id_device  |id_vehicule  |tracking_time|latitude     |
+-----------+-------------+-------------+-------------+
|1          |5            |2020-05-12   | 40.333      |       
|8          |9            |2019-02-28   |2.00000      |
+-----------+-------------+-------------+-------------+

当 diffDate

+-----------+-------------+-------------+-------------+-----------+-------------+-------------+------------+
| diffDate  |id_device    |id_vehicule  |tracking_time|id_device  |id_vehicule  |tracking_time|longitude|
+-----------+-------------+-------------+-------------+ +-----------+-------------+-------------+-------------+
|222        |1            |5            |2020-05-30   | 1          |5            |2020-05-12   | 33.21111    |       
-----------+--------------+---------------+----------+----------+--------+-----------+--------------+-----------+         

当 diffDate > 3888

 +-----------+-------------+-------------+-------------+-----------+-------------+-------------+------------+
| diffDate  |id_device    |id_vehicule  |tracking_time|id_device  |id_vehicule  |tracking_time|latitude|
+-----------+-------------+-------------+-------------+ +-----------+-------------+-------------+-------------+
|4700        |9            |5            |2019-03-01   | 8          |9            |2019-02-28   | 2.00000    |       
-----------+--------------+---------------+----------+----------+--------+-----------+--------------+-----------+         

我需要你的帮助

谢谢。

【问题讨论】:

  • 您可以发布示例输入和预期输出吗?

标签: java sql dataframe apache-spark


【解决方案1】:

我认为你需要重新审视你的代码。

您正在尝试为dataframe1 的每一行执行连接(当然基于条件),我认为这是不正确的要求或误解的要求。

when(condition, then).otherwise() 函数针对底层数据帧的每一行执行,通常用于根据条件处理列。函数中的thenelse/otherwise 子句仅支持literals,它们是数据框原始/复杂类型和文字中的现有列。 您不能将数据框或任何输出数据框的操作放在那里

可能您的要求是将datafrmae1datafrmae2 加入col("diffDate").lt(3888) 所在的行。为此,您可以执行以下操作 -

dataframe1.join(dataframe2,
                dataframe2.col("id_device").equalTo(dataframe1.col("id_device")).
                        and(dataframe2.col("id_vehicule").equalTo(dataframe1.col("id_vehicule"))).
                        and(dataframe2.col("tracking_time").lt(dataframe1.col("tracking_time"))).
                        and(dataframe1.col("diffDate").lt(3888))
                )
                        .orderBy(dataframe2.col("tracking_time").desc())

编辑-1


        dataframe1.as("a").join(dataframe2.as("b"),
                dataframe2.col("id_device").equalTo(dataframe1.col("id_device")).
                        and(dataframe2.col("id_vehicule").equalTo(dataframe1.col("id_vehicule"))).
                        and(dataframe2.col("tracking_time").lt(dataframe1.col("tracking_time"))).
                        and(dataframe1.col("diffDate").lt(3888))
        ).selectExpr("a.*", "b.longitude", "null as latitude")
                .unionByName(
                        dataframe1.as("a").join(dataframe3.as("c"),
                                dataframe3.col("id_device").equalTo(dataframe1.col("id_device")).
                                        and(dataframe3.col("id_vehicule").equalTo(dataframe1.col("id_vehicule"))).
                                        and(dataframe3.col("tracking_time").lt(dataframe1.col("tracking_time"))).
                                        and(dataframe1.col("diffDate").geq(3888))
                        ).selectExpr("a.*", "c.latitude", "null as longitude")
                               
                )

【讨论】:

  • 感谢您的回复。请注意,我想要一个数据框来确保过滤器在之后使用它。有没有可能这样做?
猜你喜欢
  • 2019-05-02
  • 1970-01-01
  • 2019-12-19
  • 2017-08-19
  • 2018-12-19
  • 2020-06-25
  • 1970-01-01
  • 1970-01-01
  • 2021-06-23
相关资源
最近更新 更多