【问题标题】:How to use JDBC source to write and read data in (Py)Spark?如何使用 JDBC 源在 (Pyspark?
【发布时间】:2015-09-08 03:25:16
【问题描述】:

本题的目的是记录:

  • 在 PySpark 中使用 JDBC 连接读取和写入数据所需的步骤

  • JDBC 源和已知解决方案可能存在的问题

只需稍加改动,这些方法就可以与其他支持的语言(包括 Scala 和 R)一起使用。

【问题讨论】:

    标签: python scala apache-spark apache-spark-sql pyspark


    【解决方案1】:

    写入数据

    1. 在提交应用程序或启动 shell 时包含适用的 JDBC 驱动程序。例如,您可以使用--packages:

       bin/pyspark --packages group:name:version  
      

    或者结合driver-class-pathjars

        bin/pyspark --driver-class-path $PATH_TO_DRIVER_JAR --jars $PATH_TO_DRIVER_JAR
    

    这些属性也可以在JVM实例启动前使用PYSPARK_SUBMIT_ARGS环境变量设置,或者使用conf/spark-defaults.conf设置spark.jars.packagesspark.jars/spark.driver.extraClassPath

    1. 选择所需的模式。 Spark JDBC writer 支持以下模式:

      • append:将此 :class:DataFrame 的内容附加到现有数据中。
      • overwrite:覆盖现有数据。
      • ignore:如果数据已经存在,则忽略此操作。
      • error(默认情况):如果数据已经存在,则抛出异常。

      Upserts 或其他细粒度修改are not supported

       mode = ...
      
    2. 准备JDBC URI,例如:

       # You can encode credentials in URI or pass
       # separately using properties argument
       # of jdbc method or options
      
       url = "jdbc:postgresql://localhost/foobar"
      
    3. (可选)创建 JDBC 参数字典。

       properties = {
           "user": "foo",
           "password": "bar"
       }
      

      properties/options也可以用来设置supported JDBC connection properties

    4. 使用DataFrame.write.jdbc

       df.write.jdbc(url=url, table="baz", mode=mode, properties=properties)
      

    保存数据(详见pyspark.sql.DataFrameWriter)。

    已知问题

    • 使用--packages (java.sql.SQLException: No suitable driver found for jdbc: ...) 包含驱动程序时找不到合适的驱动程序

      假设没有驱动程序版本不匹配来解决这个问题,您可以将driver 类添加到properties。例如:

        properties = {
            ...
            "driver": "org.postgresql.Driver"
        }
      
    • 使用df.write.format("jdbc").options(...).save() 可能会导致:

      java.lang.RuntimeException: org.apache.spark.sql.execution.datasources.jdbc.DefaultSource 不允许创建表作为选择。

      解决方案未知。

    • 在 Pyspark 1.3 中你可以尝试直接调用 Java 方法:

        df._jdf.insertIntoJDBC(url, "baz", True)
      

    读取数据

    1. 按照写入数据中的步骤 1-4

    2. 使用sqlContext.read.jdbc

       sqlContext.read.jdbc(url=url, table="baz", properties=properties)
      

    sqlContext.read.format("jdbc"):

        (sqlContext.read.format("jdbc")
            .options(url=url, dbtable="baz", **properties)
            .load())
    

    已知问题和陷阱

    在哪里可以找到合适的驱动程序:

    其他选项

    根据数据库,可能存在专门的源,并且在某些情况下是首选的:

    【讨论】:

    • mode="overwrite" 使用这个命令:spark_submit --driver-class-path /xx/yy/postgresql-xx.jar my-script.py
    【解决方案2】:

    下载mysql-connector-java驱动并保存在spark jar文件夹中,观察下面的python代码将数据写入“acotr1”,我们必须在mysql数据库中创建acotr1表结构

        spark = SparkSession.builder.appName("prasadad").master('local').config('spark.driver.extraClassPath','D:\spark-2.1.0-bin-hadoop2.7\jars\mysql-connector-java-5.1.41-bin.jar').getOrCreate()
    
        sc = spark.sparkContext
    
        from pyspark.sql import SQLContext
    
        sqlContext = SQLContext(sc)
    
        df = sqlContext.read.format("jdbc").options(url="jdbc:mysql://localhost:3306/sakila",driver="com.mysql.jdbc.Driver",dbtable="actor",user="root",password="****").load()
    
        mysql_url="jdbc:mysql://localhost:3306/sakila?user=root&password=****"
    
        df.write.jdbc(mysql_url,table="actor1",mode="append")
    

    【讨论】:

      【解决方案3】:

      参考此链接下载 jdbc for postgres 并按照步骤下载 jar 文件

      https://jaceklaskowski.gitbooks.io/mastering-apache-spark/exercises/spark-exercise-dataframe-jdbc-postgresql.html jar 文件将在这样的路径中下载。 "/home/anand/.ivy2/jars/org.postgresql_postgresql-42.1.1.jar"

      如果你的 spark 版本是 2

      from pyspark.sql import SparkSession
      
      spark = SparkSession.builder
              .appName("sparkanalysis")
              .config("spark.driver.extraClassPath",
               "/home/anand/.ivy2/jars/org.postgresql_postgresql42.1.1.jar")
              .getOrCreate()
      
      //for localhost database//
      
      pgDF = spark.read \
      .format("jdbc") \
      .option("url", "jdbc:postgresql:postgres") \
      .option("dbtable", "public.user_emp_tab") \
      .option("user", "postgres") \
      .option("password", "Jonsnow@100") \
      .load()
      
      
      print(pgDF)
      
      pgDF.filter(pgDF["user_id"]>5).show()
      

      将文件另存为python并运行“python相应文件名.py”

      【讨论】:

        猜你喜欢
        • 2016-07-19
        • 1970-01-01
        • 1970-01-01
        • 2018-06-29
        • 2019-11-10
        • 2021-06-01
        相关资源
        最近更新 更多