【发布时间】:2018-07-25 22:11:37
【问题描述】:
我正在尝试使用 spark-jdbc 读取 postgres db 上的表。为此,我提出了以下代码:
object PartitionRetrieval {
var conf = new SparkConf().setAppName("Spark-JDBC").set("spark.executor.heartbeatInterval","120s").set("spark.network.timeout","12000s").set("spark.default.parallelism", "20")
val log = LogManager.getLogger("Spark-JDBC Program")
Logger.getLogger("org").setLevel(Level.ERROR)
val conFile = "/home/myuser/ReconTest/inputdir/testconnection.properties"
val properties = new Properties()
properties.load(new FileInputStream(conFile))
val connectionUrl = properties.getProperty("gpDevUrl")
val devUserName = properties.getProperty("devUserName")
val devPassword = properties.getProperty("devPassword")
val driverClass = properties.getProperty("gpDriverClass")
val tableName = "base.ledgers"
try {
Class.forName(driverClass).newInstance()
} catch {
case cnf: ClassNotFoundException =>
log.error("Driver class: " + driverClass + " not found")
System.exit(1)
case e: Exception =>
log.error("Exception: " + e.printStackTrace())
System.exit(1)
}
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder().config(conf).master("yarn").enableHiveSupport().getOrCreate()
import spark.implicits._
val gpTable = spark.read.format("jdbc").option("url", connectionUrl).option("dbtable",tableName).option("user",devUserName).option("password",devPassword).load()
val rc = gpTable.filter(gpTable("source_system_name")==="ORACLE" && gpTable("period_year")==="2017").count()
println("gpTable Count: " + rc)
}
}
现在,我正在获取行数只是为了查看连接是成功还是失败。这是一个巨大的表,它运行速度较慢以获取我理解的计数,因为没有为数据分区应该发生的分区号和列名提供参数。
在很多地方,我看到 jdbc 对象是通过以下方式创建的:
val gpTable2 = spark.read.jdbc(connectionUrl, tableName, connectionProperties)
我使用options 以另一种格式创建了它。
当使用“选项”形成 jdbc 连接时,我无法理解如何给出 numPartitions,我希望在其上对数据进行分区的分区列名称:val gpTable = spark.read.format("jdbc").option("url", connectionUrl).option("dbtable",tableName).option("user",devUserName).option("password",devPassword).load()
谁能告诉我
-
如何添加参数:
numPartitions, lowerBound, upperBound到这样写的jdbc对象:val gpTable = spark.read.format("jdbc").option("url", connectionUrl).option("dbtable",tableName).option("user",devUserName).option("password", devPassword).load()
如何只添加
columnname和numPartition因为我想获取 年份的所有行:2017 年,我不想要一个范围 要选择的行数(lowerBound,upperBound)
【问题讨论】:
标签: apache-spark