【发布时间】:2020-05-22 22:41:30
【问题描述】:
我正在开展一个项目,将 Python 概念证明 (POC) 移植到 PySpark。 POC 大量利用 Postgres,特别是 PostGIS 地理空间库。大部分工作包括 Python 在回调数据进行最终处理之前向 Postgres 发出命令。
传递给 Postgres 的一些查询包含 CREATE TABLE、INSERT、CREATE TEMP TABLE 和 CTE WITH 语句。我正在尝试确定是否可以通过 JDBC 将这些查询从 Spark 传递给 Postgres。
有人可以确认此功能在 Spark JDBC 中是否可用于其他数据库吗?需要明确的是,我想将简单的英语 SQL 查询传递给 Postgres,而不是使用可用的 SparkSQL API(因为它们不支持我需要的所有操作)。我正在使用 Spark 版本 2.3.0、PostgreSQL 10.11 和 Python 2.7.5(是的,我知道 Python 2 的 EOL,这是另一个故事)。
这是我迄今为止尝试过的:
使用SparkSession.read
创建到 Postgres 的 Spark 会话
postgres = SparkSession.builder \
.appName("myApp") \
.config("spark.jars", "/usr/share/java/postgresql-jdbc.jar") \
.getOrCreate()
定义要传递给dbtable参数的查询
qry = """create table test (name varchar(50), age int)"""
将 qry 传递给 Postgres spark 会话对象的 dbtable 参数
postgres.read \
.format("jdbc") \
.option("url", "jdbc:postgresql://....) \
.option("dbtable", qry) \
.option("user", configs['user']) \
.option("password", configs['password']) \
.option("driver", "org.postgresql.Driver") \
.option("ssl", "true") \
.load()
返回以下语法错误(使用上面列出的其他 SQL 命令时会出现相同类型的错误):
Traceback (most recent call last):
File "/tmp/zeppelin_pyspark-5711943099029736374.py", line 367, in <module>
raise Exception(traceback.format_exc())
Exception: Traceback (most recent call last):
File "/tmp/zeppelin_pyspark-5711943099029736374.py", line 360, in <module>
exec(code, _zcUserQueryNameSpace)
File "<stdin>", line 9, in <module>
File "/usr/hdp/current/spark2-client/python/pyspark/sql/readwriter.py", line 172, in load
return self._df(self._jreader.load())
File "/usr/hdp/current/spark2-client/python/lib/py4j-0.10.6-src.zip/py4j/java_gateway.py", line 1160, in __call__
answer, self.gateway_client, self.target_id, self.name)
File "/usr/hdp/current/spark2-client/python/pyspark/sql/utils.py", line 63, in deco
return f(*a, **kw)
File "/usr/hdp/current/spark2-client/python/lib/py4j-0.10.6-src.zip/py4j/protocol.py", line 320, in get_return_value
format(target_id, ".", name), value)
Py4JJavaError: An error occurred while calling o484.load.
: org.postgresql.util.PSQLException: ERROR: syntax error at or near "create"
Position: 15
使用SparkSession.sql()
利用上面定义的相同 postgres 对象
将查询传递给 .sql()
postgres.sql("""create table (name varchar(50), age int)""")
返回以下解析异常:
Traceback (most recent call last):
File "/tmp/zeppelin_pyspark-5711943099029736374.py", line 367, in <module>
raise Exception(traceback.format_exc())
Exception: Traceback (most recent call last):
File "/tmp/zeppelin_pyspark-5711943099029736374.py", line 360, in <module>
exec(code, _zcUserQueryNameSpace)
File "<stdin>", line 1, in <module>
File "/usr/hdp/current/spark2-client/python/pyspark/sql/session.py", line 714, in sql
return DataFrame(self._jsparkSession.sql(sqlQuery), self._wrapped)
File "/usr/hdp/current/spark2-client/python/lib/py4j-0.10.6-src.zip/py4j/java_gateway.py", line 1160, in __call__
answer, self.gateway_client, self.target_id, self.name)
File "/usr/hdp/current/spark2-client/python/pyspark/sql/utils.py", line 73, in deco
raise ParseException(s.split(': ', 1)[1], stackTrace)
ParseException: u"\nno viable alternative at input 'create table ('(line 1, pos 13)\n\n== SQL ==\ncreate table (name varchar(50), age int)\n-------------^^^\n"
如果我像postgres.sql("(create table (name varchar(50), age int))") 那样将查询用引号括起来,那么我会得到一个不同的解析异常,这让我相信我想要的功能是不可能的:
Traceback (most recent call last):
File "/tmp/zeppelin_pyspark-5711943099029736374.py", line 367, in <module>
raise Exception(traceback.format_exc())
Exception: Traceback (most recent call last):
File "/tmp/zeppelin_pyspark-5711943099029736374.py", line 360, in <module>
exec(code, _zcUserQueryNameSpace)
File "<stdin>", line 1, in <module>
File "/usr/hdp/current/spark2-client/python/pyspark/sql/session.py", line 714, in sql
return DataFrame(self._jsparkSession.sql(sqlQuery), self._wrapped)
File "/usr/hdp/current/spark2-client/python/lib/py4j-0.10.6-src.zip/py4j/java_gateway.py", line 1160, in __call__
answer, self.gateway_client, self.target_id, self.name)
File "/usr/hdp/current/spark2-client/python/pyspark/sql/utils.py", line 73, in deco
raise ParseException(s.split(': ', 1)[1], stackTrace)
ParseException: u"\nextraneous input 'create' expecting {'(', 'SELECT', 'FROM', 'VALUES', 'TABLE', 'INSERT', 'MAP', 'REDUCE'}(line 1, pos 1)\n\n== SQL ==\n(create table (name varchar(50), age int))\n-^^^\n"
我的问题归结为:
- 我的方法是否缺少某种配置或其他必要步骤?
-
spark.sql()API 能否以某种方式与 Postgres 一起使用? - 我正在努力实现的目标是否可能?
我在互联网上搜索过,试图找到使用 SparkSQL 向 PostgreSQL 发出此类 SQL 查询的示例,但没有找到任何解决方案。如果有解决方案,我会很高兴看到一个例子,否则确认这是不可能的就足够了。
【问题讨论】:
标签: python postgresql apache-spark jdbc pyspark