【发布时间】:2019-02-01 12:59:10
【问题描述】:
我有一个包含三列的数据框:id、index 和 value。
+---+-----+-------------------+
| id|index| value|
+---+-----+-------------------+
| A| 1023|0.09938822262205915|
| A| 1046| 0.3110047630613805|
| A| 1069| 0.8486710971453512|
+---+-----+-------------------+
root
|-- id: string (nullable = true)
|-- index: integer (nullable = false)
|-- value: double (nullable = false)
然后,我有另一个数据框,显示每个 id 的理想周期:
+---+-----------+---------+
| id|start_index|end_index|
+---+-----------+---------+
| A| 1069| 1276|
| B| 2066| 2291|
| B| 1616| 1841|
| C| 3716| 3932|
+---+-----------+---------+
root
|-- id: string (nullable = true)
|-- start_index: integer (nullable = false)
|-- end_index: integer (nullable = false)
我有如下三个模板
val template1 = Array(0.0, 0.1, 0.15, 0.2, 0.3, 0.33, 0.42, 0.51, 0.61, 0.7)
val template2 = Array(0.96, 0.89, 0.82, 0.76, 0.71, 0.65, 0.57, 0.51, 0.41, 0.35)
val template3 = Array(0.0, 0.07, 0.21, 0.41, 0.53, 0.42, 0.34, 0.25, 0.19, 0.06)
目标是,对于dfIntervals 中的每一行,应用一个函数(假设它是相关的),其中该函数接收来自dfRaw 的value 列和三个模板数组并将三列添加到dfIntervals,与每个模板相关的每一列。
假设: 1 - 模板数组的大小正好是 10。
2 - dfRaw 的 index 列中没有重复项
3 - dfIntervals 中的 start_index 和 end_index 列存在于 dfRaw 的 index 列中,并且它们之间正好有 10 行。例如,dfRaw.filter($"id" === "A").filter($"index" >= 1069 && $"index" <= 1276).count(dfIntervals 中的第一行)的结果正好是 10。
以下是生成这些数据帧的代码:
import org.apache.spark.sql.functions._
val mySeed = 1000
/* Defining templates for correlation analysis*/
val template1 = Array(0.0, 0.1, 0.15, 0.2, 0.3, 0.33, 0.42, 0.51, 0.61, 0.7)
val template2 = Array(0.96, 0.89, 0.82, 0.76, 0.71, 0.65, 0.57, 0.51, 0.41, 0.35)
val template3 = Array(0.0, 0.07, 0.21, 0.41, 0.53, 0.42, 0.34, 0.25, 0.19, 0.06)
/* Defining raw data*/
var dfRaw = Seq(
("A", (1023 to 1603 by 23).toArray),
("B", (341 to 2300 by 25).toArray),
("C", (2756 to 3954 by 24).toArray)
).toDF("id", "index")
dfRaw = dfRaw.select($"id", explode($"index") as "index").withColumn("value", rand(seed=mySeed))
/* Defining intervals*/
var dfIntervals = Seq(
("A", 1069, 1276),
("B", 2066, 2291),
("B", 1616, 1841),
("C", 3716, 3932)
).toDF("id", "start_index", "end_index")
结果是dfIntervals 数据框添加了三列,名称为corr_w_template1、corr_w_template2 和corr_w_template3
PS:我在 Scala 中找不到相关函数。假设存在这样的函数(如下所示),并且我们即将使用它生成一个udf。
def correlation(arr1: Array[Double], arr2: Array[Double]): Double
【问题讨论】:
-
据我了解,您需要如下 udf:
def correlation(value: Double, template: Array[Double]): Double其中template可以是以下值之一:template1、template2、template3。value来自dfRaw。对吗? -
没错。我想这些函数需要应用三次才能获得原始信号和每个模板之间的相关性。
-
dfIntervals包含start_index&end_index所以应该是correlation(values: Array[Double], template: Array[Double]): Double对吧?其中values是从dfRaw 获得的,其中index在[start_index: end_index] 范围内 -
是的,没错。
标签: scala apache-spark dataframe user-defined-functions