【问题标题】:Fill missing timestamp with multiple categories using pyspark使用 pyspark 用多个类别填充缺失的时间戳
【发布时间】:2026-01-03 08:20:06
【问题描述】:

我正在尝试在 aws Glue 中使用 pyspark 填充缺失的时间戳。

我的原始数据的日期列格式类似于 20220202 我想将 20220202 转换为 2022-02-02。 所以,我使用了这样的代码。 (有 5 列。

(1)'date'为日期列(如20220202),

(2)'sku'是分类数据,如A,B,C..它有25个不同的值,每个sku都有自己的时间戳,

(3)'unitprice'是数字数据,每个sku有不同的unitprice。例如,如果 sku A 的单价为 30,而 sku A 在数据框中有 300 行,则 300 行具有相同的单价。但是 sku B 有不同的单价。

(4) 'trand_item' 是分类数据。这是一种 sku 的元数据,比如颜色。它只是分类数据和(3)的相同条件

(5) 'target' 为数值型数据,每一行都有不同的值。

当我们填写缺少的时间戳时,我想每天填写时间戳,并且我希望每个 SKU 的 'unitprice'、'trand_item' 值相同,但是当我们为新时间戳添加新行时,我希望在 target 中填写 0。

sparkDF = sparkDF.select('date', 'sku', 'unitprice', 'trand_item', 'target')

sparkDF = sparkDF.withColumn("date",sparkDF["date"].cast(StringType()))

sparkDF = sparkDF.withColumn("date", to_date(col("date"), "yyyymmdd"))

在数据中,有“sku”列。 此列是分类数据,它有 25 个不同的值,例如 A、B、C... 每个值都有自己的时间戳,每个值的开始日期不同。(结束日期相同。)

sparkDF = sparkDF.dropDuplicates(['date', 'sku'])

sparkDF = sparkDF.sort("sku", "date")

每个 sku(我们在数据中有 25 个 sku)都有自己的时间戳并且缺少时间戳,所以我想填充它。 我该如何处理?

<sample data>

   date      sku   unitprice    trand_item    target
2018-01-01    A      10            Black        3
2018-02-01    A      10            Black        7
2018-04-01    A      10            Black        13
2017-08-01    B      20            White        4
2017-10-01    B      20            White        17
2017-11-01    B      20            White        9



<output i want>
   date      sku   unitprice    trand_item    target
2018-01-01    A      10            Black        3
2018-02-01    A      10            Black        7
2018-03-01    A      10            Black        0
2018-04-01    A      10            Black        13
2017-08-01    B      20            White        4
2017-09-01    B      20            White        0
2017-10-01    B      20            White        17
2017-11-01    B      20            White        9

【问题讨论】:

  • 您能否提供一些输入数据和预期输出的简短示例,涵盖您所描述的场景?您谈论“填充缺少的时间戳”,大概是在没有时间戳数据的现有行中,但您还想“为新时间戳添加新行”,这有点令人困惑。
  • 我添加了@Barnesly
  • 谁能帮我解决这个问题??
  • 看起来@luiz-viola 提供的答案满足您的需求?

标签: apache-spark pyspark timestamp categories aws-glue


【解决方案1】:

您的意见:

data = [('2018-01-01','A',10,'Black',3),
('2018-02-01','A',10,'Black',7),
('2018-04-01','A',10,'Black',13),
('2017-08-01','B',20,'White',4),
('2017-10-01','B',20,'White',17),
('2017-11-01','B',20,'White',9)]

cols = ['date', 'sku', 'unitprice', 'trand_item', 'target']
  
df = sqlContext.createDataFrame(data, cols)

受到@blackbishop 在PySpark generate missing dates and fill data with previous value 上的惊人解决方案的启发

from pyspark.sql import functions as F
from pyspark.sql import Window

df = df.withColumn("date", F.to_date(F.col("date"), "yyyy-dd-MM"))

dates_range = df.groupBy("sku").agg(
    F.date_trunc("dd", F.max(F.col("date"))).alias("max_date"),
    F.date_trunc("dd", F.min(F.col("date"))).alias("min_date")
).select(
    "sku",
    F.expr("sequence(min_date, max_date, interval 1 day)").alias("date")
).withColumn(
    "date", F.explode("date")
).withColumn(
    "date",
    F.date_format("date", "yyyy-MM-dd")
)

w = Window.partitionBy("sku").orderBy("date")

result = dates_range\
          .join(df, ["sku", "date"], "left")\
          .select("sku","date",*[F.last(F.col(c), ignorenulls=True).over(w).alias(c)\
              for c in df.columns if c not in ("sku", "date", "target")],"target")\
          .fillna(0, subset=['target'])

result.show()

+---+----------+---------+----------+------+
|sku|      date|unitprice|trand_item|target|
+---+----------+---------+----------+------+
|  A|2018-01-01|       10|     Black|     3|
|  A|2018-01-02|       10|     Black|     7|
|  A|2018-01-03|       10|     Black|     0|
|  A|2018-01-04|       10|     Black|    13|
|  B|2017-01-08|       20|     White|     4|
|  B|2017-01-09|       20|     White|     0|
|  B|2017-01-10|       20|     White|    17|
|  B|2017-01-11|       20|     White|     9|
+---+----------+---------+----------+------+

【讨论】:

    最近更新 更多