Posted: 2021-02-13 15:31:02
Problem description:
I am new to the Spark/Scala world. I am trying to replicate some ETL logic. Essentially, I want to write dynamic code that pulls data from every table in which a particular column exists, filters on that column, and stores the result in Azure Blob storage.
val url = "<Host Address>"
val user = "<Username>"
val pw = "<Password>"
val driver = "org.postgresql.Driver"
val sslfactory = "org.postgresql.ssl.NonValidatingFactory"
var sql_lookup = """
  SELECT *
  FROM information_schema.tables AS inf_schema
  LEFT JOIN (
      SELECT table_schema AS country_table_schema,
             table_name   AS country_table_name,
             column_name  AS country_table_column_name
      FROM information_schema.columns
      WHERE table_schema = 'Schema_name'
        AND columns.column_name = 'Column_A'
  ) AS country
    ON inf_schema.table_schema = country.country_table_schema
   AND inf_schema.table_name = country.country_table_name
  WHERE inf_schema.table_schema = '<Schemaname>'"""
var dfTbl = (spark.read
  .format("jdbc")
  .option("url", url)
  .option("ssl", "true")
  .option("sslfactory", sslfactory)
  .option("user", user)
  .option("password", pw)
  .option("driver", driver)
  .option("query", sql_lookup)
  .load())
var dfTbl_withCountry = (dfTbl.select(dfTbl.col("*")).filter(dfTbl.col( "country_table_column_name" ).isNotNull)).select("table_name")
val dfTbl_wc = dfTbl_withCountry.collect().foreach(row => row.toSeq.foreach(col => (col)))
for (table <- dfTbl_wc) {
  var sql = " select * from <Schemaname>." + s"${table}" + " where <Column_name> = '<Value>'"
  var df = (spark.read
    .format("jdbc")
    .option("url", url)
    .option("ssl", "true")
    .option("sslfactory", sslfactory)
    .option("user", user)
    .option("password", pw)
    .option("driver", driver)
    .option("query", sql)
    .load())
  var File_withCountry = df
    .coalesce(1)
    .write
    .format("com.databricks.spark.csv")
    .option("header", "true")
    .option("delimiter", "~")
    .mode(SaveMode.Overwrite)
    .option("encoding", "UTF-8")
    .csv("wasbs://<BlobContainer>@<StorageAccount>.blob.core.windows.net/<Targetdirectory>/" + s"${table}")
  val partition_path = dbutils.fs.ls("wasbs://<BlobContainer>@<StorageAccount>.blob.core.windows.net/<Targetdirectory>/" + s"${table}")
    .filter(file => file.name.startsWith("part"))(0).path
  dbutils.fs.cp(partition_path, "wasbs://<BlobContainer>@<StorageAccount>.blob.core.windows.net/<Targetdirectory>/" + s"${table}" + ".csv")
  dbutils.fs.rm(partition_path, recurse = true)
}
Below is the subquery used inside the lookup query:
SELECT table_schema AS country_table_schema,
       table_name   AS country_table_name,
       column_name  AS country_table_column_name
FROM information_schema.columns
WHERE table_schema = '<Schema_name>'
  AND columns.column_name = 'Column_A'
Every table name in the "country_table_name" column of the sql_lookup output is a table I want to extract. I store that query's output in the dataframe dfTbl. Then, in dfTbl_wc, I iterate over every row of the dataframe dfTbl, and in the for loop I select the full data of each table listed in dfTbl_wc.
But for some reason this code does not work properly in the for-loop part. Please help!
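For illustration, the intended iteration could be sketched as below (a minimal sketch, assuming `dfTbl_withCountry` holds a single `table_name` column, and keeping the question's `<Schemaname>`/`<Column_name>`/`<Value>` placeholders). Note that `collect().foreach(...)` evaluates to `Unit`, so its result cannot be looped over; `collect().map(...)` keeps the extracted values as an `Array`:

```scala
// Pull the table names down to the driver as a plain Array[String].
// foreach returns Unit, so assigning its result leaves nothing to iterate over;
// map keeps each extracted value instead.
val tableNames: Array[String] =
  dfTbl_withCountry.collect().map(row => row.getString(0))

for (table <- tableNames) {
  // <Schemaname>, <Column_name>, and <Value> are placeholders from the
  // question, not real identifiers.
  val sql = s"select * from <Schemaname>.$table where <Column_name> = '<Value>'"
  // ... JDBC read and blob write as in the original loop body ...
}
```

This sketch requires a live Spark session and the question's JDBC setup, so it is shown for the shape of the code only.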
Comments:
-
What is the error or problem you are facing? "The for loop is not working properly" does not give any context.
-
A NullPointerException, and the error: value dataframe is not a member of Unit
-
@NikunjKakadiya - any clues??
-
Provide some test data in the question so that I can help you.
-
@NikunjKakadiya - I edited the post to provide more information. Does that make sense?
Tags: postgresql scala apache-spark databricks