【问题标题】:Spark dataframe adding new column issue - Structured streamingSpark数据框添加新列问题 - 结构化流
【发布时间】:2020-10-05 17:30:31
【问题描述】:

我正在使用 spark 结构化流。我有一个数据框并添加了一个新列“current_ts”。

inpuDF.withColumn("current_ts", lit(System.currentTimeMillis()))

这不会用当前纪元时间更新每一行。它会在触发作业时更新相同的 epcoh 时间,从而导致 DF 中的每一行都具有相同的值。这适用于正常的火花作业。这是 spark 结构化流的问题吗?

【问题讨论】:

  • 嗨@Nats,你能做到这一点吗?我有类似的要求。

标签: apache-spark spark-structured-streaming


【解决方案1】:

spark 将您的转换记录为沿袭图,并且仅在调用某些操作时才执行该图。所以它会调用

System.currentTimeMillis()

当某个动作被触发时。我不明白其中的内容让您感到困惑或您想要实现什么。谢谢。

【讨论】:

  • 我正在尝试为 DF 中的每条记录添加一个值为 System.currentTimeMillis() 的新列。但即使在运行结构化流式作业几个小时后,我最终在列中也得到了相同的值。
  • 那么您可能想在数据插入时添加时间戳,然后将该数据用作DataFrame。不使用 withColumn。
【解决方案2】:

Spark 有一个function to create a column with current timestamp。您的代码应如下所示:

import org.apache.spark.sql.functions

// ...

inpuDF.withColumn("current_ts", functions.current_timestamp())

【讨论】:

    【解决方案3】:

    您的方法的问题是使用 lit 是文字函数或常量。 Spark 会将其视为从驱动程序传递的常量。 因此,当您执行作业时,文字将根据您执行的时间进行评估。 所有记录都有相同的时间戳。 您需要改用函数。 current_timestamp() 应该可以工作。

    【讨论】:

      【解决方案4】:

      试试这个

      inpuDF.writeStream.partitionBy('current_ts')

      【讨论】:

      • 请在您的答案中添加一些解释,以便其他人可以从中学习