【问题标题】:Map transformation performance spark dataframe vs RDD地图转换性能火花数据框与 RDD
【发布时间】:2016-08-24 14:29:02
【问题描述】:

我有一个四节点 hadoop 集群(mapr),每个集群有 40GB 内存。我需要在大数据集(5 亿行)的一个字段上“应用”一个函数。我的代码流程是我从 hive 表中读取数据作为 spark 数据帧,并将所需的函数应用于其中一列,如下所示:

schema = StructType([StructField("field1", IntegerType(), False), StructField("field2", StringType(), False),StructField("field3", FloatType(), False)])
udfCos = udf(lambda row: function_call(row), schema)
result = SparkDataFrame.withColumn("temp", udfCos(stringArgument))

类似的 RDD 版本可能如下所示:

result = sparkRDD.map(lambda row: function_call(row))

我想提高这段代码的性能,以确保代码以最大并行度和降低的吞吐量运行——我需要帮助来使用 spark 概念,例如“repartition”“SparkConf 中的并行值”或其他方法,在我的问题的背景下。任何帮助表示赞赏。

我的 spark 启动参数:

MASTER="yarn-client" /opt/mapr/spark/spark-1.6.1/bin/pyspark --num-executors 10 --driver-cores 10 --driver-memory 30g --executor-memory 7g --executor-cores 5 --conf spark.driver.maxResultSize="0" --conf spark.default.parallelism="150"

【问题讨论】:

  • 对于初学者来说,不要使用 Python UDF。
  • 该建议背后有什么具体原因吗?我做了一个示例测试,导致 RDD map 和 udf 的运行时间相同(使用默认值)
  • 一般来说,这种往返 JVM -> Python -> JVM 昂贵且相对较慢,并且还有一些其他丑陋的属性(尤其是在 Spark
  • 我明白你的意思。但是除此之外,即使我使用 rdds,您能否帮助我使用重新分区之类的方法?有没有办法可以跨节点复制数据,如果可以,是否会降低吞吐量?

标签: hadoop apache-spark pyspark mapr


【解决方案1】:

要调整您的应用程序,您需要了解几件事

1) 您需要监控您的应用程序,无论您的集群是否未充分利用或您创建的应用程序使用了多少资源

可以使用各种工具进行监控,例如。 Ganglia 您可以从 Ganglia 中找到 CPU、内存和网络使用情况。

2) 根据对 CPU 和内存使用情况的观察,您可以更好地了解您的应用程序需要什么样的调优

形成你的火花点

在 spark-defaults.conf 中

即使您可以更改垃圾收集算法,您也可以指定应用程序需要多少驱动程序内存和执行程序内存。

以下是几个示例,您可以根据自己的要求调整此参数

spark.serializer                 org.apache.spark.serializer.KryoSerializer
spark.executor.extraJavaOptions  -XX:MaxPermSize=2G -XX:+UseG1GC
spark.driver.extraJavaOptions    -XX:MaxPermSize=6G -XX:+UseG1GC

更多详情请咨询http://spark.apache.org/docs/latest/tuning.html

【讨论】:

    猜你喜欢
    • 2017-03-09
    • 1970-01-01
    • 2017-02-28
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多