【问题标题】:Spark: optimise writing a DataFrame to SQL ServerSpark:优化将 DataFrame 写入 SQL Server
【发布时间】:2019-09-06 13:13:31
【问题描述】:

我正在使用下面的代码将 43 列和大约 2,000,000 行的 DataFrame 写入 SQL Server 中的表中:

dataFrame
  .write
  .format("jdbc")
  .mode("overwrite")
  .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver")
  .option("url", url)
  .option("dbtable", tablename)
  .option("user", user)
  .option("password", password)
  .save()

遗憾的是,虽然它适用于小型 DataFrame,但它要么非常慢,要么对于大型 DataFrame 超时。关于如何优化它的任何提示?

我试过设置rewriteBatchedStatements=true

谢谢。

【问题讨论】:

  • 您的行中有大量列(MEDIUMTEXT、LONGTEXT、BLOB...)?尝试在您的数据库中查看在写入命令(SHOW PROCESSLIST sql 命令)时并行执行的查询数量。也许它可以给你一些错误的痕迹
  • @Dawid 你终于找到使用 PySpark 的方法了吗?

标签: sql sql-server database scala apache-spark


【解决方案1】:

尝试将 batchsize 选项添加到您的语句中,至少使用> 10000(相应地更改此值以获得更好的性能)并再次执行写入。

From spark docs:

JDBC 批处理大小,它决定每次插入多少行 往返。这有助于提高 JDBC 驱动程序的性能。这个选项 仅适用于写作。它默认为 1000

也值得一看:

  • numPartitions option 增加并行度(这也决定了最大并发JDBC连接数)

  • queryTimeout option 增加写入选项的超时时间。

【讨论】:

  • 谢谢。使用批量大小可以获取大约一百万条记录。但是对于较大的错误仍然会出现以下错误:com.microsoft.sqlserver.jdbc.SQLServerException:连接已关闭。
  • @Dawid,您是否也尝试过使用 queryTimeout 选项?(or) 尝试将 keep alive/querytimeout** 参数添加到您的 jdbc url (or) 询问 sql server管理员根据您的查询时间增加 timeouts..!!
  • @Shu 你分享的链接失效了
【解决方案2】:

我们采用了azure-sqldb-spark 库,而不是 Spark 的默认内置导出功能。这个库为您提供了一个bulkCopyToSqlDB 方法,它是一个真正的 批量插入,并且速度很多。使用起来不如内置功能实用,但根据我的经验,它仍然值得。

我们或多或少这样使用它:

import com.microsoft.azure.sqldb.spark.config.Config
import com.microsoft.azure.sqldb.spark.connect._
import com.microsoft.azure.sqldb.spark.query._

val options = Map(
  "url"          -> "***",
  "databaseName" -> "***",
  "user"         -> "***",
  "password"     -> "***",
  "driver"       -> "com.microsoft.sqlserver.jdbc.SQLServerDriver"
)

// first make sure the table exists, with the correct column types
// and is properly cleaned up if necessary
val query = dropAndCreateQuery(df, "myTable")
val createConfig = Config(options ++ Map("QueryCustom" -> query))
spark.sqlContext.sqlDBQuery(createConfig)

val bulkConfig = Config(options ++ Map(
  "dbTable"           -> "myTable",
  "bulkCopyBatchSize" -> "20000",
  "bulkCopyTableLock" -> "true",
  "bulkCopyTimeout"   -> "600"
))

df.bulkCopyToSqlDB(bulkConfig)

如您所见,我们自己生成了CREATE TABLE 查询。你可以让库创建表,但它只会做dataFrame.limit(0).write.sqlDB(config),这仍然效率很低,可能需要你缓存你的DataFrame,它不允许你选择SaveMode

还可能很有趣:当将此库添加到我们的 sbt 构建时,我们必须使用 ExclusionRule,否则 assembly 任务将失败。

libraryDependencies += "com.microsoft.azure" % "azure-sqldb-spark" % "1.0.2" excludeAll(
  ExclusionRule(organization = "org.apache.spark")
)

【讨论】:

  • 大家好,我正在使用 pyspark,jdbc 驱动程序正在成为瓶颈。有什么建议可以提高上述场景中的性能吗?
  • @FelipePerezR 你找到替代品了吗?
【解决方案3】:

为了提高使用 PY-Spark 的性能(由于管理限制只能使用 python、SQL 和 R),可以使用以下选项。

方法一:使用 JDBC 连接器

此方法逐行读取或写入数据,导致性能问题。 不推荐

df.write \
.format("jdbc") \
.mode("overwrite or append") \
.option("url", url) \
.option("dbtable", table_name) \
.option("user", username) \
.option("password", password) \
.save()

方法 2:使用 Apache Spark 连接器(SQL Server 和 Azure SQL)

此方法使用批量插入来读取/写入数据。还有更多的选择可以进一步探索。

首先在 Data-bricks 集群中使用 Maven Coordinate 安装库,然后使用以下代码。

推荐用于 Azure SQL DB 或 Sql Server 实例

https://docs.microsoft.com/en-us/sql/connect/spark/connector?view=sql-server-ver15

df.write \
.format("com.microsoft.sqlserver.jdbc.spark") \
.mode("overwrite or append") \
.option("url", url) \
.option("dbtable", table_name) \
.option("user", username) \
.option("password", password) \
.option("batchsize", as per need) \
.option("mssqlIsolationLevel", "READ_UNCOMMITTED")\
.save()

方法 3:使用 Azure 专用 SQL 池的连接器(以前称为 SQL DW)

此方法以前使用 Poly-base 使用暂存服务器(主要是 Blob 存储或 Data Lake 存储目录)从 Azure Synapse 读取和写入数据,但现在正在读取数据并使用 Copy 进行写入,因为 Copy 方法提高了性能。

推荐用于 Azure Synapse

https://docs.databricks.com/data/data-sources/azure/synapse-analytics.html

df.write \
.format("com.databricks.spark.sqldw") \
.option("url", "jdbc:sqlserver://<the-rest-of-the-connection-string>") \
.option("forwardSparkAzureStorageCredentials", "true") \
.option("dbTable", "<your-table-name>") \
.option("tempDir", "wasbs://<your-container-name>@<your-storage-account-name>.blob.core.windows.net/<your-directory-name>") \
.save()

【讨论】:

    【解决方案4】:

    您可以选择将数据转换为 CSV 文件并复制这些 CSV 文件吗? 我们已经为更大的表格自动化了这个过程,并以 CSV 格式在 GCP 中传输这些表格。而不是通过 JDBC 读取。

    【讨论】:

    • 这不是问题的答案。
    【解决方案5】:

    您可以使用sql-spark connector

    df.write \
        .format("com.microsoft.sqlserver.jdbc.spark") \
        .mode("overwrite") \
        .option("url", url) \
        .option("dbtable", table_name) \
        .option("user", username) \
        .option("password", password) \
        .save()
    

    更多信息也here

    【讨论】:

      猜你喜欢
      • 2017-01-09
      • 1970-01-01
      • 1970-01-01
      • 2019-06-10
      • 2022-01-23
      • 1970-01-01
      • 2020-06-12
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多