【问题标题】:Looping the scala list in Spark在 Spark 中循环 scala 列表
【发布时间】:2021-10-27 13:35:35
【问题描述】:

我有一个如下的 scala 列表。

partList: ListBuffer(2021-10-01, 2021-10-02, 2021-10-03, 2021-10-04, 2021-10-05, 2021-10-06, 2021-10-07, 2021-10-08)

目前我根据上述日期将所有数据从源获取到数据框中。

fctExistingDF = ss.read.table(existingTable).filter(s"event_date in ('${partList.mkString("','")}')")

稍后我将进行一些转换并将数据加载到增量表中。示例代码如下。

fctDF = ss.read.table(existingTable).filter(s"event_date in ('${partList.mkString("','")}')")
    if (fctExistingDF.count() > 0) {
fctDF.createOrReplaceTempView("vw_exist_fct")
val existingRecordsQuery = getExistingRecordsMergeQuery(azUpdateTS,key)
ss.sql(existingRecordsQuery)
.drop("az_insert_ts").drop("az_update_ts")
.withColumn("az_insert_ts", col("new_az_insert_ts"))
.withColumn("az_update_ts", col("new_az_update_ts"))
.drop("new_az_insert_ts").drop("new_az_update_ts")
.select(mrg_tbl_cols(0), mrg_tbl_cols.slice(1,mrg_tbl_cols.length): _*)
.coalesce(72*2)
.write.mode("Append").format("delta")
.insertInto(mergeTable)
mergedDataDF = ss.read.table(mergeTable).coalesce(72*2)

mergedDataDF.coalesce(72)
      .write.mode("Overwrite").format("delta")
      .insertInto(s"${tgtSchema}.${tgtTbl}")
      

代码中的以下命令是根据 partList 中 event_date 上的过滤条件创建数据帧。

fctExistingDF = ss.read.table(existingTable).filter(s"event_date in ('${partList.mkString("','")}')")

由于它正在创建包含大量数据的数据框,我想循环零件列表中的每个日期并将数据读入数据框中,而不是一次过滤零件列表中的所有日期。

我在下面试过了。

var counter = 0

while (counter < partList.length) {
  
  fctExistingDF = ss.read.table(existingTable).filter(s"event_date in (I should pass 1st date from the list)
 counter = counter + 1
 

我是 scala 的新手,我们应该在这里使用 foreach 吗? 有人可以帮忙吗。谢谢。

【问题讨论】:

    标签: scala apache-spark


    【解决方案1】:

    您可以使用foreachmap,取决于您是否要返回值(map)或不(foreach):

    import org.apache.spark.sql.functions.col
    
    partList = List("2021-10-01", "2021-10-02", "2021-10-03", "2021-10-04", "2021-10-05", "2021-10-06", "2021-10-07", "2021-10-08")
    
    partList.foreach { case date =>
       fctExistingDF = ss.read.table(existingTable).filter(col("event_date") === date) 
    }
    

    如果要返回数据框列表,请使用:

    val dfs = partList.map { case date =>
       fctExistingDF = ss.read.table(existingTable).filter(col("event_date") === date) 
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2018-06-04
      • 1970-01-01
      • 1970-01-01
      • 2012-10-10
      • 1970-01-01
      • 1970-01-01
      • 2018-09-29
      • 2017-08-11
      相关资源
      最近更新 更多