【问题标题】:Is this a bug of spark stream or memory leak?这是火花流的错误还是内存泄漏?
【发布时间】:2016-05-11 03:14:30
【问题描述】:

我将代码提交到 spark 独立集群。提交命令如下:

nohup ./bin/spark-submit  \  
--master spark://ES01:7077 \
--executor-memory 4G \
--num-executors 1 \
--total-executor-cores 1 \
--conf "spark.storage.memoryFraction=0.2"  \
./myCode.py 1>a.log 2>b.log &

我在上面的命令中指定执行器使用 4G 内存。但是使用 top 命令监控 executor 进程,我注意到内存使用量一直在增长。现在顶部的命令输出如下:

  PID USER      PR  NI    VIRT    RES    SHR S  %CPU %MEM     TIME+ COMMAND                                                                                                                                                    
12578 root      20   0 20.223g 5.790g  23856 S  61.5 37.3  20:49.36 java       

我的总内存是 16G,所以 37.3% 已经比我指定的 4GB 大了。而且还在增长。

使用ps命令,可以知道是executor进程。

[root@ES01 ~]# ps -awx | grep spark | grep java
10409 ?        Sl     1:43 java -cp /opt/spark-1.6.0-bin-hadoop2.6/conf/:/opt/spark-1.6.0-bin-hadoop2.6/lib/spark-assembly-1.6.0-hadoop2.6.0.jar:/opt/spark-1.6.0-bin-hadoop2.6/lib/datanucleus-api-jdo-3.2.6.jar:/opt/spark-1.6.0-bin-hadoop2.6/lib/datanucleus-rdbms-3.2.9.jar:/opt/spark-1.6.0-bin-hadoop2.6/lib/datanucleus-core-3.2.10.jar:/opt/hadoop-2.6.2/etc/hadoop/ -Xms4G -Xmx4G -XX:MaxPermSize=256m org.apache.spark.deploy.master.Master --ip ES01 --port 7077 --webui-port 8080
10603 ?        Sl     6:16 java -cp /opt/spark-1.6.0-bin-hadoop2.6/conf/:/opt/spark-1.6.0-bin-hadoop2.6/lib/spark-assembly-1.6.0-hadoop2.6.0.jar:/opt/spark-1.6.0-bin-hadoop2.6/lib/datanucleus-api-jdo-3.2.6.jar:/opt/spark-1.6.0-bin-hadoop2.6/lib/datanucleus-rdbms-3.2.9.jar:/opt/spark-1.6.0-bin-hadoop2.6/lib/datanucleus-core-3.2.10.jar:/opt/hadoop-2.6.2/etc/hadoop/ -Xms4G -Xmx4G -XX:MaxPermSize=256m org.apache.spark.deploy.worker.Worker --webui-port 8081 spark://ES01:7077
12420 ?        Sl    10:16 java -cp /opt/spark-1.6.0-bin-hadoop2.6/conf/:/opt/spark-1.6.0-bin-hadoop2.6/lib/spark-assembly-1.6.0-hadoop2.6.0.jar:/opt/spark-1.6.0-bin-hadoop2.6/lib/datanucleus-api-jdo-3.2.6.jar:/opt/spark-1.6.0-bin-hadoop2.6/lib/datanucleus-rdbms-3.2.9.jar:/opt/spark-1.6.0-bin-hadoop2.6/lib/datanucleus-core-3.2.10.jar:/opt/hadoop-2.6.2/etc/hadoop/ -Xms1g -Xmx1g -XX:MaxPermSize=256m org.apache.spark.deploy.SparkSubmit --master spark://ES01:7077 --conf spark.storage.memoryFraction=0.2 --executor-memory 4G --num-executors 1 --total-executor-cores 1 /opt/flowSpark/sparkStream/ForAsk01.py
12578 ?        Sl    21:03 java -cp /opt/spark-1.6.0-bin-hadoop2.6/conf/:/opt/spark-1.6.0-bin-hadoop2.6/lib/spark-assembly-1.6.0-hadoop2.6.0.jar:/opt/spark-1.6.0-bin-hadoop2.6/lib/datanucleus-api-jdo-3.2.6.jar:/opt/spark-1.6.0-bin-hadoop2.6/lib/datanucleus-rdbms-3.2.9.jar:/opt/spark-1.6.0-bin-hadoop2.6/lib/datanucleus-core-3.2.10.jar:/opt/hadoop-2.6.2/etc/hadoop/ -Xms4096M -Xmx4096M -Dspark.driver.port=52931 -XX:MaxPermSize=256m org.apache.spark.executor.CoarseGrainedExecutorBackend --driver-url spark://CoarseGrainedScheduler@10.79.148.184:52931 --executor-id 0 --hostname 10.79.148.184 --cores 1 --app-id app-20160511080701-0013 --worker-url spark://Worker@10.79.148.184:52660

下面是代码。这很简单,所以我认为没有内存泄漏

if __name__ == "__main__":

    dataDirectory = '/stream/raw'

    sc = SparkContext(appName="Netflow")
    ssc = StreamingContext(sc, 20)

    # Read CSV File
    lines = ssc.textFileStream(dataDirectory)

    lines.foreachRDD(process)

    ssc.start()
    ssc.awaitTermination()

处理函数的代码如下。请注意,我在这里使用的是 HiveContext 而不是 SqlContext。因为SqlContext不支持窗口功能

def getSqlContextInstance(sparkContext):
    if ('sqlContextSingletonInstance' not in globals()):
        globals()['sqlContextSingletonInstance'] = HiveContext(sparkContext)
    return globals()['sqlContextSingletonInstance']

def process(time, rdd):

    if rdd.isEmpty():
        return sc.emptyRDD()

    sqlContext = getSqlContextInstance(rdd.context)

    # Convert CSV File to Dataframe
    parts = rdd.map(lambda l: l.split(","))
    rowRdd = parts.map(lambda p: Row(router=p[0], interface=int(p[1]), flow_direction=p[9], bits=int(p[11])))
    dataframe = sqlContext.createDataFrame(rowRdd)

    # Get the top 2 interface of each router
    dataframe = dataframe.groupBy(['router','interface']).agg(func.sum('bits').alias('bits'))
    windowSpec = Window.partitionBy(dataframe['router']).orderBy(dataframe['bits'].desc())
    rank = func.dense_rank().over(windowSpec)
    ret = dataframe.select(dataframe['router'],dataframe['interface'],dataframe['bits'], rank.alias('rank')).filter("rank<=2")

    ret.show()
    dataframe.show()

其实我发现下面的代码会导致问题:

    # Get the top 2 interface of each router
    dataframe = dataframe.groupBy(['router','interface']).agg(func.sum('bits').alias('bits'))
    windowSpec = Window.partitionBy(dataframe['router']).orderBy(dataframe['bits'].desc())
    rank = func.dense_rank().over(windowSpec)
    ret = dataframe.select(dataframe['router'],dataframe['interface'],dataframe['bits'], rank.alias('rank')).filter("rank<=2")
    ret.show()

因为如果我删除这 5 行。该代码可以运行一整夜而不会显示内存增加。但是添加它们会导致executor的内存使用量增长到非常高。

基本上,上面的代码只是 SparkSQL 中的一些窗口 + grouby。那么这是一个错误吗?

【问题讨论】:

标签: memory apache-spark memory-leaks apache-spark-sql


【解决方案1】:

免责声明:此答案不是基于调试,而是更多地基于观察和 Apache Spark 提供的文档

我不认为这是一个错误!

查看您的配置,我们可以看到您主要关注执行器调优,这没有错,但您忘记了等式中的驱动程序部分。

Apache Spark documentaion看火花集群概览

如您所见,每个工作人员都有一个执行程序,但是,在您的情况下,工作程序节点与驱动程序节点相同!坦率地说,当您在本地运行或在单个节点的独立集群上运行时就是这种情况。

此外,驱动程序默认占用 1G 内存,除非使用 spark.driver.memory 标志进行调整。此外,您不应忘记 JVM 本身的堆使用情况,以及由驱动程序处理的 Web UI AFAIK!

当您删除您提到的代码行时,您的代码将没有 actions,因为 map 函数只是一个转换,因此不会执行,因此,您不要'根本看不到内存增加!

同样适用于groupBy,因为它只是一个不会执行的转换,除非正在调用一个动作,在你的情况下是aggshow 更下游!

也就是说,如果您想控制此进程的核心数量,请尽量减少驱动程序内存和由 spark.cores.max 定义的 spark 中的核心总数,然后级联到执行程序。此外,我会将spark.python.profile.dump 添加到您的配置列表中,这样您就可以看到您的 Spark 作业执行的配置文件,这可以帮助您更多地了解案例,并根据您的需要调整您的集群。

【讨论】:

  • 您好,感谢您的回答。但是 1. 删除这些行后我仍然有一个 dataframe.show() 。所以还是有动作的。 2. 就我而言,流计算可以运行几个小时。这意味着数千个循环(间隔为 20 秒)。在此期间,执行程序的内存使用量不断增长。所以我不知道你建议的解决方案是什么。最小化我的驱动程序内存?为什么?
  • 我没有意识到该节目不是已删除代码的一部分。至于内存,我是在尝试最小化它的基础上构建的,看看它是否会溢出!
【解决方案2】:

正如我在您的 5 行中看到的那样,groupBy 可能是问题所在,您是否可以尝试使用reduceBy,看看它的表现如何。

请参阅 herehere

【讨论】:

  • 感谢您的信息。但我希望知道这是一个错误还是我没有以正确的方式使用它。
  • @Tristan 和 RDD 上的 groupBy 不一样,见stackoverflow.com/q/32902982/1560062
  • 我假设这个 csv 文件存储在 HDFS 上。它的大小是多少?它增长/变化多少以及频率如何。我想了解的是,您需要在每个批处理间隔处理多少数据,该间隔是多少(默认为 1 秒)?
  • @z-star 是的。 CSV 文件位于 hdfs 上。大小为几 KB(小于 1MB)。批处理间隔为 20 秒。每个区间下的数据大小不会改变。我没有看到总延迟时间或处理时间增加,所以我认为工作量不是问题
猜你喜欢
  • 2012-07-12
  • 1970-01-01
  • 2011-10-29
  • 1970-01-01
  • 2018-03-04
  • 2011-02-20
  • 2013-08-12
相关资源
最近更新 更多