【问题标题】:PHOENIX SPARK - Load Table as DataFramePHOENIX SPARK - 将表加载为 DataFrame
【发布时间】:2016-09-14 08:43:58
【问题描述】:

我从一个有 5 亿行的 HBase 表 (PHOENIX) 创建了一个 DataFrame。我从 DataFrame 创建了一个 JavaBean 的 RDD,并使用它来连接文件中的数据。

Map<String, String> phoenixInfoMap = new HashMap<String, String>();
phoenixInfoMap.put("table", tableName);
phoenixInfoMap.put("zkUrl", zkURL);
DataFrame df = sqlContext.read().format("org.apache.phoenix.spark").options(phoenixInfoMap).load();
JavaRDD<Row> tableRows = df.toJavaRDD();
JavaPairRDD<String, AccountModel> dbData = tableRows.mapToPair(
new PairFunction<Row, String, String>()
{
    @Override
    public Tuple2<String, String> call(Row row) throws Exception
    {
        return new Tuple2<String, String>(row.getAs("ID"), row.getAs("NAME"));
    }
});

现在我的问题 - 假设该文件有 2 百万个与表匹配的唯一条目。是整个表作为 RDD 加载到内存中,还是只有表中匹配的 200 万条记录作为 RDD 加载到内存中?

【问题讨论】:

  • 嗨@Mohan,请告诉我DataFrame df = sqlContext.read().format("org.apache.phoenix.spark").options(phoenixInfoMap).load() 方法的构建依赖关系。我也在做同样的事情,但得到java.lang.NoSuchMethodError

标签: apache-spark dataframe phoenix


【解决方案1】:

你的陈述

DataFrame df = sqlContext.read().format("org.apache.phoenix.spark").options(phoenixInfoMap)
.load();

会将整个表加载到内存中。您没有为 phoenix 提供任何过滤器以将其下推到 hbase - 从而减少读取的行数。

如果您对非 HBase 数据源(例如平面文件)进行连接,则首先需要读入 hbase 表中的所有记录。与辅助数据源不匹配的记录将不会保存在新的 DataFrame - 但最初的读数仍然会发生。

更新一种可能的方法是预处理文件 - 即提取您想要的 id。将结果存储到新的 HBase 表中。然后通过 Phoenix not Spark 直接在 HBase 中执行 join。

这种方法的基本原理是将计算转移到数据中。大部分数据驻留在 HBase 中 - 然后将小数据(文件中的 id)移动到那里。

我对 Phoenix 并不直接熟悉,只是它在 hbase 之上提供了一个 sql 层。大概那么它将能够进行这样的连接并将结果存储在单独的 HBase 表中..?然后可以将该单独的表加载到 Spark 中以用于您的后续计算。

【讨论】:

  • 谢谢@javadba。有没有一种有效的方法来处理这种情况?我只想从 HBase 表中加载文件中的 200 万个匹配条目。
猜你喜欢
  • 2016-09-06
  • 1970-01-01
  • 1970-01-01
  • 2015-06-24
  • 2016-07-15
  • 2018-03-10
  • 2019-02-16
相关资源
最近更新 更多