【问题标题】:Spark Streaming using Scala to insert to Hbase IssueSpark Streaming 使用 Scala 插入 Hbase 问题
【发布时间】:2014-11-25 10:30:40
【问题描述】:

我正在尝试从 Kafka 消息中读取记录并放入 Hbase。虽然 scala 脚本运行没有任何问题,但插入没有发生。请帮帮我。

输入: 行键1,1 行键2,2

这是我正在使用的代码:

object Blaher {
  def blah(row: Array[String]) { 
    val hConf = new HBaseConfiguration() 
    val hTable = new HTable(hConf, "test") 
    val thePut = new Put(Bytes.toBytes(row(0))) 
    thePut.add(Bytes.toBytes("cf"), Bytes.toBytes("a"), Bytes.toBytes(row(1))) 
    hTable.put(thePut) 
  } 
}


object TheMain extends Serializable{
  def run() {
    val ssc = new StreamingContext(sc, Seconds(1)) 
    val topicmap = Map("test" -> 1)
    val lines = KafkaUtils.createStream(ssc,"127.0.0.1:2181", "test-consumer-group",topicmap).map(_._2)
    val words = lines.map(line => line.split(",")).map(line => (line(0),line(1)))
    val store = words.foreachRDD(rdd => rdd.foreach(Blaher.blah)) 
    ssc.start()
  } 
}

TheMain.run()

【问题讨论】:

  • 在创建 SparkContext 时为 Spark 分配了多少个内核? (sc) ?
  • 看起来问题是将 rdd 转换为数组。 Somehome foreach rdd 对 Blaher.blah 方法的调用没有正确发生。无论如何将记录作为数组传递并将它们插入到 hbase 中?

标签: scala stream apache-spark


【解决方案1】:

来自HTableflushCommits() 方法的API 文档:“执行所有缓冲的Put 操作”。您应该在 blah() 方法的末尾调用它——看起来它们当前正在被缓冲,但从未在某个随机时间执行或执行。

【讨论】:

    猜你喜欢
    • 2016-02-28
    • 1970-01-01
    • 2017-12-20
    • 1970-01-01
    • 1970-01-01
    • 2016-05-04
    • 1970-01-01
    • 2018-01-30
    • 2018-08-05
    相关资源
    最近更新 更多