【问题标题】:Spark Streaming - Issue with Passing parametersSpark Streaming - 传递参数的问题
【发布时间】:2015-03-19 05:08:01
【问题描述】:

请看下面用scala编写的spark流代码:

object HBase {
  var hbaseTable = ""
  val hConf = new HBaseConfiguration()
  hConf.set("hbase.zookeeper.quorum", "zookeeperhost")

  def init(input: (String)) {
    hbaseTable = input
  }
  def display() {
    print(hbaseTable)
  }
  def insertHbase(row: (String)) {
    val hTable = new HTable(hConf,hbaseTable)
  }
}

object mainHbase {
  def main(args : Array[String]) {
    if (args.length < 5) {
      System.err.println("Usage: MetricAggregatorHBase <zkQuorum> <group> <topics> <numThreads> <hbaseTable>")
      System.exit(1)
    }
    val Array(zkQuorum, group, topics, numThreads, hbaseTable) = args
    HBase.init(hbaseTable)
    HBase.display()
    val sparkConf = new SparkConf().setAppName("mainHbase")
    val ssc = new StreamingContext(sparkConf, Seconds(10))
    ssc.checkpoint("checkpoint")
    val topicpMap = topics.split(",").map((_, numThreads.toInt)).toMap
    val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicpMap).map(_._2)
    val storeStg = lines.foreachRDD(rdd => rdd.foreach(HBase.insertHbase))
    lines.print()
    ssc.start()
  }
}

我正在尝试通过调用HBase.init 方法来初始化对象HBase 中的参数hbaseTable。它正在正确设置参数。我通过在下一行调用HBase.display 方法确认了这一点。

但是,当调用foreachRDD 中的HBase.insertHbase 方法时,它的抛出错误hbaseTable 没有设置。

异常更新:

java.lang.IllegalArgumentException: Table qualifier must not be empty
        org.apache.hadoop.hbase.TableName.isLegalTableQualifierName(TableName.java:179)
        org.apache.hadoop.hbase.TableName.isLegalTableQualifierName(TableName.java:149)
        org.apache.hadoop.hbase.TableName.<init>(TableName.java:303)
        org.apache.hadoop.hbase.TableName.createTableNameIfNecessary(TableName.java:339)
        org.apache.hadoop.hbase.TableName.valueOf(TableName.java:426)
        org.apache.hadoop.hbase.client.HTable.<init>(HTable.java:156)

请告诉我如何使这段代码工作。

【问题讨论】:

  • 请用引发的确切异常更新您的问题。
  • @lambdas 更新并抛出异常。

标签: scala apache-spark spark-streaming


【解决方案1】:

“这段代码在哪里运行”——这是我们需要问的问题,以便了解发生了什么。

HBase 是一个 Scala 对象 - 根据定义,它是一个单例构造,在 JVM 中使用“仅一次”语义进行初始化。

在初始化点,HBase.init(hbaseTable) 在此 Spark 应用程序的驱动程序中执行,用驱动程序的 VM 中的给定值初始化此对象。

但是当我们这样做时:rdd.foreach(HBase.insertHbase),闭包作为任务在每个为给定 RDD 托管分区的执行器上执行。此时,对象HBase 在每个VM 上为每个执行程序初始化。正如我们所见,此时此对象上没有发生任何初始化。

有两种选择:

我们可以向HBase 对象添加一些检查“isInitialized”,并添加-now 条件调用以在每次调用foreach 时进行初始化。 另一种选择是使用

rdd.foreachPartitition{partition => 
     HBase.initialize(...)
     partition.foreach(elem => HBase.insert(elem))
}

此构造将按每个分区中元素的数量摊销任何初始化。也可以将它与初始化检查结合起来,以防止不必要的引导工作。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2017-07-02
    • 1970-01-01
    • 1970-01-01
    • 2017-04-19
    • 2017-04-20
    • 2017-06-22
    • 2015-03-10
    相关资源
    最近更新 更多