【问题标题】:Spark Stream Kafka and Hbase ConfigSpark Stream Kafka 和 Hbase 配置
【发布时间】:2016-05-27 10:41:18
【问题描述】:

我对使用 Kafka 和 HBase 进行火花流处理有几个问题。 下面是我的 spark 流程序,这里我使用 zookeeper 配置连接到 Kafka 和 Hbase。 我们真的需要在流代码中进行这种配置吗?或者我做错了 如果我使用的是Hortonworks或Cloudera等hadoop发行版,应该有使用kafka和Hbase配置spark的规定,这样我的spark流代码应该只带kafka topic和Hbase table等参数,没有zoo keeper等配置。如果可以,请您帮我完成步骤。

object KafkaSparkStream{
  def main(args: Array[String]): Unit =
    {
      var arg = Array("10.74.163.163:9092,10.74.163.154:9092", "10.74.163.154:2181", "test_topic")
      val Array(broker, zk, topic) = arg
      val conf = new SparkConf()
        .setAppName("KafkaSparkStreamToHbase")
        .setMaster("local[2]");
      //.setMaster("yarn-client")
      val ssc = new StreamingContext(conf, Seconds(5))
      val kafkaConf = Map("metadata.broker.list" -> broker,
        "zookeeper.connect" -> zk,
        "group.id" -> "kafka-spark-streaming-example",
        "zookeeper.connection.timeout.ms" -> "1000")
      /* Kafka integration with reciever */
      val lines = KafkaUtils.createStream[Array[Byte], String, DefaultDecoder, StringDecoder](
        ssc, kafkaConf, Map(topic -> 1),
        StorageLevel.MEMORY_ONLY_SER).map(_._2)
      val words = lines.flatMap(_.split(" "))
      val wordCounts = words.map(x => (x, 1L)).reduceByKey(_ + _)
      wordCounts.foreachRDD(rdd => {
        val conf = HBaseConfiguration.create()
        conf.set(TableOutputFormat.OUTPUT_TABLE, "stream_count")
        conf.set("hbase.zookeeper.quorum", "10.74.163.154:2181")
        conf.set("hbase.master", "HOSTNAME:16000");
        conf.set("hbase.rootdir", "file:///tmp/hbase")
        val jobConf = new Configuration(conf)
        jobConf.set("mapreduce.job.output.key.class", classOf[Text].getName)
        jobConf.set("mapreduce.job.output.value.class", classOf[LongWritable].getName)
        jobConf.set("mapreduce.outputformat.class", classOf[TableOutputFormat[Text]].getName)
        //rdd.saveAsNewAPIHadoopDataset(jobConf)
        rdd.map(convert).saveAsNewAPIHadoopDataset(jobConf)
      })
      wordCounts.print()
      ssc.start()
      ssc.awaitTermination()
    }

【问题讨论】:

    标签: apache-spark hbase apache-kafka spark-streaming cloudera-cdh


    【解决方案1】:

    使用 HBase 的方法是将您的 hbase-site.xml 配置文件添加到 Spark 类路径。

    对于 kafka,您可以使用 https://github.com/typesafehub/config 从自定义配置文件中加载属性。 要使用此配置文件,您必须:

    • 设置--driver-class-path <dir with the config file>
    • 设置--files <configuration file>将该文件复制到每个执行器的工作目录
    • 设置spark.executor.extraClassPath=./ 将每个执行器的工作目录添加到其类路径中

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2017-03-05
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多