【发布时间】:2014-11-25 10:30:40
【问题描述】:
我正在尝试从 Kafka 消息中读取记录并放入 Hbase。虽然 scala 脚本运行没有任何问题,但插入没有发生。请帮帮我。
输入: 行键1,1 行键2,2
这是我正在使用的代码:
object Blaher {
def blah(row: Array[String]) {
val hConf = new HBaseConfiguration()
val hTable = new HTable(hConf, "test")
val thePut = new Put(Bytes.toBytes(row(0)))
thePut.add(Bytes.toBytes("cf"), Bytes.toBytes("a"), Bytes.toBytes(row(1)))
hTable.put(thePut)
}
}
object TheMain extends Serializable{
def run() {
val ssc = new StreamingContext(sc, Seconds(1))
val topicmap = Map("test" -> 1)
val lines = KafkaUtils.createStream(ssc,"127.0.0.1:2181", "test-consumer-group",topicmap).map(_._2)
val words = lines.map(line => line.split(",")).map(line => (line(0),line(1)))
val store = words.foreachRDD(rdd => rdd.foreach(Blaher.blah))
ssc.start()
}
}
TheMain.run()
【问题讨论】:
-
在创建 SparkContext 时为 Spark 分配了多少个内核? (sc) ?
-
看起来问题是将 rdd 转换为数组。 Somehome foreach rdd 对 Blaher.blah 方法的调用没有正确发生。无论如何将记录作为数组传递并将它们插入到 hbase 中?
标签: scala stream apache-spark