【问题标题】:Spark: HBase Bulk Load using ScalaSpark:使用 Scala 的 HBase 批量加载
【发布时间】:2018-06-26 08:08:38
【问题描述】:

我们有一个包含 100K 记录的文本文件,我们需要逐行读取文件并将其值插入 hbase。 该文件是“|”分隔。

文本文件示例:

    SLNO|Name|City|Pincode
    1|ABC|Pune|400104
    2|BMN|Delhi|100065

每一列都有不同的列族。 我们正在尝试使用 HBase 批量加载在 Spark-Scala 中实现这一点。 我们遇到了这个建议批量加载的链接: http://www.openkb.info/2015/01/how-to-use-scala-on-spark-to-load-data.html

使用以下语法插入单列族。

conf.set(TableOutputFormat.OUTPUT_TABLE, tableName)
val job = Job.getInstance(conf)
job.setMapOutputKeyClass (classOf[ImmutableBytesWritable])
job.setMapOutputValueClass (classOf[KeyValue])
HFileOutputFormat.configureIncrementalLoad (job, table)

// Generate 10 sample data:
val num = sc.parallelize(1 to 10)
val rdd = num.map(x=>{
    val kv: KeyValue = new KeyValue(Bytes.toBytes(x), "cf".getBytes(), 
"c1".getBytes(), "value_xxx".getBytes() )
    (new ImmutableBytesWritable(Bytes.toBytes(x)), kv)
})

// Directly bulk load to Hbase/MapRDB tables.
rdd.saveAsNewAPIHadoopFile("/tmp/xxxx19", classOf[ImmutableBytesWritable], 
classOf[KeyValue], classOf[HFileOutputFormat], job.getConfiguration())   

任何人都可以就多列系列的批量加载插入提供建议。

【问题讨论】:

    标签: scala hadoop apache-spark hbase


    【解决方案1】:

    请查看rdd.saveAsNewAPIHadoopDataset,将数据插入到 hbase 表中。

    def main(args: Array[String]): Unit = {
    
        val spark = SparkSession.builder().appName("sparkToHive").enableHiveSupport().getOrCreate()
        import spark.implicits._
    
        val config = HBaseConfiguration.create()
        config.set("hbase.zookeeper.quorum", "ip's")
        config.set("hbase.zookeeper.property.clientPort","2181")
        config.set(TableInputFormat.INPUT_TABLE, "tableName")
    
        val newAPIJobConfiguration1 = Job.getInstance(config)
        newAPIJobConfiguration1.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, "tableName")
        newAPIJobConfiguration1.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])
    
        val df: DataFrame  = Seq(("foo", "1", "foo1"), ("bar", "2", "bar1")).toDF("key", "value1", "value2")
    
        val hbasePuts= df.rdd.map((row: Row) => {
          val  put = new Put(Bytes.toBytes(row.getString(0)))
          put.addColumn(Bytes.toBytes("cf1"), Bytes.toBytes("value1"), Bytes.toBytes(row.getString(1)))
          put.addColumn(Bytes.toBytes("cf2"), Bytes.toBytes("value2"), Bytes.toBytes(row.getString(2)))
          (new ImmutableBytesWritable(), put)
        })
    
        hbasePuts.saveAsNewAPIHadoopDataset(newAPIJobConfiguration1.getConfiguration())
        }
    

    参考:https://sparkkb.wordpress.com/2015/05/04/save-javardd-to-hbase-using-saveasnewapihadoopdataset-spark-api-java-coding/

    【讨论】:

      猜你喜欢
      • 2016-05-04
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2018-03-12
      相关资源
      最近更新 更多