【发布时间】:2015-09-08 03:25:16
【问题描述】:
本题的目的是记录:
在 PySpark 中使用 JDBC 连接读取和写入数据所需的步骤
JDBC 源和已知解决方案可能存在的问题
只需稍加改动,这些方法就可以与其他支持的语言(包括 Scala 和 R)一起使用。
【问题讨论】:
标签: python scala apache-spark apache-spark-sql pyspark
本题的目的是记录:
在 PySpark 中使用 JDBC 连接读取和写入数据所需的步骤
JDBC 源和已知解决方案可能存在的问题
只需稍加改动,这些方法就可以与其他支持的语言(包括 Scala 和 R)一起使用。
【问题讨论】:
标签: python scala apache-spark apache-spark-sql pyspark
在提交应用程序或启动 shell 时包含适用的 JDBC 驱动程序。例如,您可以使用--packages:
bin/pyspark --packages group:name:version
或者结合driver-class-path和jars
bin/pyspark --driver-class-path $PATH_TO_DRIVER_JAR --jars $PATH_TO_DRIVER_JAR
这些属性也可以在JVM实例启动前使用PYSPARK_SUBMIT_ARGS环境变量设置,或者使用conf/spark-defaults.conf设置spark.jars.packages或spark.jars/spark.driver.extraClassPath。
选择所需的模式。 Spark JDBC writer 支持以下模式:
append:将此 :class:DataFrame的内容附加到现有数据中。overwrite:覆盖现有数据。ignore:如果数据已经存在,则忽略此操作。error(默认情况):如果数据已经存在,则抛出异常。
Upserts 或其他细粒度修改are not supported
mode = ...
准备JDBC URI,例如:
# You can encode credentials in URI or pass
# separately using properties argument
# of jdbc method or options
url = "jdbc:postgresql://localhost/foobar"
(可选)创建 JDBC 参数字典。
properties = {
"user": "foo",
"password": "bar"
}
properties/options也可以用来设置supported JDBC connection properties。
使用DataFrame.write.jdbc
df.write.jdbc(url=url, table="baz", mode=mode, properties=properties)
保存数据(详见pyspark.sql.DataFrameWriter)。
已知问题:
使用--packages (java.sql.SQLException: No suitable driver found for jdbc: ...) 包含驱动程序时找不到合适的驱动程序
假设没有驱动程序版本不匹配来解决这个问题,您可以将driver 类添加到properties。例如:
properties = {
...
"driver": "org.postgresql.Driver"
}
使用df.write.format("jdbc").options(...).save() 可能会导致:
java.lang.RuntimeException: org.apache.spark.sql.execution.datasources.jdbc.DefaultSource 不允许创建表作为选择。
解决方案未知。
在 Pyspark 1.3 中你可以尝试直接调用 Java 方法:
df._jdf.insertIntoJDBC(url, "baz", True)
按照写入数据中的步骤 1-4
使用sqlContext.read.jdbc:
sqlContext.read.jdbc(url=url, table="baz", properties=properties)
或sqlContext.read.format("jdbc"):
(sqlContext.read.format("jdbc")
.options(url=url, dbtable="baz", **properties)
.load())
已知问题和陷阱:
找不到合适的驱动程序 - 请参阅:写入数据
Spark SQL 支持使用 JDBC 源进行谓词下推,但并非所有谓词都可以下推。它也不委托限制或聚合。可能的解决方法是将 dbtable / table 参数替换为有效的子查询。例如:
默认情况下,JDBC 数据源使用单个执行线程顺序加载数据。为确保分布式数据加载,您可以:
column(必须是IntegerType)、lowerBound、upperBound、numPartitions。predicates,每个所需分区一个。见:
在分布式模式下(使用分区列或谓词),每个执行程序都在自己的事务中运行。如果同时修改源数据库,则无法保证最终视图一致。
Maven Repository(要获得--packages 所需的坐标,请选择所需的版本并以compile-group:name:version 的形式从Gradle 选项卡复制数据,替换相应的字段)或Maven Central Repository:
根据数据库,可能存在专门的源,并且在某些情况下是首选的:
【讨论】:
下载mysql-connector-java驱动并保存在spark jar文件夹中,观察下面的python代码将数据写入“acotr1”,我们必须在mysql数据库中创建acotr1表结构
spark = SparkSession.builder.appName("prasadad").master('local').config('spark.driver.extraClassPath','D:\spark-2.1.0-bin-hadoop2.7\jars\mysql-connector-java-5.1.41-bin.jar').getOrCreate()
sc = spark.sparkContext
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)
df = sqlContext.read.format("jdbc").options(url="jdbc:mysql://localhost:3306/sakila",driver="com.mysql.jdbc.Driver",dbtable="actor",user="root",password="****").load()
mysql_url="jdbc:mysql://localhost:3306/sakila?user=root&password=****"
df.write.jdbc(mysql_url,table="actor1",mode="append")
【讨论】:
参考此链接下载 jdbc for postgres 并按照步骤下载 jar 文件
https://jaceklaskowski.gitbooks.io/mastering-apache-spark/exercises/spark-exercise-dataframe-jdbc-postgresql.html jar 文件将在这样的路径中下载。 "/home/anand/.ivy2/jars/org.postgresql_postgresql-42.1.1.jar"
如果你的 spark 版本是 2
from pyspark.sql import SparkSession
spark = SparkSession.builder
.appName("sparkanalysis")
.config("spark.driver.extraClassPath",
"/home/anand/.ivy2/jars/org.postgresql_postgresql42.1.1.jar")
.getOrCreate()
//for localhost database//
pgDF = spark.read \
.format("jdbc") \
.option("url", "jdbc:postgresql:postgres") \
.option("dbtable", "public.user_emp_tab") \
.option("user", "postgres") \
.option("password", "Jonsnow@100") \
.load()
print(pgDF)
pgDF.filter(pgDF["user_id"]>5).show()
将文件另存为python并运行“python相应文件名.py”
【讨论】: