【问题标题】:SparkSQL JDBC (PySpark) to Postgres - Creating Tables and Using CTEsSparkSQL JDBC (PySpark) 到 Postgres - 创建表和使用 CTE
【发布时间】:2020-05-22 22:41:30
【问题描述】:

我正在开展一个项目,将 Python 概念证明 (POC) 移植到 PySpark。 POC 大量利用 Postgres,特别是 PostGIS 地理空间库。大部分工作包括 Python 在回调数据进行最终处理之前向 Postgres 发出命令。

传递给 Postgres 的一些查询包含 CREATE TABLEINSERTCREATE TEMP TABLE 和 CTE WITH 语句。我正在尝试确定是否可以通过 JDBC 将这些查询从 Spark 传递给 Postgres。

有人可以确认此功能在 Spark JDBC 中是否可用于其他数据库吗?需要明确的是,我想将简单的英语 SQL 查询传递给 Postgres,而不是使用可用的 SparkSQL API(因为它们不支持我需要的所有操作)。我正在使用 Spark 版本 2.3.0PostgreSQL 10.11Python 2.7.5(是的,我知道 Python 2 的 EOL,这是另一个故事)。

这是我迄今为止尝试过的:

使用SparkSession.read

创建到 Postgres 的 Spark 会话

postgres = SparkSession.builder \
    .appName("myApp") \
    .config("spark.jars", "/usr/share/java/postgresql-jdbc.jar") \
    .getOrCreate()

定义要传递给dbtable参数的查询

qry = """create table test (name varchar(50), age int)"""

qry 传递给 Postgres spark 会话对象的 dbtable 参数

postgres.read \
    .format("jdbc") \
    .option("url", "jdbc:postgresql://....) \
    .option("dbtable", qry) \
    .option("user", configs['user']) \
    .option("password", configs['password']) \
    .option("driver", "org.postgresql.Driver") \
    .option("ssl", "true") \
    .load()

返回以下语法错误(使用上面列出的其他 SQL 命令时会出现相同类型的错误):

Traceback (most recent call last):
  File "/tmp/zeppelin_pyspark-5711943099029736374.py", line 367, in <module>
    raise Exception(traceback.format_exc())
Exception: Traceback (most recent call last):
  File "/tmp/zeppelin_pyspark-5711943099029736374.py", line 360, in <module>
    exec(code, _zcUserQueryNameSpace)
  File "<stdin>", line 9, in <module>
  File "/usr/hdp/current/spark2-client/python/pyspark/sql/readwriter.py", line 172, in load
    return self._df(self._jreader.load())
  File "/usr/hdp/current/spark2-client/python/lib/py4j-0.10.6-src.zip/py4j/java_gateway.py", line 1160, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/usr/hdp/current/spark2-client/python/pyspark/sql/utils.py", line 63, in deco
    return f(*a, **kw)
  File "/usr/hdp/current/spark2-client/python/lib/py4j-0.10.6-src.zip/py4j/protocol.py", line 320, in get_return_value
    format(target_id, ".", name), value)
Py4JJavaError: An error occurred while calling o484.load.
: org.postgresql.util.PSQLException: ERROR: syntax error at or near "create"
  Position: 15

使用SparkSession.sql()

利用上面定义的相同 postgres 对象

将查询传递给 .sql()

postgres.sql("""create table (name varchar(50), age int)""")

返回以下解析异常:

Traceback (most recent call last):
  File "/tmp/zeppelin_pyspark-5711943099029736374.py", line 367, in <module>
    raise Exception(traceback.format_exc())
Exception: Traceback (most recent call last):
  File "/tmp/zeppelin_pyspark-5711943099029736374.py", line 360, in <module>
    exec(code, _zcUserQueryNameSpace)
  File "<stdin>", line 1, in <module>
  File "/usr/hdp/current/spark2-client/python/pyspark/sql/session.py", line 714, in sql
    return DataFrame(self._jsparkSession.sql(sqlQuery), self._wrapped)
  File "/usr/hdp/current/spark2-client/python/lib/py4j-0.10.6-src.zip/py4j/java_gateway.py", line 1160, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/usr/hdp/current/spark2-client/python/pyspark/sql/utils.py", line 73, in deco
    raise ParseException(s.split(': ', 1)[1], stackTrace)
ParseException: u"\nno viable alternative at input 'create table ('(line 1, pos 13)\n\n== SQL ==\ncreate table (name varchar(50), age int)\n-------------^^^\n"

如果我像postgres.sql("(create table (name varchar(50), age int))") 那样将查询用引号括起来,那么我会得到一个不同的解析异常,这让我相信我想要的功能是不可能的:

Traceback (most recent call last):
  File "/tmp/zeppelin_pyspark-5711943099029736374.py", line 367, in <module>
    raise Exception(traceback.format_exc())
Exception: Traceback (most recent call last):
  File "/tmp/zeppelin_pyspark-5711943099029736374.py", line 360, in <module>
    exec(code, _zcUserQueryNameSpace)
  File "<stdin>", line 1, in <module>
  File "/usr/hdp/current/spark2-client/python/pyspark/sql/session.py", line 714, in sql
    return DataFrame(self._jsparkSession.sql(sqlQuery), self._wrapped)
  File "/usr/hdp/current/spark2-client/python/lib/py4j-0.10.6-src.zip/py4j/java_gateway.py", line 1160, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/usr/hdp/current/spark2-client/python/pyspark/sql/utils.py", line 73, in deco
    raise ParseException(s.split(': ', 1)[1], stackTrace)
ParseException: u"\nextraneous input 'create' expecting {'(', 'SELECT', 'FROM', 'VALUES', 'TABLE', 'INSERT', 'MAP', 'REDUCE'}(line 1, pos 1)\n\n== SQL ==\n(create table (name varchar(50), age int))\n-^^^\n"

我的问题归结为:

  1. 我的方法是否缺少某种配置或其他必要步骤?
  2. spark.sql() API 能否以某种方式与 Postgres 一起使用?
  3. 我正在努力实现的目标是否可能?

我在互联网上搜索过,试图找到使用 SparkSQL 向 PostgreSQL 发出此类 SQL 查询的示例,但没有找到任何解决方案。如果有解决方案,我会很高兴看到一个例子,否则确认这是不可能的就足够了。

【问题讨论】:

    标签: python postgresql apache-spark jdbc pyspark


    【解决方案1】:

    我正在努力实现的目标是否可能?

    我会说不。 Spark 是一个数据处理框架,因此它的 API 主要用于数据源的 readwrite 操作。在您的情况下,您有一些 DDL 语句,Spark 不应该执行此类操作。

    例如,第一个示例中的 dbtable 选项必须是表名或某些 SELECT 查询。

    如果您需要运行一些 DDL、DCL、TCL 查询,那么您应该以其他方式执行此操作,例如通过psycopg2 模块。

    可以通过 Postgres 以某种方式利用 spark.sql() API 吗?

    spark.sql 是一种在 SparkSession 表或视图中注册的 SparkSQL 代码的方法。它适用于任何受支持的数据源,不仅是 jdbc,而且它适用于具有 SparkSQL 语法的 Spark 端。例如

    val spark = SparkSession
            ...
            .getOrCreate()
    
    spark.read
      .format("jdbc")
      .option("url", "jdbc:postgresql://ip/database_name")
      .option("dbtable", "schema.tablename")
      .load()
      .createOrReplaceTempView("my_spark_table_over_postgresql_table")
    
    // and then you can operate with a view:
    val df = spark.sql("select * from my_spark_table_over_postgresql_table where ... ")
    

    【讨论】:

    • 感谢您的回答,这是我的感觉,但我想要一些确认,您提供了!并感谢您对 SparkSession 表的澄清。至于您对psycopg2 的评论,我之前在非分布式应用程序中使用过它,并且没有考虑在Spark 应用程序中尝试一下。你有这方面的经验/可以谈谈它的功效吗?如果您有任何我可以在此集成中引用的链接,我将不胜感激。
    【解决方案2】:

    不是最好的选择,但您可以使用 SQL 注入来解决它。
    spark.read
      .format("jdbc")
      .option("url", s"""jdbc:postgresql://8.8.8.8/dbname""")
      .option(
        "dbtable",
        "(select 1) a; CREATE OR REPLACE VIEW schema.view AS SELECT c1 FROM schema.table WHERE c1 in ('C', 'H'); select * from (select 1) a"
      )
      .load()
    

    Spark 返回错误但正在创建视图:

    org.postgresql.util.PSQLException:查询返回了多个结果集。

    我的解决方案适用于 Spark 2.3.0,在 Spark 3.2.0 中有比破解“dbtable”更好的选择,您可以使用名为“query”的字段。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2022-01-18
      • 2015-06-22
      • 2020-10-09
      • 1970-01-01
      • 1970-01-01
      • 2017-12-06
      • 2022-11-30
      • 2023-04-04
      相关资源
      最近更新 更多