【问题标题】:Spark Streaming: Using PairRDD.saveAsNewHadoopDataset function to save data to HBaseSpark Streaming:使用 PairRDD.saveAsNewHadoopDataset 函数将数据保存到 HBase
【发布时间】:2016-10-25 21:54:16
【问题描述】:

我想将 Twitter 流保存在 HBase 数据库中。我现在拥有的是用于接收和转换数据的 Sapk 应用程序。但是我不知道如何将我的 TwitterStream 保存到 HBase 中?

我发现唯一有用的是PairRDD.saveAsNewAPIHadoopDataset(conf) 方法。但是我应该如何使用它,我必须进行哪些配置才能将 RDD 数据保存到我的 HBase 表中?

我唯一发现的是 HBase 客户端库,它可以通过 Put 对象将数据插入到表中。但这不是 Spark 程序内部的解决方案,是吗(有必要遍历 RDD 中的所有项目!!)?

有人可以举个JAVA的例子吗?我的主要问题似乎是org.apache.hadoop.conf.Configuration 实例的设置,我必须在saveAsNewAPIHadoopDataset 中提交...

这里是代码sn-p:

JavaReceiverInputDStream<Status> statusDStream = TwitterUtils.createStream(streamingCtx);

JavaPairDStream<Long, String> statusPairDStream = statusDStream.mapToPair(new PairFunction<Status, Long, String>() {
    public Tuple2<Long, String> call(Status status) throws Exception {
        return new Tuple2<Long, String> (status.getId(), status.getText());
    }
});

statusPairDStream.foreachRDD(new Function<JavaPairRDD<Long,String>, Void>() {
    public Void call(JavaPairRDD<Long, String> status) throws Exception {
        org.apache.hadoop.conf.Configuration conf = new Configuration();
        status.saveAsNewAPIHadoopDataset(conf);
        // HBase PUT here can't be correct!?
        return null;
    }
});

【问题讨论】:

    标签: java hadoop apache-spark hbase spark-streaming


    【解决方案1】:

    First thing is functions are discouraged, if you are using java 8. Pls. use lambda.

    下面的代码 sn-p 可以解决您的所有查询。

    样本 sn-p:

    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.RowFactory;
    import org.apache.spark.sql.types.DataTypes;
    import org.apache.spark.sql.types.StructField;
    import org.apache.spark.sql.types.StructType;
    ....
    
    
      public static void processYourMessages(final JavaRDD<YourMessage> rdd, final HiveContext sqlContext, 
                        , MyMessageUtil messageutil) throws Exception {
            final JavaRDD<Row> yourrdd = rdd.filter(msg -> messageutil.filterType(.....) // create a java rdd
            final JavaPairRDD<ImmutableBytesWritable, Put> yourrddPuts = yourrdd.mapToPair(row -> messageutil.getPuts(row));
            yourrddPuts.saveAsNewAPIHadoopDataset(conf);       
        }
    

    conf 如下所示

    private Configuration conf =   HBaseConfiguration.create();
            conf.set(ZOOKEEPER_QUORUM, "comma seperated list of zookeeper quorum");
            conf.set("hbase.mapred.outputtable", "your table name");
            conf.set("mapreduce.outputformat.class", "org.apache.hadoop.hbase.mapreduce.TableOutputFormat");
    

    MyMessageUtilgetPuts 方法,如下所示

    public Tuple2<ImmutableBytesWritable, Put> getPuts(Row row) throws Exception {
    
            Put put = ..// prepare your put with all the columns you have.
            return new Tuple2<ImmutableBytesWritable, Put>(new ImmutableBytesWritable(), put);
        }
    

    希望这会有所帮助!

    【讨论】:

    • 感谢您的解决方案,看起来不错。我将在星期一尝试并提供反馈。谢谢!
    • 如果有任何问题请继续发布
    • 完美运行,谢谢!另一个例子可以在这里找到:sparkkb.wordpress.com/2015/05/04/…
    猜你喜欢
    • 2017-01-27
    • 2021-10-07
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2023-04-01
    • 2016-05-01
    • 2017-12-20
    • 2016-02-28
    相关资源
    最近更新 更多