【Posted】:2016-06-19 15:32:10
【Problem Description】:
I am trying out a simple piece of code in the Spark/Scala REPL and getting the error below. How can I fix this? I want to save an RDD to HBase using RDD.saveAsNewAPIHadoopDataset(conf).
import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor}
import org.apache.hadoop.hbase.client.HBaseAdmin
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
import org.apache.hadoop.mapreduce.OutputFormat
import org.apache.hadoop.hbase.client.Mutation
import org.apache.hadoop.fs.Path
import org.apache.hadoop.hbase.HColumnDescriptor
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.client.HTable
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.conf.Configuration
import scala.collection.mutable
import java.io.{IOException, File, ByteArrayOutputStream}
import org.apache.spark.HashPartitioner
val tableName = "test"
val cfIndex = "cf".getBytes()
val colIndexId = "c1".getBytes()

// Build (rowKey, Put) pairs from the input tuples.
val RDD = sc.parallelize(List(("1", "2"), ("1", "2"), ("1", "3"), ("3", "3")), 2).repartition(2).mapPartitions { part =>
  // Redefined inside the partition closure (shadowing the vals above).
  val tableName = "test"
  val cfIndex = "cf".getBytes()
  val colIndexId = "c01".getBytes()
  part.map { case (k, v) =>
    val put = new Put(k.getBytes())
    put.add(cfIndex, colIndexId, v.getBytes())
    (k, put)
  }
}
ERROR TaskSetManager: Task 0.0 in stage 5.0 (TID 17) had a not serializable result: org.apache.hadoop.hbase.client.Put
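For reference, here is a minimal sketch of one common way around this, assuming an HBase table named test with column family cf already exists (the job setup below follows the standard TableOutputFormat API and is not from the original post). Put is not Java-serializable, so this error appears whenever computed Puts have to be shipped back to the driver, for example when the REPL evaluates the RDD with collect or take. Writing the pairs out with saveAsNewAPIHadoopDataset sidesteps this, because TableOutputFormat consumes the Puts on the executors:

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
import org.apache.hadoop.mapreduce.Job

// Point TableOutputFormat at the target table ("test" is an assumption here).
val hbaseConf = HBaseConfiguration.create()
hbaseConf.set(TableOutputFormat.OUTPUT_TABLE, "test")
val job = Job.getInstance(hbaseConf)
job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])

// TableOutputFormat expects (key, Mutation) pairs; the key is conventionally
// an ImmutableBytesWritable wrapping the row key.
val puts = sc.parallelize(List(("1", "2"), ("1", "3"), ("3", "3")))
  .map { case (k, v) =>
    val put = new Put(k.getBytes())
    put.add("cf".getBytes(), "c1".getBytes(), v.getBytes())
    (new ImmutableBytesWritable(k.getBytes()), put)
  }

// The Puts are written directly from the executors and are never serialized
// back to the driver, so Put's lack of java.io.Serializable does not matter.
puts.saveAsNewAPIHadoopDataset(job.getConfiguration)

Alternatively, switching to Kryo serialization (setting spark.serializer to org.apache.spark.serializer.KryoSerializer) is often suggested for cases where Puts genuinely need to be returned to the driver, e.g. by collect.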
【Discussion】:
Tags: scala serialization apache-spark