【问题标题】:replace null value in the column of a dataframe with the value in other dataframe wrt to id将数据框列中的空值替换为其他数据框中的值 wrt 到 id
【发布时间】:2019-02-19 07:45:05
【问题描述】:

我有两个数据框

df1:

    +---------------+-------------------+-----+------------------------+------------------------+---------+
|id             |dt                 |speed|stats                   |lag_stat                |lag_speed|
+---------------+-------------------+-----+------------------------+------------------------+---------+
|358899055773504|2018-07-31 18:38:36|0    |[9, -1, -1, 13, 0, 1, 0]|null                    |null     |
|358899055773504|2018-07-31 18:58:34|0    |[9, 0, -1, 22, 0, 1, 0] |[9, -1, -1, 13, 0, 1, 0]|0        |
|358899055773505|2018-07-31 18:54:23|4    |[9, 0, 0, 22, 1, 1, 1]  |null                    |null     |
+---------------+-------------------+-----+------------------------+------------------------+---------+

df2:

+---------------+-------------------+-----+------------------------+
|id             |dt                 |speed|stats                   |
+---------------+-------------------+-----+------------------------+
|358899055773504|2018-07-31 18:38:34|0    |[9, -1, -1, 13, 0, 1, 0]|
|358899055773505|2018-07-31 18:48:23|4    |[8, -1, 0, 22, 1, 1, 1] |
+---------------+-------------------+-----+------------------------+

我想将 df1 中的 lag_stat,speed 列中的 null 值替换为 stat 的值和从数据帧 df2 wrt 到相同 id 的速度。

所需的输出如下所示:

  +---------------+-------------------+-----+--------------------+--------------------+---------+
    |             id|                 dt|speed|               stats|            lag_stat|lag_speed|
    +---------------+-------------------+-----+--------------------+--------------------+---------+
    |358899055773504|2018-07-31 18:38:36|   0|[9, -1, -1, 13, 0, 1,0]|[9, -1, -1, 13, 0, 1, 0]|  0|
    |358899055773504|2018-07-31 18:58:34|   0|[9, 0, -1, 22, 0, 1, 0]|[9, -1, -1, 13, 0, 1, 0]|  0|
    |358899055773505|2018-07-31 18:54:23|   4|[9, 0, 0, 22, 1, 1, 1]|[8, -1, 0, 22, 1, 1, 1] | 4 |
    +---------------+-------------------+-----+--------------------+--------------------+---------+

【问题讨论】:

    标签: scala apache-spark


    【解决方案1】:

    一种可能的方法是 join DF,然后在该列上应用一些 when 函数。

    例如,这个:

    val output = df1.join(df2, df1.col("id")===df2.col("id"))
          .select(df1.col("id"),
                  df1.col("dt"),
                  df1.col("speed"),
                  df1.col("stats"),
                  when(df1.col("lag_stat").isNull,df2.col("stats")).otherwise(df1.col("lag_stat")).alias("lag_stats"),
                  when(df1.col("lag_speed").isNull,df2.col("speed")).otherwise(df1.col("lag_speed")).alias("lag_speed")
          )
    

    会给你预期的输出:

    +---------------+------------------+-----+------------------+------------------+---------+
    |             id|                dt|speed|             stats|         lag_stats|lag_speed|
    +---------------+------------------+-----+------------------+------------------+---------+
    |358899055773504|2018-07-3118:38:36|    0|[9,-1,-1,13,0,1,0]|[9,-1,-1,13,0,1,0]|        0|
    |358899055773504|2018-07-3118:58:34|    0| [9,0,-1,22,0,1,0]|[9,-1,-1,13,0,1,0]|        0|
    |358899055773505|2018-07-3118:54:23|    4|  [9,0,0,22,1,1,1]| [8,-1,0,22,1,1,1]|        4|
    +---------------+------------------+-----+------------------+------------------+---------+
    

    【讨论】:

    • 感谢您的回复,但不使用 joins 怎么办。加入是不是成本高昂的操作?
    • @experiment 你不能在两个单独的数据帧上执行任何这样的操作而不先加入它们。
    • @Shaido 嗨,先生,我有一个与上述问题无关的问题,我正在 spark 中进行某种处理,并希望实现一项功能,无论正在运行的处理如何,我都想安排一个计时器(以 5 分钟为间隔),它将一些数据保存到 cassandra(或者说任何其他来源)。我该怎么做?任何形式的见解都会有所帮助:)
    • @experiment:好像是想用流式数据,可以看看结构化流式,用cassandra做sink:spark.apache.org/docs/latest/…
    • @Shaido 我想实现一个定时器线程,对我来说这听起来像是两个任务并行运行,一个是跟踪 5 分钟的间隔,另一个是做我告诉它的所有处理做,就像我正在对流数据进行处理,然后我将该处理的输出缓存在 spark 中作为临时表,并且这个缓存的表在 spark 脚本中的某个地方再次使用,但只有在一段时间后我想坚持在 cassandra .我无法从您分享的链接中获取想法,我应该发布一个问题以便您提供书面解决方案吗?
    猜你喜欢
    • 2014-02-06
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2016-08-18
    • 2014-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多