【问题标题】:Streaming to HBase with pyspark使用 pyspark 流式传输到 HBase
【发布时间】:2016-01-29 06:11:16
【问题描述】:

网上有大量关于使用 Scala 使用 Spark 流式传输批量加载到 HBase 的信息(thesetwo 特别有用)和一些关于 Java 的信息,但似乎缺乏相关信息使用 PySpark。所以我的问题是:

  • 如何使用 PySpark 将数据批量加载到 HBase?
  • 任何语言中的大多数示例仅显示每行插入一列。如何每行 upsert 多列?

我目前的代码如下:

if __name__ == "__main__":

    context = SparkContext(appName="PythonHBaseBulkLoader")
    streamingContext = StreamingContext(context, 5)

    stream = streamingContext.textFileStream("file:///test/input");

    stream.foreachRDD(bulk_load)

    streamingContext.start()
    streamingContext.awaitTermination()

我需要帮助的是批量加载功能

def bulk_load(rdd):
    #???

我之前已经取得了一些进展,但出现了许多不同的错误(如 herehere 所记录的)

【问题讨论】:

    标签: hadoop hbase pyspark spark-streaming


    【解决方案1】:

    因此,经过多次尝试和错误,我在这里展示了我想出的最好的东西。它运行良好,并且成功地批量加载数据(使用Puts 或 HFiles)我完全愿意相信这不是最好的方法,所以欢迎任何 cmets/其他答案。这假设您使用 CSV 存储数据。

    使用 Puts 批量加载

    到目前为止,批量加载最简单的方法是为 CSV 中的每个单元创建一个 Put 请求,并将它们排队到 HBase。

    def bulk_load(rdd):
        #Your configuration will likely be different. Insert your own quorum and parent node and table name
        conf = {"hbase.zookeeper.qourum": "localhost:2181",\
                "zookeeper.znode.parent": "/hbase-unsecure",\
                "hbase.mapred.outputtable": "Test",\
                "mapreduce.outputformat.class": "org.apache.hadoop.hbase.mapreduce.TableOutputFormat",\
                "mapreduce.job.output.key.class": "org.apache.hadoop.hbase.io.ImmutableBytesWritable",\
                "mapreduce.job.output.value.class": "org.apache.hadoop.io.Writable"}
    
        keyConv = "org.apache.spark.examples.pythonconverters.StringToImmutableBytesWritableConverter"
        valueConv = "org.apache.spark.examples.pythonconverters.StringListToPutConverter"
    
        load_rdd = rdd.flatMap(lambda line: line.split("\n"))\#Split the input into individual lines
                      .flatMap(csv_to_key_value)#Convert the CSV line to key value pairs
        load_rdd.saveAsNewAPIHadoopDataset(conf=conf,keyConverter=keyConv,valueConverter=valueConv)
    

    函数csv_to_key_value 是魔法发生的地方:

    def csv_to_key_value(row):
        cols = row.split(",")#Split on commas.
        #Each cell is a tuple of (key, [key, column-family, column-descriptor, value])
        #Works well for n>=1 columns
        result = ((cols[0], [cols[0], "f1", "c1", cols[1]]),
                  (cols[0], [cols[0], "f2", "c2", cols[2]]),
                  (cols[0], [cols[0], "f3", "c3", cols[3]]))
        return result
    

    我们之前定义的值转换器会将这些元组转换为 HBase Puts

    使用 HFile 批量加载

    使用 HFile 进行批量加载更有效:不是为每个单元格发送Put 请求,而是直接写入一个 HFile,然后简单地告诉 RegionServer 指向新的 HFile。这将使用 Py4J,因此在 Python 代码之前我们必须编写一个小型 Java 程序:

    import py4j.GatewayServer;
    import org.apache.hadoop.hbase.*;
    
    public class GatewayApplication {
    
        public static void main(String[] args)
        {
            GatewayApplication app = new GatewayApplication();
            GatewayServer server = new GatewayServer(app);
            server.start();
        }
    }
    

    编译并运行它。只要您的流媒体正在进行,就让它一直运行。现在更新bulk_load如下:

    def bulk_load(rdd):
        #The output class changes, everything else stays
        conf = {"hbase.zookeeper.qourum": "localhost:2181",\
                "zookeeper.znode.parent": "/hbase-unsecure",\
                "hbase.mapred.outputtable": "Test",\
                "mapreduce.outputformat.class": "org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2",\
                "mapreduce.job.output.key.class": "org.apache.hadoop.hbase.io.ImmutableBytesWritable",\
                "mapreduce.job.output.value.class": "org.apache.hadoop.io.Writable"}#"org.apache.hadoop.hbase.client.Put"}
    
        keyConv = "org.apache.spark.examples.pythonconverters.StringToImmutableBytesWritableConverter"
        valueConv = "org.apache.spark.examples.pythonconverters.StringListToPutConverter"
    
        load_rdd = rdd.flatMap(lambda line: line.split("\n"))\
                      .flatMap(csv_to_key_value)\
                      .sortByKey(True)
        #Don't process empty RDDs
        if not load_rdd.isEmpty():
            #saveAsNewAPIHadoopDataset changes to saveAsNewAPIHadoopFile
            load_rdd.saveAsNewAPIHadoopFile("file:///tmp/hfiles" + startTime,
                                            "org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2",
                                            conf=conf,
                                            keyConverter=keyConv,
                                            valueConverter=valueConv)
            #The file has now been written, but HBase doesn't know about it
    
            #Get a link to Py4J
            gateway = JavaGateway()
            #Convert conf to a fully fledged Configuration type
            config = dict_to_conf(conf)
            #Set up our HTable
            htable = gateway.jvm.org.apache.hadoop.hbase.client.HTable(config, "Test")
            #Set up our path
            path = gateway.jvm.org.apache.hadoop.fs.Path("/tmp/hfiles" + startTime)
            #Get a bulk loader
            loader = gateway.jvm.org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles(config)
            #Load the HFile
            loader.doBulkLoad(path, htable)
        else:
            print("Nothing to process")
    

    最后,相当简单的dict_to_conf

    def dict_to_conf(conf):
        gateway = JavaGateway()
        config = gateway.jvm.org.apache.hadoop.conf.Configuration()
        keys = conf.keys()
        vals = conf.values()
        for i in range(len(keys)):
            config.set(keys[i], vals[i])
        return config
    

    如您所见,使用 HFile 进行批量加载比使用 Puts 更复杂,但根据您的数据加载情况,这可能是值得的,因为一旦您开始使用它就不会那么困难了。

    最后一点让我措手不及:HFiles 期望他们收到的数据按词法顺序写入。这并不总是保证正确,特别是因为“10”

    load_rdd = rdd.flatMap(lambda line: line.split("\n"))\
                  .flatMap(csv_to_key_value)\
                  .sortByKey(True)#Sort in ascending order
    

    【讨论】:

    • 嗨@swinefish 我尝试了使用 hfiles 批量加载下的解决方案,但我收到此错误:- java.lang.ClassCastException: org.apache.hadoop.hbase.client.Put 无法转换为 org .apache.hadoop.hbase.Cell
    猜你喜欢
    • 2017-05-26
    • 1970-01-01
    • 2014-10-28
    • 1970-01-01
    • 2016-06-04
    • 2021-07-28
    • 2018-11-18
    • 1970-01-01
    • 2016-02-16
    相关资源
    最近更新 更多