【Question Title】: Apache Spark driver memory
【Posted】: 2020-02-18 17:34:17
【Question】:

I have been trying to install and run a simple Java Apache Spark application in IntelliJ on Windows, but I keep hitting an error I cannot resolve. I installed Spark via Maven. This is the error I get:

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
19/03/20 23:53:23 INFO SparkContext: Running Spark version 2.0.0-cloudera1-SNAPSHOT
19/03/20 23:53:24 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
19/03/20 23:53:24 INFO SecurityManager: Changing view acls to: Drakker
19/03/20 23:53:24 INFO SecurityManager: Changing modify acls to: Drakker
19/03/20 23:53:24 INFO SecurityManager: Changing view acls groups to: 
19/03/20 23:53:24 INFO SecurityManager: Changing modify acls groups to: 
19/03/20 23:53:24 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users  with view permissions: Set(Drakker); groups with view permissions: Set(); users  with modify permissions: Set(Drakker); groups with modify permissions: Set()
19/03/20 23:53:25 INFO Utils: Successfully started service 'sparkDriver' on port 50007.
19/03/20 23:53:25 INFO SparkEnv: Registering MapOutputTracker
19/03/20 23:53:25 ERROR SparkContext: Error initializing SparkContext.
java.lang.IllegalArgumentException: System memory 259522560 must be at least 471859200. Please increase heap size using the --driver-memory option or spark.driver.memory in Spark configuration.
    at org.apache.spark.memory.UnifiedMemoryManager$.getMaxMemory(UnifiedMemoryManager.scala:212)
    at org.apache.spark.memory.UnifiedMemoryManager$.apply(UnifiedMemoryManager.scala:194)
    at org.apache.spark.SparkEnv$.create(SparkEnv.scala:308)
    at org.apache.spark.SparkEnv$.createDriverEnv(SparkEnv.scala:165)
    at org.apache.spark.SparkContext.createSparkEnv(SparkContext.scala:260)
    at org.apache.spark.SparkContext.<init>(SparkContext.scala:429)
    at org.apache.spark.api.java.JavaSparkContext.<init>(JavaSparkContext.scala:58)
    at Spark.App.main(App.java:16)
19/03/20 23:53:25 INFO SparkContext: Successfully stopped SparkContext
Exception in thread "main" java.lang.IllegalArgumentException: System memory 259522560 must be at least 471859200. Please increase heap size using the --driver-memory option or spark.driver.memory in Spark configuration.
    at org.apache.spark.memory.UnifiedMemoryManager$.getMaxMemory(UnifiedMemoryManager.scala:212)
    at org.apache.spark.memory.UnifiedMemoryManager$.apply(UnifiedMemoryManager.scala:194)
    at org.apache.spark.SparkEnv$.create(SparkEnv.scala:308)
    at org.apache.spark.SparkEnv$.createDriverEnv(SparkEnv.scala:165)
    at org.apache.spark.SparkContext.createSparkEnv(SparkContext.scala:260)
    at org.apache.spark.SparkContext.<init>(SparkContext.scala:429)
    at org.apache.spark.api.java.JavaSparkContext.<init>(JavaSparkContext.scala:58)
    at Spark.App.main(App.java:16)

I tried setting the driver memory manually, without success. I also tried installing Spark locally, but changing the driver memory from the command prompt did not help either.

Here is the code:

package Spark;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

import java.util.Arrays;
import java.util.List;

public class App 
{
    public static void main( String[] args )
    {
        SparkConf conf = new SparkConf().setAppName("Spark").setMaster("local");
//        conf.set("spark.driver.memory","471859200");
        JavaSparkContext sc = new JavaSparkContext(conf); // App.java:16: SparkContext initialization fails here


        List<Integer> data = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 1, 2, 3, 4, 5, 6, 7, 8, 9);
        JavaRDD<Integer> rdd = sc.parallelize(data);
        JavaRDD<Integer> list = rdd.map(s -> s);
        int totalLines = list.reduce((a, b) -> a + b);
        System.out.println(totalLines);
    }
}

The error occurs when instantiating the JavaSparkContext. Does anyone know how to solve this?

Thanks!

【Comments】:

  • Welcome to SO. It looks like you are trying to get started with Spark and... it seems you are not really on the right track :( You appear to be mixing some early Spark code with newer Spark code, and Spark version 2.0.0-cloudera1-SNAPSHOT does not look great. May I suggest looking at the Java examples? Very selfishly, I would recommend github.com/jgperrin/net.jgp.labs.spark and github.com/jgperrin/net.jgp.books.spark.ch01 ... Happy to help.

Tags: java apache-spark


【Solution 1】:

If you use Eclipse, you can go to Run > Run Configurations... > Arguments > VM arguments and set the maximum heap size, e.g. -Xmx512m.

In IntelliJ IDEA, you can set Run/Debug Configurations > VM options: -Xmx512m.

In your code, you can also try conf.set("spark.testing.memory", "2147480000").
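
A minimal sketch of how this might look in the original main() (assumption: the 2 GB value is only an example; any value at or above the 471859200-byte minimum from the error message should pass the check):

    SparkConf conf = new SparkConf()
        .setAppName("Spark")
        .setMaster("local")
        // In local mode the driver JVM is already running, so raising
        // spark.driver.memory programmatically has no effect on the heap;
        // overriding the memory check with spark.testing.memory is the
        // usual workaround when running inside an IDE.
        .set("spark.testing.memory", "2147480000"); // ~2 GB, example value
    JavaSparkContext sc = new JavaSparkContext(conf);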

【Discussion】:

【Solution 2】:

I am a bit confused by your code, as it mixes pre-Spark 2.x constructs like SparkConf with a lot of RDDs. There is nothing wrong with using them, but since Spark 2.x things work a little differently.

Here is an example using a SparkSession and dataframes, which are (in short) a superset of RDDs and a more powerful version of them.

In the example you will see several ways of building the map/reduce operation: two use map/reduce directly, and one uses a simple SQL-like syntax.

Map and reduce with getAs()

    int totalLines = df
        .map(
            (MapFunction<Row, Integer>) row -> row.<Integer>getAs("i"),
            Encoders.INT())
        .reduce((a, b) -> a + b);
    System.out.println(totalLines);
    

Map and reduce with getInt()

    totalLines = df
        .map(
            (MapFunction<Row, Integer>) row -> row.getInt(0),
            Encoders.INT())
        .reduce((a, b) -> a + b);
    System.out.println(totalLines);
    

SQL-like

This is probably the most popular one.

    long totalLinesL = df.selectExpr("sum(*)").first().getLong(0);
    System.out.println(totalLinesL);
    

Full example

    package net.jgp.books.spark.ch07.lab990_others;
    
    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;
    
    import org.apache.spark.api.java.function.MapFunction;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Encoders;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.RowFactory;
    import org.apache.spark.sql.SparkSession;
    import org.apache.spark.sql.types.DataTypes;
    import org.apache.spark.sql.types.StructField;
    import org.apache.spark.sql.types.StructType;
    
    /**
     * Simple ingestion followed by map and reduce operations.
     * 
     * @author jgp
     */
    public class SelfIngestionApp {
    
      /**
       * main() is your entry point to the application.
       * 
       * @param args
       */
      public static void main(String[] args) {
        SelfIngestionApp app = new SelfIngestionApp();
        app.start();
      }
    
      /**
       * The processing code.
       */
      private void start() {
        // Creates a session on a local master
        SparkSession spark = SparkSession.builder()
            .appName("Self ingestion")
            .master("local[*]")
            .getOrCreate();
    
        Dataset<Row> df = createDataframe(spark);
        df.show(false);
    
        // map and reduce with getAs()
        int totalLines = df
            .map(
                (MapFunction<Row, Integer>) row -> row.<Integer>getAs("i"),
                Encoders.INT())
            .reduce((a, b) -> a + b);
        System.out.println(totalLines);
    
        // map and reduce with getInt()
        totalLines = df
            .map(
                (MapFunction<Row, Integer>) row -> row.getInt(0),
                Encoders.INT())
            .reduce((a, b) -> a + b);
        System.out.println(totalLines);
    
        // SQL-like
        long totalLinesL = df.selectExpr("sum(*)").first().getLong(0);
        System.out.println(totalLinesL);
      }
    
      private static Dataset<Row> createDataframe(SparkSession spark) {
        StructType schema = DataTypes.createStructType(new StructField[] {
            DataTypes.createStructField(
                "i",
                DataTypes.IntegerType,
                false) });
    
        List<Integer> data =
            Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 1, 2, 3, 4, 5, 6, 7, 8, 9);
        List<Row> rows = new ArrayList<>();
        for (int i : data) {
          rows.add(RowFactory.create(i));
        }
    
        return spark.createDataFrame(rows, schema);
      }
    }
    

【Discussion】:

【Solution 3】:

You can try using the SparkSession builder; you can then get the Spark context via spark.sparkContext().

    public static SparkSession sparkSession(String master, String appName) {
      return SparkSession.builder()
          .appName(appName)
          .master(master)
          .config("spark.dynamicAllocation.enabled", true)
          .config("spark.shuffle.service.enabled", true)
          .config("spark.driver.maxResultSize", "8g")
          .config("spark.executor.memory", "8g")
          .config("spark.executor.cores", "4")
          .config("spark.cores.max", "6")
          .config("spark.submit.deployMode", "client")
          .config("spark.network.timeout", "3600s")
          .config("spark.eventLog.enabled", true)
          .getOrCreate();
    }
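
As a hedged usage sketch (the local[*] master and "Spark" app name are placeholders; cluster-oriented options above, such as dynamic allocation and the external shuffle service, may need to be dropped when running against a local master), the original RDD code could then obtain its JavaSparkContext from the session:

    SparkSession spark = sparkSession("local[*]", "Spark");
    JavaSparkContext jsc = new JavaSparkContext(spark.sparkContext());
    JavaRDD<Integer> rdd = jsc.parallelize(Arrays.asList(1, 2, 3, 4, 5));
    System.out.println(rdd.reduce((a, b) -> a + b));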
      

【Discussion】:

【Solution 4】:

Driver Memory Exception

This happens when the Spark driver runs out of memory, i.e. the application master that launches the driver exceeds its memory limit and the YARN process is killed.

Error message: java.lang.OutOfMemoryError

Solution: increase the driver memory with the following setting:

    --conf spark.driver.memory=<XY>g
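
For example, when submitting with spark-submit, the option might look like this (the 4g value and the jar name are only illustrative placeholders):

    spark-submit --class Spark.App --driver-memory 4g your-app.jar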
        

【Discussion】:
