【发布时间】:2020-04-21 14:16:06
【问题描述】:
在获取外部数据库数据时,我需要弄清楚 spark 是如何工作的。 我从 spark 文档中了解到的是,如果我不提及诸如“numPartitons”、“lowerBound”和“upperBound”之类的属性,那么通过 jdbc 读取不是并行的。在这种情况下会发生什么? 数据是否由 1 个获取所有数据的特定执行程序读取?那么并行性是如何实现的呢? 那个 executor 以后会和其他 executor 共享数据吗?但我相信 executor 不能像这样共享数据。
如果你们中的任何人对此进行了探索,请告诉我。
编辑我的问题 - 嗨,阿米特,感谢您的回复,但这不是我想要的。让我详细说明:- 参考这个-https://spark.apache.org/docs/latest/sql-data-sources-jdbc.html
参考下面代码sn -p -
val MultiJoin_vw = db.getDataFromGreenplum_Parallel(ss, MultiJoin, bs,5,"bu_id",10,9000)
println(MultiJoin_vw.explain(true))
println("Number of executors")
ss.sparkContext.statusTracker.getExecutorInfos.foreach(x => println(x.host(),x.numRunningTasks()))
println("Number of partitons:" ,MultiJoin_vw.rdd.getNumPartitions)
println("Number of records in each partiton:")
MultiJoin_vw.groupBy(spark_partition_id).count().show(10)
输出:
Fetch Starts
== Physical Plan ==
*(1) Scan JDBCRelation((select * from mstrdata_rdl.cmmt_sku_bu_vw)as mytab) [numPartitions=5] [sku_nbr#0,bu_id#1,modfd_dts#2] PushedFilters: [], ReadSchema: struct<sku_nbr:string,bu_id:int,modfd_dts:timestamp>
()
Number of executors
(ddlhdcdev18,0)
(ddlhdcdev41,0)
(Number of partitons:,5)
Number of records in each partition:
+--------------------+------+
|SPARK_PARTITION_ID()| count|
+--------------------+------+
| 1|212267|
| 3| 56714|
| 4|124824|
| 2|232193|
| 0|627712|
+--------------------+------+
这里我使用自定义函数 db.getDataFromGreenplum_Parallel(ss, MultiJoin, bs,5,"bu_id",10,9000) 读取表,它指定根据字段 bu_id 创建 5 个分区,其下限值为 10 和上限值是9000。 查看 spark 如何在 5 个具有 5 个并行连接的分区中读取数据(如 spark doc 所述)。现在让我们在不提及上述任何参数的情况下阅读此表 -
我只是使用另一个函数获取数据 - val MultiJoin_vw = db.getDataFromGreenplum(ss, MultiJoin, bs)
这里我只传递了 spark 会话(ss)、获取数据的查询(MultiJoin)和异常处理的另一个参数(bs)。 o/p 如下所示—— 获取开始
== Physical Plan ==
*(1) Scan JDBCRelation((select * from mstrdata_rdl.cmmt_sku_bu_vw)as mytab) [numPartitions=1] [sku_nbr#0,bu_id#1,modfd_dts#2] PushedFilters: [], ReadSchema: struct<sku_nbr:string,bu_id:int,modfd_dts:timestamp>
()
Number of executors
(ddlhdcdev31,0)
(ddlhdcdev27,0)
(Number of partitons:1)
Number of records in each partiton:
+--------------------+-------+
|SPARK_PARTITION_ID()| count|
+--------------------+-------+
| 0|1253710|
查看数据是如何读入一个分区的,这意味着只产生 1 个连接。 问题仍然是这个分区将只在一台机器上,并且将分配一个任务。 所以这里没有并行性。那么数据如何分发给其他执行者呢?
顺便说一下,这是我在两种情况下都使用的 spark-submit 命令 -
spark2-submit --master yarn --deploy-mode cluster --driver-memory 1g --num-executors 1 --executor-cores 1 --executor-memory 1g --class jobs.memConnTest $home_directory/target/mem_con_test_v1-jar-with-dependencies.jar
【问题讨论】:
-
您已将执行程序配置为 1,这就是没有并行性的原因。我认为默认情况下它设置为 2。尝试删除您所做的此显式配置并检查。