【问题标题】:Optimize multiple JDBC queries with Spark使用 Spark 优化多个 JDBC 查询
【发布时间】:2023-03-11 19:50:01
【问题描述】:

我正在尝试使用 Spark 从 Greenplum 数据库中提取增量数据。每个表的增量数据都带有一个名为transactionId 的键。 每个transactionId 可以包含一行或多行的数据。所有这些都存储在元数据表中:incKeyTable。 我们还有另一个元数据表中每个表的最后移动的transactionIDincKeyLoads。此表包含每个表的一个条目,它是最后更新到生产表中的transactionId。 为了找出每个表的增量transactionid,我想出了以下逻辑。

val spark = SparkSession.builder().master("yarn").enableHiveSupport().config("hive.exec.dynamic.partition", "true").config("hive.exec.dynamic.partition.mode", "nonstrict").getOrCreate()
import spark.implicits._
Class.forName("org.postgresql.Driver").newInstance()
val tableStatus = s"select tablename, last_update_transaction_id from prod.incKeyLoads where source_system='DB2' and tablename='table1' and final_stage='PROD' and load='Successfull'"
val tableMetaDF = spark.read.format("jdbc").option("url", "url").option("dbtable", s"(${tableStatus}) as LoadedData").option("user", "user").option("password", "pwd").load()
val lutransIdTableMap   = tableMetaDF.map(r => (r.getString(0),r.getLong(1))).collect().toMap

现在我在 scala Map 中有我上次更新的事务 ID,如下所示:

lutransIdTableMap.foreach(println) =
(table1 -> 123)
(table2 -> 113)
(table3 -> 122)
...
(tableN -> 098)

为了找出最新的transactionId(增量数据)来greenplum,我写了以下逻辑来查询元数据表:incKeyTable

Class.forName("com.pivotal.jdbc.GreenplumDriver").newInstance()
def sortLogIds(incTransIds:DataFrame, lastMovedTransId:Long, tablename: String):String = {
    val returnMsg = "Full loads on this table"
    val count = incTransIds.where($"load_type" === "FULLLOAD").count
    if(count == 0) {
      incTransIds.createOrReplaceTempView("incTransID")
      val execQuery  = s"SELECT transactionId from incTransID order by transactionId desc"
      val incLogIdDf = spark.sql(execQuery)
      incLogIdDf.show
      val pushTransIds = "select * from schema.tablename where transactionID in(" + "'" + incLogIdDf.select($"transactionId").collect().map(_.getInt(0).toString).mkString("','") + "')"
      pushLogIds
    } else {
      println("Full load count is greater than zero..")
      returnMsg
    }
}

var incTransIdMap = Map[String, String]()
lutransIdTableMap.keys.foreach(keyTable => if(lutransIdTableMap(keyTable) !=0) {
    val tablename = keyTable.split("\\.")   // Tablename = schema.tablename
    val cdf = spark.read.format("jdbc").option("url", "url").option("dbtable", s"(select transactionId, load_type, source_system, tablename from schema.incKeyTable where source_system='DB2' and target_table='${tablename(1)}' and transactionId > ${lutransIdTableMap(keyTable)}) as controlTableDF").option("user", "user").option("password", "pwd").load()
    incTransIdMap += (keyTable -> sortLogIds(cdf, lutransIdTableMap(keyTable), tablename(1)))
    }
)

此方法有效,但由于数据帧 cdf 很大,因此我可以在此搜索完成之前从表级别的 greenplum 中提取整个数据。我试图缓存数据帧:cdf,但它包含近 500 万行,并被建议不要将这么大的表缓存到缓存中。 我想不出其他可以使搜索更快的方法。谁能让我知道一个让这个过程变得高效的想法?

【问题讨论】:

    标签: sql scala apache-spark jdbc greenplum


    【解决方案1】:

    问题中的代码不可能是您实际运行的代码,因为您在 sortLogIds 中返回 pushLogIds,它从未定义过,并且您从 schema.tablename 中选择,而不是从 s"schema.$tablename" 中选择。这使得很难确定发生了什么......

    也就是说,从大数据处理模式的角度来看,您的方法存在几个潜在问题:

    1. 迭代而不是 UNION 转换。 在其他条件相同的情况下,与其发出许多单独的查询然后在驱动程序上组装结果,不如想办法发出单个查询询问。这就是优化器有机会提供帮助的方式。在您的情况下,请考虑创建一个 Greenplum 视图来组合 lutransIdTableMap 中的所有表。

    2. 操作而不是连接转换。sortLogIds 中,您执行count 操作只是为了决定是否运行其他查询。在其他条件相同的情况下,最好通过连接转换来表达这一点,以延迟操作的运行。稍后您发出一个show,它实际上等同于take(n)。这个动作真的有必要吗?稍后您使用collect 来生成SQL 表达式以在IN 运算符中使用。这是另一个您应该使用联接的示例。总而言之,您正在执行相同的由incTransId 表示的Greenplum 基本查询三次。如果你坚持这种处理方式,你绝对应该以某种方式坚持incTransId

    3. 使用 SQL 汇编而不是 DSL。 通常,如果您通过编程语言而不是通过 SparkSQL 使用 Spark,则应该使用 DSL 而不是将 SQL 表达式汇编为字符串。这样,您就不需要重新定义视图等。

    如果没有完整的代码,也不知道确切的 Greenplum 架构 + 分布策略 + 索引(如果有的话)和所涉及的数据大小,这里有太多需要解决的问题。但是,以上内容应该为您提供一个起点。

    这是一个如何从使用迭代切换到联合的示例。

    val allData = Map("table1" -> 101, "table2" -> 212)
      .map { case (tableName, id) =>
        spark.table(tableName).withColumn("id", lit(id))
      }
      .reduceLeft(_ union _)
    

    这是一个如何使用连接而不是 collect + IN 的示例。

    val allIds = spark.range(100)
    val myIds = spark.createDataset(Seq(11, 33, 55, 77, 99)).toDF("id")
    allIds.where('id.isin(myIds.as[Int].collect: _*)) // premature action
    allIds.join(myIds, Seq("id")) // inner join delays action
    

    上面的示例还展示了如何使用带有collect 的数据集,例如将.collect().map(_.getInt(0).toString) 替换为.as[String].collect,这样更简单、更安全、更快捷。

    希望这会有所帮助!

    【讨论】:

    • 好的。从现在开始,我将尝试更多地使用 DSL。我尝试了您上面给出的简单逻辑。这个特殊的语句: allIds.where(id.isin(myIds.as[Int].collect: _*)) 给了我一个错误:无法解析重载方法'where'..我应该在这里改变什么。
    • @Metadata 您在id 之前缺少单引号。存在从 SymbolColumn 的隐式转换。或者,如果您已导入 functions,则可以使用 $"id"col("id")
    • 抱歉回复晚了。正如您在选项 2 中提到的那样,我持久化了我的数据帧,其中包含用于读取数据的增量密钥 incTransId.persist(MEMORY_ONLY_SER)。这甚至不需要一分钟就可以完成我尝试执行的搜索操作。
    • @Metadata 我很高兴!
    猜你喜欢
    • 1970-01-01
    • 2020-04-12
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2023-03-12
    • 2017-01-10
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多