【问题标题】:How spark reads from jdbc and distribute the dataspark如何从jdbc读取数据并分发数据
【发布时间】:2020-04-21 14:16:06
【问题描述】:

在获取外部数据库数据时,我需要弄清楚 spark 是如何工作的。 我从 spark 文档中了解到的是,如果我不提及诸如“numPartitons”、“lowerBound”和“upperBound”之类的属性,那么通过 jdbc 读取不是并行的。在这种情况下会发生什么? 数据是否由 1 个获取所有数据的特定执行程序读取?那么并行性是如何实现的呢? 那个 executor 以后会和其他 executor 共享数据吗?但我相信 executor 不能像这样共享数据。

如果你们中的任何人对此进行了探索,请告诉我。

编辑我的问题 - 嗨,阿米特,感谢您的回复,但这不是我想要的。让我详细说明:- 参考这个-https://spark.apache.org/docs/latest/sql-data-sources-jdbc.html

参考下面代码sn -p -

val MultiJoin_vw = db.getDataFromGreenplum_Parallel(ss, MultiJoin, bs,5,"bu_id",10,9000)
println(MultiJoin_vw.explain(true))
println("Number of executors")
ss.sparkContext.statusTracker.getExecutorInfos.foreach(x => println(x.host(),x.numRunningTasks()))
println("Number of partitons:" ,MultiJoin_vw.rdd.getNumPartitions)
println("Number of records in each partiton:")
MultiJoin_vw.groupBy(spark_partition_id).count().show(10)

输出:

Fetch Starts
== Physical Plan ==
*(1) Scan JDBCRelation((select * from mstrdata_rdl.cmmt_sku_bu_vw)as mytab) [numPartitions=5] [sku_nbr#0,bu_id#1,modfd_dts#2] PushedFilters: [], ReadSchema: struct<sku_nbr:string,bu_id:int,modfd_dts:timestamp>
()
Number of executors
(ddlhdcdev18,0)
(ddlhdcdev41,0)
(Number of partitons:,5)
Number of records in each partition:
+--------------------+------+
|SPARK_PARTITION_ID()| count|
+--------------------+------+
|                   1|212267|
|                   3| 56714|
|                   4|124824|
|                   2|232193|
|                   0|627712|
+--------------------+------+

这里我使用自定义函数 db.getDataFromGreenplum_Parallel(ss, MultiJoin, bs,5,"bu_id",10,9000) 读取表,它指定根据字段 bu_id 创建 5 个分区,其下限值为 10 和上限值是9000。 查看 spark 如何在 5 个具有 5 个并行连接的分区中读取数据(如 spark doc 所述)。现在让我们在不提及上述任何参数的情况下阅读此表 -

我只是使用另一个函数获取数据 - val MultiJoin_vw = db.getDataFromGreenplum(ss, MultiJoin, bs)

这里我只传递了 spark 会话(ss)、获取数据的查询(MultiJoin)和异常处理的另一个参数(bs)。 o/p 如下所示—— 获取开始

== Physical Plan ==
*(1) Scan JDBCRelation((select * from mstrdata_rdl.cmmt_sku_bu_vw)as mytab) [numPartitions=1] [sku_nbr#0,bu_id#1,modfd_dts#2] PushedFilters: [], ReadSchema: struct<sku_nbr:string,bu_id:int,modfd_dts:timestamp>
()
Number of executors
(ddlhdcdev31,0)
(ddlhdcdev27,0)
(Number of partitons:1)
Number of records in each partiton:
+--------------------+-------+
|SPARK_PARTITION_ID()|  count|
+--------------------+-------+
|                   0|1253710|

查看数据是如何读入一个分区的,这意味着只产生 1 个连接。 问题仍然是这个分区将只在一台机器上,并且将分配一个任务。 所以这里没有并行性。那么数据如何分发给其他执行者呢?

顺便说一下,这是我在两种情况下都使用的 spark-submit 命令 -

spark2-submit --master yarn --deploy-mode cluster --driver-memory 1g --num-executors 1 --executor-cores 1 --executor-memory 1g --class jobs.memConnTest $home_directory/target/mem_con_test_v1-jar-with-dependencies.jar

【问题讨论】:

  • 您已将执行程序配置为 1,这就是没有并行性的原因。我认为默认情况下它设置为 2。尝试删除您所做的此显式配置并检查。

标签: apache-spark spark-jdbc


【解决方案1】:

Re:"从外部数据库获取数据" 在您的 spark 应用程序中,这通常是将在执行程序上执行的代码部分。可以通过传递火花配置“num-executors”来控制执行器的数量。如果您使用过 Spark 和 RDD/Dataframe,那么连接到数据库的示例之一是转换函数,例如 map、flatmap、filter 等。这些函数在执行器上执行时(由 num-executors 配置) 将建立数据库连接并使用它。

这里要注意的重要一点是,如果您使用太多的执行程序,那么您的数据库服务器可能会变得越来越慢,最终没有响应。如果你给的执行者太少,那么它可能会导致你的火花工作需要更多的时间来完成。因此,您必须根据您的数据库服务器容量找到最佳数量。

Re:“那么并行性是如何实现的?那个执行器稍后会与其他执行器共享数据吗?”

上面提到的并行性是通过配置执行器的数量来实现的。配置执行器的数量只是增加并行度的一种方式,并不是唯一的方式。考虑这样一种情况,即您的数据较小,导致分区较少,那么您将看到较少的并行度。所以你需要有足够数量的分区(那些对应于任务),然后适当的(确定的数量取决于用例)执行器数量来并行执行这些任务。只要您可以单独处理每条记录,它就会扩展,但是一旦您有一个会导致随机播放的操作,您就会看到有关正在执行的任务和执行器的统计信息。 Spark 将尝试以最佳方式分发数据,以便它可以在最佳水平上工作。

请参考https://blog.cloudera.com/how-to-tune-your-apache-spark-jobs-part-1/及后续部分了解更多内部结构。

【讨论】:

  • 嗨阿米特,我已经编辑了我的问题并提供了更多详细信息。希望它能解释我到底想问什么。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2016-05-05
  • 1970-01-01
  • 2018-07-12
  • 1970-01-01
  • 2016-06-29
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多