【Question Title】: pyspark logical conjunction for vertica sql
【Posted】: 2019-02-13 19:48:20
【Question Description】:

Spark 1.6, retrieving data from my Vertica database to process it. The following query runs fine on the Vertica db but does not work from pyspark. Spark DataFrames support predicate pushdown with JDBC sources, but the term predicate is used in the strict SQL sense, meaning it covers only the WHERE clause. Moreover, it looks limited to logical conjunction (no IN and no OR, I'm afraid) and simple predicates. It shows this error: java.lang.RuntimeException: Option 'dbtable' not specified

from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext

conf = (SparkConf()
        .setAppName("hivereader")
        .setMaster("yarn-client")
        .set("spark.dynamicAllocation.enabled", "false")
        .set("spark.shuffle.service.enabled", "false")
        .set("spark.io.compression.codec", "snappy")
        .set("spark.rdd.compress", "true")
        .set("spark.executor.cores", 7)
        .set("spark.sql.inMemoryStorage.compressed", "true")
        .set("spark.sql.shuffle.partitions", 2000)
        .set("spark.sql.tungsten.enabled", "true")
        .set("spark.port.maxRetries", 200))

sc = SparkContext(conf=conf)
sqlContext = SQLContext(sc)

url = "*******"
properties = {"user": "*****", "password": "*******", "driver": "com.vertica.jdbc.Driver" }

df = sqlContext.read.format("JDBC").options(
    url = url,
    query = "SELECT date(time_stamp) AS DATE, (subscriber) AS IMSI, (server_hostname) AS WEBSITE, (bytes_in) AS DOWNLINK, (bytes_out) AS UPLINK,(connections_out) AS CONNECTION FROM traffic.stats WHERE DATE(time_stamp) between '2019-01-25' AND '2019-01-29'",
    **properties
).load()

df.show()

【Question Discussion】:

    Tags: sql apache-spark pyspark vertica


    【Solution 1】:

    The problem is that, even though you say this query works on Vertica, it is not written in SQL syntax that Vertica recognizes. Your query should be rewritten as:

    SELECT date(time_stamp) AS DATE, (subscriber) AS IMSI, (server_hostname) AS WEBSITE, (bytes_in) AS DOWNLINK, (bytes_out) AS UPLINK,(connections_out) AS CONNECTION
    FROM traffic.stats
    WHERE DATE(time_stamp) between '2019-01-25' AND '2019-01-29'
    

    After fixing these issues, your sqlContext.read call should look like this:

    df = sqlContext.read.format("JDBC").options(
        url = url,
        query = "SELECT date(time_stamp) AS DATE, (subscriber) AS IMSI, (server_hostname) AS WEBSITE, (bytes_in) AS DOWNLINK, (bytes_out) AS UPLINK,(connections_out) AS CONNECTION FROM traffic.stats WHERE DATE(time_stamp) between '2019-01-25' AND '2019-01-29'",
        **properties
    ).load()
    
    df.show()
    

    Alternatively, you can alias the query as a subquery and pass it via dbtable instead of query:

    df = sqlContext.read.format("JDBC").options(
        url = url,
        dbtable = "(SELECT date(time_stamp) AS DATE, (subscriber) AS IMSI, (server_hostname) AS WEBSITE, (bytes_in) AS DOWNLINK, (bytes_out) AS UPLINK,(connections_out) AS CONNECTION FROM traffic.stats WHERE DATE(time_stamp) between '2019-01-25' AND '2019-01-29') temp",
        **properties
    ).load()
    
    df.show()
    

    【Discussion】:

    • Thanks. After running it I get this error: "java.lang.RuntimeException: Option 'dbtable' not specified"
    • @MahmoudOdeh Which versions of Spark and Python are you running? My tests were on Spark 2.4.0 and Python 2.7.10.
    • That error is a bit confusing. query and dbtable are mutually exclusive, so if you pass query in options it should not complain about a missing dbtable. Can you post the exact code you ran that produces this error?
    • I tried query = "SELECT date(time_stamp) AS DATE, (subscriber) AS IMSI, (server_hostname) AS WEBSITE, (bytes_in) AS DOWNLINK, (bytes_out) AS UPLINK,(connections_out) AS CONNECTION FROM traffic.stats WHERE DATE(time_stamp) between '2019-01-27' AND '2019-01-29' limit 10000000" df = sqlContext.read.format("JDBC").options( url = url, dbtable="( " + query + " ) as temp", **properties ).load() df.show()
    • pyspark 1.6, python 3.4. And this is what the docs say about dbtable: the JDBC table that should be read. Note that anything that is valid in a FROM clause of a SQL query can be used. For example, instead of a full table you could also use a subquery in parentheses.
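    Tying the discussion together: the query option was only added to the JDBC source in Spark 2.4, so on Spark 1.6 the SELECT has to be wrapped as a parenthesized, aliased subquery and passed through dbtable. The wrapping itself is plain string handling and can be checked without a cluster; as_dbtable below is a hypothetical helper, and the column/table names are taken from the question.

```python
# Spark 1.6's JDBC source only accepts the 'dbtable' option (the 'query'
# option appeared in Spark 2.4). Wrapping a SELECT in parentheses with an
# alias makes it valid wherever a FROM-clause table reference is expected.

def as_dbtable(query, alias="temp"):
    """Wrap a SELECT statement so it can be passed as the 'dbtable' option.
    Hypothetical helper for illustration."""
    return "({}) {}".format(query.strip(), alias)

query = (
    "SELECT date(time_stamp) AS DATE, subscriber AS IMSI, "
    "server_hostname AS WEBSITE, bytes_in AS DOWNLINK, "
    "bytes_out AS UPLINK, connections_out AS CONNECTION "
    "FROM traffic.stats "
    "WHERE date(time_stamp) BETWEEN '2019-01-25' AND '2019-01-29'"
)

dbtable = as_dbtable(query)
# dbtable is now "(SELECT ... FROM traffic.stats WHERE ...) temp", which
# Spark 1.6 accepts, e.g.:
# df = sqlContext.read.format("jdbc").options(
#     url=url, dbtable=dbtable, **properties
# ).load()
```

    The alias after the closing parenthesis is required: without it, Vertica (like most databases) rejects a derived table in the FROM clause.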