【问题标题】:Spark reading from Postgres JDBC table slow从 Postgres JDBC 表读取 Spark 很慢
【发布时间】:2017-09-17 22:15:38
【问题描述】:

我正在尝试将大约 100 万行从 PostgreSQL 数据库加载到 Spark 中。使用 Spark 大约需要 10 秒。但是,使用 psycopg2 驱动程序加载相同的查询需要 2 秒。我正在使用 postgresql jdbc 驱动程序版本 42.0.0

def _loadFromPostGres(name):
    url_connect = "jdbc:postgresql:"+dbname
    properties = {"user": "postgres", "password": "postgres"}
    df = SparkSession.builder.getOrCreate().read.jdbc(url=url_connect, table=name, properties=properties)
    return df

df = _loadFromPostGres("""
    (SELECT "seriesId", "companyId", "userId", "score" 
    FROM user_series_game 
    WHERE "companyId"=655124304077004298) as
user_series_game""")

print measure(lambda : len(df.collect()))

输出是 -

--- 10.7214591503 seconds ---
1076131

使用 psycopg2 -

import psycopg2
conn = psycopg2.connect(conn_string)
cur = conn.cursor()

def _exec():
    cur.execute("""(SELECT "seriesId", "companyId", "userId", "score" 
        FROM user_series_game 
        WHERE "companyId"=655124304077004298)""")
    return cur.fetchall()
print measure(lambda : len(_exec()))
cur.close()
conn.close()

输出是 -

--- 2.27961301804 seconds ---
1076131

测量函数-

def measure(func) :
    start_time = time.time()
    x = func()
    print("--- %s seconds ---" % (time.time() - start_time))
    return x

请帮我找出这个问题的原因。


编辑 1

我又做了一些基准测试。使用 Scala 和 JDBC -

import java.sql._;
import scala.collection.mutable.ArrayBuffer;

def exec() {

val url = ("jdbc:postgresql://prod.caumccqvmegm.ap-southeast-1.rds.amazonaws.com/prod"+ 
    "?tcpKeepAlive=true&prepareThreshold=-1&binaryTransfer=true&defaultRowFetchSize=10000")

val conn = DriverManager.getConnection(url,"postgres","postgres");

val sqlText = """SELECT "seriesId", "companyId", "userId", "score" 
        FROM user_series_game 
        WHERE "companyId"=655124304077004298"""

val t0 = System.nanoTime()

val stmt = conn.prepareStatement(sqlText, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)

val rs = stmt.executeQuery()

val list = new ArrayBuffer[(Long, Long, Long, Double)]()

while (rs.next()) {
    val seriesId = rs.getLong("seriesId")
    val companyId = rs.getLong("companyId")
    val userId = rs.getLong("userId")
    val score = rs.getDouble("score")
    list.append((seriesId, companyId, userId, score))
}

val t1 = System.nanoTime()

println("Elapsed time: " + (t1 - t0) * 1e-9 + "s")

println(list.size)

rs.close()
stmt.close()
conn.close()
}

exec()

输出是 -

Elapsed time: 1.922102285s
1143402

当我在 Spark + Scala 中进行 collect() 时 -

import org.apache.spark.sql.SparkSession

def exec2() {

    val spark = SparkSession.builder().getOrCreate()

    val url = ("jdbc:postgresql://prod.caumccqvmegm.ap-southeast-1.rds.amazonaws.com/prod"+ 
    "?tcpKeepAlive=true&prepareThreshold=-1&binaryTransfer=true&defaultRowFetchSize=10000")

    val sqlText = """(SELECT "seriesId", "companyId", "userId", "score" 
        FROM user_series_game 
        WHERE "companyId"=655124304077004298) as user_series_game"""

    val t0 = System.nanoTime()

    val df = spark.read
          .format("jdbc")
          .option("url", url)
          .option("dbtable", sqlText)
          .option("user", "postgres")
          .option("password", "postgres")
          .load()

    val list = df.collect()

    val t1 = System.nanoTime()

    println("Elapsed time: " + (t1 - t0) * 1e-9 + "s")

    print (list.size)
}

exec2()

输出是

Elapsed time: 1.486141076s
1143445

因此在 Python 序列化中花费了 4 倍的额外时间。我知道会有一些惩罚,但这似乎太过分了。

【问题讨论】:

    标签: postgresql apache-spark jdbc pyspark spark-dataframe


    【解决方案1】:

    原因很简单,同时有两个原因。

    首先,我将向您介绍psycopg2 的工作原理。

    此库psycopg2 与任何其他库一样可以连接到 RDMS。该库会将查询发送到您的 postgres 引擎,并将数据返回给您。就这样直奔吧。

    Conn -> 查询 -> ReturnData -> FetchData

    当你使用 spark 时,有两种方式有点不同。 Spark 不像是一种在单个线程中运行的程序语言。它有一个分布式系统可以工作。即使您在本地机器上运行。请参阅 Spark 有 Driver(Master) 和 Workers 的基本概念。

    Driver 收到对 Postgres 执行查询的请求,Driver 不会为每个 worker 请求数据,请求来自 Postgres 的信息。

    如果您看到文档here,您会看到这样的注释:

    不要在大型​​集群上并行创建太多分区;否则 Spark 可能会导致您的外部数据库系统崩溃。

    此注释意味着每个工作人员都将负责为您的 postgres 请求数据。这是启动此过程的一小部分开销,但没什么大不了的。但是这里有一个开销,将数据发送给每个工作人员。

    第二点,你在这部分代码中收集:

    print measure(lambda : len(df.collect()))
    

    collect 函数会向您的所有工作人员发送一个命令,将数据发送给您的驱动程序。要存储在驱动程序的内存中,它就像一个 Reduce,它在过程中间创建 Shuffle。随机播放是将数据发送给其他工作人员的过程中的步骤。在收集的情况下,每个工人都会将其发送给您的司机。

    所以你的代码在 JDBC 中 Spark 的步骤是:

    (Workers)Conn -> (Workers)Query -> (Workers)FetchData -> (Driver) 请求数据 -> (Workers) Shuffle -> (Driver) Collect

    Spark 还有很多其他的东西,比如 QueryPlan、构建 DataFrame 和其他东西。

    这就是 Python 的简单代码比 Spark 响应更快的原因。

    【讨论】:

    • 我们在将数据从 postgresql 加载到 spark 时面临重大问题。基本上,我们的想法是将驱动程序中的所有数据加载到 pandas 数据帧中,并将其转换为 spark 数据帧,然后运行 ​​spark 分布式。你有什么建议?
    • 请不要将所有数据都加载到 Pandas 中,这样不好。如果你有 Spark 集群,你应该使用 python 的 JDBC 工具从 postgres 加载数据,将数据直接加载到工作人员。 spark.apache.org/docs/latest/api/python/…
    猜你喜欢
    • 1970-01-01
    • 2015-10-28
    • 2018-03-11
    • 2015-08-15
    • 2017-10-01
    • 2013-11-25
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多