【问题标题】:Writing from Spark to HBase : org.apache.spark.SparkException: Task not serializable从 Spark 写入 HBase:org.apache.spark.SparkException: Task not serializable
【发布时间】:2018-07-08 10:46:28
【问题描述】:

我正在为我的大学做一个热图项目,我们必须从一个 txt 文件(坐标、高度)中获取一些数据 (212Go),然后将其放入 HBase 以使用 Express 在 Web 客户端上检索它。

我练习了使用 144Mo 文件,这是有效的:

SparkConf conf = new SparkConf().setAppName("PLE");
JavaSparkContext context = new JavaSparkContext(conf);
JavaRDD<String> data = context.textFile(args[0]);
Connection co = ConnectionFactory.createConnection(getConf());
createTable(co);
Table table = co.getTable(TableName.valueOf(TABLE_NAME));
Put put = new Put(Bytes.toBytes("KEY"));

for (String s : data.collect()) {
    String[] tmp = s.split(",");
    put.addImmutable(FAMILY,
                    Bytes.toBytes(tmp[2]),
                    Bytes.toBytes(tmp[0]+","+tmp[1]));
}

table.put(put);

但是我现在使用 212Go 文件,出现了一些内存错误,我猜是 collect 方法收集了内存中的所有数据,所以 212Go 太多了。

所以现在我正在尝试这个:

SparkConf conf = new SparkConf().setAppName("PLE");
JavaSparkContext context = new JavaSparkContext(conf);
JavaRDD<String> data = context.textFile(args[0]);
Connection co = ConnectionFactory.createConnection(getConf());
createTable(co);
Table table = co.getTable(TableName.valueOf(TABLE_NAME));
Put put = new Put(Bytes.toBytes("KEY"));

data.foreach(line ->{
    String[] tmp = line.split(",");
    put.addImmutable(FAMILY,
                    Bytes.toBytes(tmp[2]),
                    Bytes.toBytes(tmp[0]+","+tmp[1]));
});

table.put(put);

我得到“org.apache.spark.SparkException:任务不可序列化”,我搜索了它并尝试了一些修复,但没有成功,我在这里读到的内容:Task not serializable: java.io.NotSerializableException when calling function outside closure only on classes not objects

其实这个话题我不是什么都懂,我只是个学生,也许我的问题的答案是显而易见的,也许不是,无论如何提前谢谢!

【问题讨论】:

    标签: java apache-spark hbase


    【解决方案1】:

    根据经验,序列化数据库连接(任何类型)没有意义。没有设计成可序列化和反序列化的,Spark 与否。

    为每个分区创建连接:

    data.foreachPartition(partition -> {
      Connection co = ConnectionFactory.createConnection(getConf());
      ... // All required setup
      Table table = co.getTable(TableName.valueOf(TABLE_NAME));
      Put put = new Put(Bytes.toBytes("KEY"));
       while (partition.hasNext()) {
         String line = partition.next();
         String[] tmp = line.split(",");
         put.addImmutable(FAMILY,
                    Bytes.toBytes(tmp[2]),
                    Bytes.toBytes(tmp[0]+","+tmp[1]));
       }
       ... // Clean connections
    });
    

    我还建议阅读官方 Spark Streaming 编程指南中的 Design Patterns for using foreachRDD

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2018-01-04
      • 2020-11-08
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2017-11-15
      • 2016-12-14
      • 2020-09-22
      相关资源
      最近更新 更多