【问题标题】:Spark serialization errorSpark序列化错误
【发布时间】:2014-06-30 10:47:40
【问题描述】:

我正在尝试学习 spark + scala。我想从 HBase 读取,但没有 mapreduce。 我创建了一个简单的 HBase 表 - “测试”并在其中做了 3 次放置。我想通过 spark 阅读它(没有使用 mapreduce 的 HBaseTest)。我尝试在 shell 上运行以下命令

val numbers = Array(
  new Get(Bytes.toBytes("row1")), 
  new Get(Bytes.toBytes("row2")), 
  new Get(Bytes.toBytes("row3")))
val conf = new HBaseConfiguration()
val table = new HTable(conf, "test")
sc.parallelize(numbers, numbers.length).map(table.get).count()

我不断收到错误 - org.apache.spark.SparkException:作业中止:任务不可序列化:java.io.NotSerializableException:org.apache.hadoop.hbase.HBaseConfiguration

谁能帮帮我,我怎样才能创建一个使用可序列化配置的 Htable

谢谢

【问题讨论】:

    标签: hbase apache-spark


    【解决方案1】:

    当你这样做时会发生什么

    @transient val conf = new HBaseConfiguration

    UPDATE 显然,HBase 提交的任务的其他部分也是不可序列化的。这些都需要解决。

    • 考虑实体在线路的两侧是否具有相同的含义/语义。任何连接肯定不会。 HBaseConfiguration 应该被序列化。但是基元和构建在基元之上的简单对象(不包含上下文相关数据)可以包含在序列化中

    • 对于上下文相关的实体 - 包括 HBaseConfiguration 和任何面向连接的数据结构 - 您应该将它们标记为 @transient,然后在 readObject() 方法中使用与客户端环境相关的值对其进行实例化。

    【讨论】:

    • 你好,试过了。现在错误是 - org.apache.spark.SparkException: Job aborted: Task not serializable: java.io.NotSerializableException: org.apache.hadoop.hbase.client.Get 需要一些机制来使类可序列化跨度>
    • 嗯,它应该可以工作@javadba 你从哪里学到的?我读了这个examples.javacodegeeks.com/java-basics/exceptions/…,它说它应该可以工作(另见scala-lang.org/old/node/106
    • @user3529980 尝试在val table 上方添加@transient 并将new Get 移动到lambda 中。 Get 也不是可序列化的,因此您需要在 lamda 中创建 Get
    • @samthebest 感谢额外的 cmets。我已经更新了一些一般性指针的答案。
    【解决方案2】:

    您的问题是table 不可序列化(而是成员conf)并且您试图通过在map 中使用它来序列化它。他们尝试阅读 HBase 的方式并不完全正确,看起来您尝试了一些特定的 Get,然后尝试并行执行它们。即使您确实使这项工作正常进行,这也不会随着您执行随机读取而扩展。您要做的是使用 Spark 执行表扫描,这里有一个代码 sn-p 应该可以帮助您:

    val conf = HBaseConfiguration.create()
    conf.set(TableInputFormat.INPUT_TABLE, tableName)
    
    sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],
      classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
      classOf[org.apache.hadoop.hbase.client.Result])
    

    这将为您提供一个包含构成行的 NaviagableMap 的 RDD。下面是如何将 NaviagbleMap 更改为普通的 Scala 字符串映射:

    ...
    .map(kv => (kv._1.get(), navMapToMap(kv._2.getMap)))
    .map(kv => (Bytes.toString(kv._1), rowToStrMap(kv._2)))
    
    def navMapToMap(navMap: HBaseRow): CFTimeseriesRow =
      navMap.asScala.toMap.map(cf =>
        (cf._1, cf._2.asScala.toMap.map(col =>
          (col._1, col._2.asScala.toMap.map(elem => (elem._1.toLong, elem._2))))))
    
    def rowToStrMap(navMap: CFTimeseriesRow): CFTimeseriesRowStr =
      navMap.map(cf =>
        (Bytes.toString(cf._1), cf._2.map(col =>
          (Bytes.toString(col._1), col._2.map(elem => (elem._1, Bytes.toString(elem._2)))))))
    

    最后一点,如果您真的想尝试并行执行随机读取,我相信您可以将 HBase 表初始化放在 map 中。

    【讨论】:

    • 您好,很抱歉回复延迟。我想尝试这种阅读方式,请您帮我以可序列化的方式定义类。我想尝试分发密钥并将其性能与标准方式进行比较。对此的任何想法将不胜感激。
    猜你喜欢
    • 2016-02-16
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2017-09-21
    • 2015-12-18
    • 2017-08-24
    • 2020-02-04
    • 2017-07-22
    相关资源
    最近更新 更多