【问题标题】:Spark Scala : Join two Dataframes by near position and time rangeSpark Scala:按近位置和时间范围连接两个数据帧
【发布时间】:2018-09-13 19:44:27
【问题描述】:

我有两个数据框:

  1. 一个数据框DF1,其结构如下:(ID, StartDate, EndDate, Position)

  2. 一个 Dataframe DF2 看起来像:(DateTime, Position)

我想使用这些 Dataframes 创建一个新的,其中包含每个 DF1(ID)、DF2 中 DF2(DateTime) 位于 DF1(StartDate) 和 DF1(EndDate) 和 DF2(Position) 之间的行数) 靠近 DF1(位置)

我们可以假设我有一个 udf 函数 isNearUDF(pos1,pos2) 来比较位置。

我目前正在尝试通过我的数据框之间的连接来执行此操作,但这似乎不是正确的解决方案

编辑2:

这是一个 MVCE:

def isInRadius(lat1:Double,lon1:Double,lat2:Double,lon2:Double,dist:Double):Boolean={
  val distance = 0// calculate distance between lon/lat positions

  return distance<=dist
}

val DF1 = sc.parallelize(Array(
  ("ID1", "2018-02-27T13:47:59.416+01:00", "2018-03-01T16:02:00.632+01:00", "25.13297154663", "55.13297154663"),
  ("ID2", "2018-02-25T13:47:59.416+01:00", "2018-02-07T16:02:00.632+01:00", "26.13297154663", "55.13297154663"),
  ("ID3", "2018-02-24T13:47:59.416+01:00", "2018-02-02T16:02:00.632+01:00", "25.13297154663", "55.13297154663")
// ...
)).toDF("ID", "CreationDate","EndDate","Lat1","Lon1")

val DF2 = sc.parallelize(Array(
  ("2018-02-27T13:47:59.416+01:00","25.13297154663", "55.13297154663"),
  ("2018-02-27T13:47:59.416+01:00","25.1304663", "54.10663"),
  ("2018-02-27T13:47:59.416+01:00","25.1354663", "55.132904663")
  // ...
)).toDF("DateTime","Lat2","Lon2")

val isInRadiusUdf = udf(isInRadius _)

val DF3 = DF1.join(DF2,$"DateTime">=$"CreationDate" && $"DateTime"<=$"EndDate" /*&& isInRadiusUdf($"Lat1",$"Lon1",$"Lat2",$"Lon2",lit(10))*/)

display(DF3)

这适用于约会,但需要很长时间。 当我添加 isInRadius 条件时,出现错误:

SparkException: Job aborted due to stage failure: Task not serializable: java.io.NotSerializableException: org.apache.spark.sql.DataFrameReader

【问题讨论】:

  • 为什么加入不是一个好的解决方案?
  • 在进行连接时无法使用我的 UDF 函数,我将提供我的代码示例。此外,在检查 DateTime 是否在时间范围内的同时加入需要很多时间
  • 如果不加入,您将无法同时在两个数据帧上执行 udf 功能。如果在加入时应用逻辑,则不必执行 udf 功能
  • 为我们提供两个数据帧的样本以及架构和预期输出
  • 我们需要MVCE

标签: scala apache-spark spark-dataframe


【解决方案1】:

尝试将您的函数定义更改为:

def isInRadius : Double => Double => Double => Double => Double = lat1 => long1 => lat2 => long2 => dist {
  val distance = // calculate distance between lon/lat positions

  return distance<=dist
}

【讨论】:

  • 这种语法有什么不同?当我尝试这个时,我得到 2 个错误: 错误:未找到:值 dist def isInRadius : Double => Double => Double => Double => Double = lat1 => long1 => lat2 => long2 => dist { 错误:不是找到:值 dist 返回距离
【解决方案2】:

在尝试了各种可能的解决方案并得到了奇怪的结果之后,我终于设法通过简单地重新启动我的 Spark 集群(Databricks Notebook)来解决我的问题 我完全不知道问题出在哪里,但 MVCE 的代码现在可以工作了。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2021-10-25
    • 2017-08-16
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2019-02-01
    • 1970-01-01
    相关资源
    最近更新 更多