【问题标题】:Spark JDBC connection to SQL Server times out oftenSpark JDBC 连接到 SQL Server 经常超时
【发布时间】:2018-11-21 06:26:47
【问题描述】:

我正在通过 sparklyr v0.6.2 运行 Spark v2.2.1,并通过 jdbc 从 SQL Server 中提取数据。我似乎遇到了一些网络问题,因为 很多次(不是每次)我的执行程序写入 SQL Server 失败并出现错误:

Prelogin error: host <my server> port 1433 Error reading prelogin response: Connection timed out (Read failed) ClientConnectionId:...

我正在使用以下配置运行我的 sparklyr 会话:

spark_conf = spark_config()
spark_conf$spark.executor.cores <- 8
spark_conf$`sparklyr.shell.driver-memory` <- "8G"
spark_conf$`sparklyr.shell.executor-memory` <- "12G"
spark_conf$spark.serializer <- "org.apache.spark.serializer.KryoSerializer"
spark_conf$spark.network.timeout <- 400

但有趣的是,根据执行程序日志,我上面设置的网络超时似乎并不适用:

18/06/11 17:53:44 INFO BlockManager: Found block rdd_9_16 locally
18/06/11 17:53:45 WARN SQLServerConnection: ConnectionID:3 ClientConnectionId: d3568a9f-049f-4772-83d4-ed65b907fc8b Prelogin error: host nciensql14.nciwin.local port 1433 Error reading prelogin response: Connection timed out (Read failed) ClientConnectionId:d3568a9f-049f-4772-83d4-ed65b907fc8b
18/06/11 17:53:45 WARN SQLServerConnection: ConnectionID:2 ClientConnectionId: ecb084e6-99a8-49d1-9215-491324e8d133 Prelogin error: host nciensql14.nciwin.local port 1433 Error reading prelogin response: Connection timed out (Read failed) ClientConnectionId:ecb084e6-99a8-49d1-9215-491324e8d133
18/06/11 17:53:45 ERROR Executor: Exception in task 10.0 in stage 26.0 (TID 77)

谁能帮我理解什么是登录前错误以及如何避免这个问题?这是我的写函数:

function (df, tbl, db, server = NULL, user, pass, mode = "error", 
    options = list(), ...) 
{
    sparklyr::spark_write_jdbc(
  df, 
  tbl, 
  options = c(
    list(url = paste0("jdbc:sqlserver://", server, ".nciwin.local;", 
         "databaseName=", db, ";", 
         "user=", user, ";", 
         "password=", pass, ";"), 
       driver = "com.microsoft.sqlserver.jdbc.SQLServerDriver"), 
    options), 
  mode = mode, ...)
}

我刚刚将我的 jdbc 驱动程序更新到 6.0 版,但我认为这没有什么不同。我希望我安装正确。我只是将它放入我的Spark/jars 文件夹,然后将其添加到Spark/conf/spark-defaults.conf

编辑 我正在将 24 个分区中的 23M 行读入 Spark。我的集群有 4 个节点,每个节点有 8 个内核和 18G 内存。使用我当前的配置,我有 4 个执行器,每个执行器有 8 个核心,每个执行器 12G。我读取数据的函数如下所示:

function (sc, tbl, db, server = NULL, user, pass, repartition = 0, options = list(), ...) 
{
    sparklyr::spark_read_jdbc(
      sc, 
      tbl, 
      options = c(
        list(url = paste0("jdbc:sqlserver://", server, ".nciwin.local;"), 
             user = user, 
             password = pass, 
             databaseName = db, 
             dbtable = tbl, 
             driver = "com.microsoft.sqlserver.jdbc.SQLServerDriver"), 
        options), 
      repartition = repartition, ...)
}

我在运行时将repartition 设置为 24。因此,我没有看到与建议的帖子的联系。

编辑 2
我能够通过摆脱重新分区来解决我的问题。谁能解释为什么在这种情况下使用 sparklyr 重新分区无效?

【问题讨论】:

  • 嗯,我没有在这些帖子之间建立联系。我的读取运行良好,我使用 24 个分区,所以我看不到它是如何应用的。请注意,当我连接到不同的 SQL Server DB 时,我曾经在没有读取分区的情况下做得很好。
  • 我还没有结束这个问题。 spark_read_jdbc 函数不像你想象的那样工作。重新分区发生在拉取数据之后,这是问题的根源。您可以在函数的source code 中查看。
  • 这使得这个问题重复了。但是,如果您希望更好地了解如何实际解决此问题,建议您阅读我的答案here
  • @eliasah 如果您回答问题并提供有关重新分区为何导致超时问题的建议,我会将其标记为关闭此问题的最佳答案。

标签: r sql-server apache-spark sparklyr


【解决方案1】:

正如in the other question 的解释,以及其他一些帖子(Whats meaning of partitionColumn, lowerBound, upperBound, numPartitions parameters?Converting mysql table to spark dataset is very slow compared to same from csv filePartitioning in spark while reading from RDBMS via JDBCspark reading data from mysql in parallel)和场外资源(Parallelizing Reads),默认情况下 Spark JDBC 源读取所有数据顺序到单个节点。

有两种并行读取的方式:

  • 基于数值列的范围分割,lowerBoundupperBoundpartitionColumnnumPartitions options 需要,其中partitionColumn 是一个稳定的数值列 (pseudocolumns might not be a good choice)

    spark_read_jdbc(
      ...,
      options = list(
        ...
        lowerBound = "0",                 # Adjust to fit your data 
        upperBound = "5000",              # Adjust to fit your data 
        numPartitions = "42",             # Adjust to fit your data and resources
        partitionColumn = "some_numeric_column"
      )
     )
    
  • predicates 列表 - 目前在 sparklyr 中不支持。

重新分区(sparklyr::sdf_repartition 无法解决问题,因为它发生在加载数据之后。由于 shuffle(repartition 需要)属于 Spark 中最昂贵的操作,它很容易使节点崩溃。

因此使用:

  • spark_read_jdbcrepartition参数:

  • sdf_repartition

只是一种货物崇拜的做法,而且大多数时候弊大于利。如果数据小到可以通过单个节点进行管道传输,那么增加分区数量通常会降低性能。否则它只会崩溃。

话虽如此 - 如果数据已经由单个节点处理,则会提出一个问题,即使用 Apache Spark 是否有意义。答案将取决于您管道的其余部分,但仅考虑有问题的组件,它可能是否定的。

【讨论】:

  • 如果我只有 varchar 列可以使用怎么办?我完全不走运吗?
猜你喜欢
  • 2014-12-01
  • 2013-04-13
  • 2017-10-17
  • 2013-10-15
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2019-11-09
相关资源
最近更新 更多