【发布时间】:2021-10-27 13:35:35
【问题描述】:
我有一个如下的 scala 列表。
partList: ListBuffer(2021-10-01, 2021-10-02, 2021-10-03, 2021-10-04, 2021-10-05, 2021-10-06, 2021-10-07, 2021-10-08)
目前我根据上述日期将所有数据从源获取到数据框中。
fctExistingDF = ss.read.table(existingTable).filter(s"event_date in ('${partList.mkString("','")}')")
稍后我将进行一些转换并将数据加载到增量表中。示例代码如下。
fctDF = ss.read.table(existingTable).filter(s"event_date in ('${partList.mkString("','")}')")
if (fctExistingDF.count() > 0) {
fctDF.createOrReplaceTempView("vw_exist_fct")
val existingRecordsQuery = getExistingRecordsMergeQuery(azUpdateTS,key)
ss.sql(existingRecordsQuery)
.drop("az_insert_ts").drop("az_update_ts")
.withColumn("az_insert_ts", col("new_az_insert_ts"))
.withColumn("az_update_ts", col("new_az_update_ts"))
.drop("new_az_insert_ts").drop("new_az_update_ts")
.select(mrg_tbl_cols(0), mrg_tbl_cols.slice(1,mrg_tbl_cols.length): _*)
.coalesce(72*2)
.write.mode("Append").format("delta")
.insertInto(mergeTable)
mergedDataDF = ss.read.table(mergeTable).coalesce(72*2)
mergedDataDF.coalesce(72)
.write.mode("Overwrite").format("delta")
.insertInto(s"${tgtSchema}.${tgtTbl}")
代码中的以下命令是根据 partList 中 event_date 上的过滤条件创建数据帧。
fctExistingDF = ss.read.table(existingTable).filter(s"event_date in ('${partList.mkString("','")}')")
由于它正在创建包含大量数据的数据框,我想循环零件列表中的每个日期并将数据读入数据框中,而不是一次过滤零件列表中的所有日期。
我在下面试过了。
var counter = 0
while (counter < partList.length) {
fctExistingDF = ss.read.table(existingTable).filter(s"event_date in (I should pass 1st date from the list)
counter = counter + 1
我是 scala 的新手,我们应该在这里使用 foreach 吗? 有人可以帮忙吗。谢谢。
【问题讨论】:
标签: scala apache-spark