【Question Title】: HBase Spark query with a filter, without loading the whole HBase table
【Posted】: 2026-01-22 02:05:02
【Question】:

I have to query HBase and then process the data with Spark and Scala. My problem is that with my current solution I fetch all the data from the HBase table and filter it afterwards, which is not efficient because it uses too much memory. So I would like to apply the filter directly at the source. How can I do that?

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.SparkContext
import org.apache.spark.sql.{DataFrame, SQLContext}

def HbaseSparkQuery(table: String, gatewayINPUT: String, sparkContext: SparkContext): DataFrame = {

    val sqlContext = new SQLContext(sparkContext)

    import sqlContext.implicits._

    val conf = HBaseConfiguration.create()

    val tableName = table

    conf.set("hbase.zookeeper.quorum", "localhost")
    conf.set("hbase.master", "localhost:60000")
    conf.set(TableInputFormat.INPUT_TABLE, tableName)

    // Reads the whole HBase table into Spark
    val hBaseRDD = sparkContext.newAPIHadoopRDD(conf, classOf[TableInputFormat], classOf[ImmutableBytesWritable], classOf[Result])

    val DATAFRAME = hBaseRDD.map(x => {
      (Bytes.toString(x._2.getValue(Bytes.toBytes("header"), Bytes.toBytes("gatewayIMEA"))),
        Bytes.toString(x._2.getValue(Bytes.toBytes("header"), Bytes.toBytes("eventTime"))),
        Bytes.toString(x._2.getValue(Bytes.toBytes("node"), Bytes.toBytes("imei"))),
        Bytes.toString(x._2.getValue(Bytes.toBytes("measure"), Bytes.toBytes("rssi"))))

    }).toDF()
      .withColumnRenamed("_1", "GatewayIMEA")
      .withColumnRenamed("_2", "EventTime")
      .withColumnRenamed("_3", "ap")
      .withColumnRenamed("_4", "RSSI")
      // Filter applied only after all the data has already been loaded
      .filter($"GatewayIMEA" === gatewayINPUT)

    DATAFRAME
  }

As you can see in my code, the filter is applied after the DataFrame is created, i.e. after the HBase data has already been loaded.

Thanks in advance for your answers.

【Comments】:

    Tags: scala apache-spark apache-spark-sql hbase


    【Solution 1】:

    Here is the solution I found:

    import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
    import org.apache.hadoop.hbase.client._
    import org.apache.hadoop.hbase.filter._
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable
    import org.apache.hadoop.hbase.mapreduce.{TableInputFormat, TableMapReduceUtil}
    import org.apache.hadoop.hbase.util.Bytes
    import org.apache.spark.sql.SQLContext
    import org.apache.spark.{SparkConf, SparkContext}
    
    object HbaseConnector {
    
      def main(args: Array[String]): Unit = {
    
    //    System.setProperty("hadoop.home.dir", "/usr/local/hadoop")
        val sparkConf = new SparkConf().setAppName("CoverageAlgPipeline").setMaster("local[*]")
        val sparkContext = new SparkContext(sparkConf)
    
        val sqlContext = new SQLContext(sparkContext)
    
        import sqlContext.implicits._
    
        val spark = org.apache.spark.sql.SparkSession.builder
          .master("local")
          .appName("Coverage Algorithm")
          .getOrCreate
    
        val GatewayIMEA = "123"
    
        val TABLE_NAME = "TABLE"
    
        val conf = HBaseConfiguration.create()
    
        conf.set("hbase.zookeeper.quorum", "localhost")
        conf.set("hbase.master", "localhost:60000")
        conf.set(TableInputFormat.INPUT_TABLE, TABLE_NAME)
    
        // Client connection/table handle (not strictly needed for the newAPIHadoopRDD scan below)
        val connection = ConnectionFactory.createConnection(conf)
        val table = connection.getTable(TableName.valueOf(TABLE_NAME))
        val scan = new Scan

        // Server-side filter: only rows whose header:gatewayIMEA equals GatewayIMEA are returned
        val GatewayIDFilter = new SingleColumnValueFilter(Bytes.toBytes("header"), Bytes.toBytes("gatewayIMEA"), CompareFilter.CompareOp.EQUAL, Bytes.toBytes(String.valueOf(GatewayIMEA)))
        scan.setFilter(GatewayIDFilter)

        // Serialize the filtered Scan into the configuration read by TableInputFormat
        conf.set(TableInputFormat.SCAN, TableMapReduceUtil.convertScanToString(scan))
    
        val hBaseRDD = sparkContext.newAPIHadoopRDD(conf, classOf[TableInputFormat], classOf[ImmutableBytesWritable], classOf[Result])
    
    
        val DATAFRAME = hBaseRDD.map(x => {
          (Bytes.toString(x._2.getValue(Bytes.toBytes("header"), Bytes.toBytes("gatewayIMEA"))),
            Bytes.toString(x._2.getValue(Bytes.toBytes("header"), Bytes.toBytes("eventTime"))),
            Bytes.toString(x._2.getValue(Bytes.toBytes("node"), Bytes.toBytes("imei"))),
            Bytes.toString(x._2.getValue(Bytes.toBytes("measure"), Bytes.toBytes("Measure"))))
    
        }).toDF()
          .withColumnRenamed("_1", "GatewayIMEA")
          .withColumnRenamed("_2", "EventTime")
          .withColumnRenamed("_3", "ap")
          .withColumnRenamed("_4", "measure")
    
    
        DATAFRAME.show()
    
      }
    
    }
    

    What this does is: set your input table, define your filter, run the scan with that filter, load the scan into an RDD, and then (optionally) convert the RDD into a DataFrame.

    To apply multiple filters:

    val timestampFilter = new SingleColumnValueFilter(Bytes.toBytes("header"), Bytes.toBytes("eventTime"), CompareFilter.CompareOp.GREATER, Bytes.toBytes(String.valueOf(dateOfDayTimestamp)))
    val GatewayIDFilter = new SingleColumnValueFilter(Bytes.toBytes("header"), Bytes.toBytes("gatewayIMEA"), CompareFilter.CompareOp.EQUAL, Bytes.toBytes(String.valueOf(GatewayIMEA)))
    
    val filters = new FilterList(GatewayIDFilter, timestampFilter)
    scan.setFilter(filters)
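
    By default, FilterList combines its filters with MUST_PASS_ALL (logical AND). The sketch below, reusing the filters and column names from above, shows how to pass the operator explicitly for OR semantics and, optionally, how to restrict the scan to the columns actually read so that even matching rows ship fewer cells back to Spark (this combination is an illustration, not part of the original answer):

    // FilterList defaults to MUST_PASS_ALL (AND); pass MUST_PASS_ONE for OR semantics
    val anyOfFilters = new FilterList(FilterList.Operator.MUST_PASS_ONE, GatewayIDFilter, timestampFilter)

    // Optionally limit the scan to the columns the job actually reads
    scan.addColumn(Bytes.toBytes("header"), Bytes.toBytes("gatewayIMEA"))
    scan.addColumn(Bytes.toBytes("header"), Bytes.toBytes("eventTime"))
    scan.addColumn(Bytes.toBytes("node"), Bytes.toBytes("imei"))
    scan.addColumn(Bytes.toBytes("measure"), Bytes.toBytes("Measure"))

    scan.setFilter(anyOfFilters)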
    

    【Comments】:

      【Solution 2】:

      You can use a Spark-HBase connector that supports predicate pushdown, for example https://spark-packages.org/package/Huawei-Spark/Spark-SQL-on-HBase
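
      As a rough illustration of what predicate pushdown looks like, here is a minimal sketch using the Hortonworks shc-core connector (a different package from the one linked above); the format string, option key, catalog mapping, table name and filter value are assumptions based on the question's schema, not something from the original answer:

      import org.apache.spark.sql.SparkSession

      val spark = SparkSession.builder.master("local[*]").appName("HBasePushdown").getOrCreate()
      import spark.implicits._

      // Maps DataFrame columns to HBase column family:qualifier pairs (names taken from the question)
      val catalog =
        """{
          |  "table": {"namespace": "default", "name": "TABLE"},
          |  "rowkey": "key",
          |  "columns": {
          |    "rowkey":      {"cf": "rowkey",  "col": "key",         "type": "string"},
          |    "GatewayIMEA": {"cf": "header",  "col": "gatewayIMEA", "type": "string"},
          |    "EventTime":   {"cf": "header",  "col": "eventTime",   "type": "string"},
          |    "ap":          {"cf": "node",    "col": "imei",        "type": "string"},
          |    "RSSI":        {"cf": "measure", "col": "rssi",        "type": "string"}
          |  }
          |}""".stripMargin

      // "catalog" is the option key the shc connector expects (HBaseTableCatalog.tableCatalog)
      val df = spark.read
        .options(Map("catalog" -> catalog))
        .format("org.apache.spark.sql.execution.datasources.hbase")
        .load()

      // The equality predicate is pushed down to HBase, so only matching rows are scanned
      df.filter($"GatewayIMEA" === "123").show()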

      【Comments】: