【Question Title】: Reading the first table, executing its queries in Spark SQL, and inserting the results into another table
【Posted】: 2025-12-04 02:20:07
【Question Description】:

Please recommend the best code for the following requirement. We are using the Apache Spark framework.

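I have Table 1 and Table 2, shown below. First, we need to read Table 1 by Test_id and execute the corresponding queries (_SRC and _Target) one after the other. Then the output goes into Table 2: the counts returned by the Table 1 queries are compared using basic operators (such as <, >, =, etc.), and the result is written to Table 2 together with the date and user details.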

Thanks in advance!

Table 1

Table 2

【Comments】:

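  • Did the answer below work?
  • Sorry, I forgot to ask last time: could you update your question with the values you want in the test_condition column?
  • Table 1 and Table 2 will have similar columns. Actual_results is provided in Table 1; that count is then compared with the count returned by the query in the Execution_script_target column, using the operator given for that test, for example =, >, or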

Tags: scala apache-spark pyspark apache-spark-sql


【Answer 1】:

Please check the code below.

Creating Table 1:

scala> :paste
// Entering paste mode (ctrl-D to finish)

val df = Seq(
    (1,"select count(*) from dbnameaaa.tbl_name","select count(*) from dbnameaaa.tbl_name"),
    (2,"select count(*) from dbnameaaa.tmp_tbl","select count(*) from dbnameaaa.tmp_tbl"))
    .toDF("test_id","execution_script_src","execution_script_target")

// Exiting paste mode, now interpreting.

df: org.apache.spark.sql.DataFrame = [test_id: int, execution_script_src: string ... 1 more field]

scala> df.show(false)
+-------+---------------------------------------+---------------------------------------+
|test_id|execution_script_src                   |execution_script_target                |
+-------+---------------------------------------+---------------------------------------+
|1      |select count(*) from dbnameaaa.tbl_name|select count(*) from dbnameaaa.tbl_name|
|2      |select count(*) from dbnameaaa.tmp_tbl |select count(*) from dbnameaaa.tmp_tbl |
+-------+---------------------------------------+---------------------------------------+

Creating the query-execution and condition UDFs:

scala> :paste
// Entering paste mode (ctrl-D to finish)

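// Runs a count query and returns its first column as a Long.
// Caveat: spark.sql is driver-only, so calling it inside a UDF (which runs on executors)
// is not supported; the catch block turns any such failure into 0.
val execute = udf((query: String) => {
    try { spark.sql(query).map(_.getAs[Long](0)).collect.head } catch { case _: Exception => 0L }
})

// Builds a JSON string with a Pass/Fail verdict for each of =, <, > and <>.
val condition = udf((actual:Long,expected:Long) => {
   s"""{"=":"${if (actual == expected) "Pass" else "Fail"}","<":"${if (actual < expected) "Pass" else "Fail"}",">":"${if (actual > expected) "Pass" else "Fail"}","<>":"${if (actual != expected) "Pass" else "Fail"}"}"""
})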

// Exiting paste mode, now interpreting.

execute: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function1>,LongType,Some(List(StringType)))
condition: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function2>,StringType,Some(List(LongType, LongType)))

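Because `spark.sql` can only be used on the driver, and Spark does not support launching a query from inside a UDF running on an executor, a more robust variant of `execute` collects the (assumed small) Table 1 to the driver and runs each query there. A minimal sketch, under stated assumptions: `TestSpec`, `TestCounts`, `DriverSideRunner` and `runTests` are hypothetical names, each query is assumed to return a single `count(*)` in its first column, and the query runner is passed in as a function so the loop can be tried without a cluster.

```scala
// Hypothetical mirror of one Table 1 row (test_id, execution_script_src, execution_script_target).
final case class TestSpec(testId: Int, srcQuery: String, targetQuery: String)

// Counts obtained by running one test's source and target queries.
final case class TestCounts(testId: Int, actual: Long, expected: Long)

object DriverSideRunner {
  // Runs the source and target query of every test one after the other, in test_id order.
  // `runCount` executes one SQL string and returns its single count; on the driver it
  // could be q => spark.sql(q).head().getLong(0).
  def runTests(specs: Seq[TestSpec], runCount: String => Long): Seq[TestCounts] =
    specs.sortBy(_.testId).map { s =>
      TestCounts(s.testId, runCount(s.srcQuery), runCount(s.targetQuery))
    }
}
```

In spark-shell this might be wired up as `DriverSideRunner.runTests(df.collect().toSeq.map(r => TestSpec(r.getInt(0), r.getString(1), r.getString(2))), q => spark.sql(q).head().getLong(0))`; calling `.toDF()` on the result gives a DataFrame to which the comparison and audit columns can then be added. Unlike the `try`/`catch` in `execute`, a failing query here surfaces as an error instead of a count of 0.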

Final result table:

scala> :paste
// Entering paste mode (ctrl-D to finish)

df
.withColumn("actual_result",execute($"execution_script_src"))
.withColumn("expected_result",execute($"execution_script_target"))
.withColumn("test_condition",lit("[ =, <, >, <> ]"))
.withColumn("test_result",condition($"actual_result",$"expected_result"))
.withColumn("create_date",current_date)
.withColumn("modify_date",current_date)
.withColumn("created_by",lit(spark.sparkContext.sparkUser))
.withColumn("modified_by",lit(spark.sparkContext.sparkUser))
.withColumn("execute_date",current_date)
.show(false)

// Exiting paste mode, now interpreting.

+-------+---------------------------------------+---------------------------------------+-------------+---------------+---------------+----------------------------------------------+-----------+-----------+----------+-----------+------------+
|test_id|execution_script_src                   |execution_script_target                |actual_result|expected_result|test_condition |test_result                                   |create_date|modify_date|created_by|modified_by|execute_date|
+-------+---------------------------------------+---------------------------------------+-------------+---------------+---------------+----------------------------------------------+-----------+-----------+----------+-----------+------------+
|1      |select count(*) from dbnameaaa.tbl_name|select count(*) from dbnameaaa.tbl_name|11           |11             |[ =, <, >, <> ]|{"=":"Pass","<":"Fail",">":"Fail","<>":"Fail"}|2020-05-06 |2020-05-06 |srinivas  |srinivas   |2020-05-06  |
|2      |select count(*) from dbnameaaa.tmp_tbl |select count(*) from dbnameaaa.tmp_tbl |11           |22             |[ =, <, >, <> ]|{"=":"Fail","<":"Pass",">":"Fail","<>":"Pass"}|2020-05-06 |2020-05-06 |srinivas  |srinivas   |2020-05-06  |
+-------+---------------------------------------+---------------------------------------+-------------+---------------+---------------+----------------------------------------------+-----------+-----------+----------+-----------+------------+

【Comments】:

  • Thanks for your answer @Srinivas, but in my requirement the test condition should be just one of these conditions, so test_result must be updated to only Pass or Fail.
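The comment above asks for a single condition per test with a plain Pass/Fail result. One way to do that, sketched under the assumption that Table 1 gains a `test_condition` column holding one operator string per row (the accepted operator spellings and the names `ConditionCheck`/`evaluate` are hypothetical), is to dispatch on that operator instead of building a JSON verdict for every operator:

```scala
object ConditionCheck {
  // Evaluates the single operator from a hypothetical test_condition column
  // and returns "Pass" or "Fail".
  def evaluate(condition: String, actual: Long, expected: Long): String = {
    val passed = condition.trim match {
      case "=" | "=="  => actual == expected
      case "<"         => actual < expected
      case ">"         => actual > expected
      case "<="        => actual <= expected
      case ">="        => actual >= expected
      case "<>" | "!=" => actual != expected
      // Fail loudly on an unknown operator rather than silently reporting "Fail".
      case other => throw new IllegalArgumentException(s"Unsupported test_condition: '$other'")
    }
    if (passed) "Pass" else "Fail"
  }
}
```

Registered as a UDF in spark-shell, for example `val check = udf((c: String, a: Long, e: Long) => ConditionCheck.evaluate(c, a, e))`, it could replace `condition` via `.withColumn("test_result", check($"test_condition", $"actual_result", $"expected_result"))`. Throwing on an unknown operator makes a misconfigured Table 1 row fail the job instead of being recorded as a misleading Fail.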