【Title】: Spark 2 Dataframe Save to Hive - Compaction
【Posted】: 2018-01-09 03:23:20
【Question】:

I am saving a dataframe to a Hive table using the Spark session. The code is below.

df.write.mode(SaveMode.Append).format("orc").insertInto("table")

The data comes from Kafka and can add up to a large volume over the course of a day. Does the dataframe save internally trigger Hive compaction? If not, what is the best way to compact the table periodically without affecting inserts into it?

【Comments】:

  • What do you want to do with the dataframe? Please explain in detail.
  • I want to save it into a Hive table in ORC format. That is shown in the code snippet above.

Tags: apache-spark dataframe hive spark-dataframe


【Solution 1】:

In your example you should add partitionBy, since the data volume is likely to be large. Note that in Spark 2 partitionBy cannot be combined with insertInto, so the write ends in saveAsTable instead:

df.write.mode(SaveMode.Append).format("orc").partitionBy("age").saveAsTable("table")

Alternatively, you can do it the following way. The approach I use is to first register a temp table in the Spark job itself, and then leverage the sql method of the HiveContext to create a new table in Hive using the data from the temp table. For example, if I have a dataframe df and a HiveContext hc, the general process is:

df.registerTempTable("my_temp_table")
hc.sql("INSERT OVERWRITE TABLE table_name PARTITION (partition_col) SELECT a, b, partition_col FROM my_temp_table")
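
In Spark 2 itself, HiveContext and registerTempTable are deprecated; a minimal sketch of the same pattern through SparkSession (assuming a SparkSession named spark; table_name, partition_col, and the selected columns are illustrative, as above):

// createOrReplaceTempView supersedes the deprecated registerTempTable,
// and SparkSession.sql plays the role of HiveContext.sql.
df.createOrReplaceTempView("my_temp_table");
spark.sql("INSERT OVERWRITE TABLE table_name PARTITION (partition_col) "
        + "SELECT a, b, partition_col FROM my_temp_table");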

【Discussion】:

    【Solution 2】:

    The approach below keeps a parallel <table>_compacted copy of the data: incoming records are merged with the still-unmodified original rows via a left anti join on the primary key, written into <table>_compacted under a fresh uuid-stamped directory, and the original table (or partition) location is then pointed at that directory.

    import java.time.LocalDateTime;
    import java.time.format.DateTimeFormatter;
    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.Random;
    import java.util.stream.Collectors;

    import org.apache.commons.lang3.StringUtils;
    import org.apache.log4j.Logger;
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.sql.Column;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;
    import org.apache.spark.sql.functions;
    import org.apache.spark.sql.types.StructField;

    import scala.collection.JavaConverters;
    import scala.collection.Seq;

    public class HiveCompaction {

    // The original snippet used logger without declaring it; log4j is assumed here.
    private static final Logger logger = Logger.getLogger(HiveCompaction.class);

    private static SparkConf sparkConf;

    private static JavaSparkContext sc;

    // springutil is an application-specific helper that supplies the shared SparkSession.
    private static SparkSession sqlContext = springutil.getBean("testSparkSession");

    // table name -> comma-separated list of that table's partition columns
    private static HashMap<Object, Object> partitionColumns;
    
    // Entry point: dispatches to the partitioned or non-partitioned path,
    // depending on whether the table is registered in partitionColumns.
    public static void compact(String table, Dataset<Row> dataToCompact) {
    
        logger.info("Started Compaction for - " + table);
    
        if (!partitionColumns.containsKey(table)) {
            compact_table_without_partition(table, dataToCompact);
        } else {
            compact_table_with_partition(table, dataToCompact, partitionColumns);
        }
        logger.info("Data Overwritten in HIVE table : " + table + " successfully");
    
    }
    
    private static void compact_table_with_partition(String table, Dataset<Row> dataToCompact,
            Map<Object, Object> partitionData) {
    
        String[] partitions = ((String) partitionData.get(table)).split(",");
    
        List<Map<Object, Object>> partitionMap = getPartitionsToCompact(dataToCompact, Arrays.asList(partitions));
    
        for (Map mapper : partitionMap) {
        //  sqlContext.sql("REFRESH TABLE staging.dummy_table");
            String query = "select * from " + table + " where " + frameQuery(" and ", mapper);
            Dataset<Row> originalTable = sqlContext.sql(query);
    
            if (originalTable.count() == 0) {
                dataToCompact.write().mode("append").format("parquet").insertInto(table);
            } else {
                String location = getHdfsFileLocation(table);
                String uuid = getUUID();
                updateTable(table, dataToCompact, originalTable, uuid);
                String destinationPath = framePath(location, frameQuery("/", mapper), uuid);
                sqlContext.sql("Alter table " + table + " partition(" + frameQuery(",", mapper) + ") set location '"
                        + destinationPath + "'");
            }
        }
    }
    
    private static void compact_table_without_partition(String table, Dataset<Row> dataToCompact) {
        String query = "select * from " + table;
        Dataset<Row> originalTable = sqlContext.sql(query);
        if (originalTable.count() == 0) {
            dataToCompact.write().mode("append").format("parquet").insertInto(table);
        } else {
            String location = getHdfsFileLocation(table);
            String uuid = getUUID();
            String destinationPath = framePath(location, null, uuid);
            updateTable(table, dataToCompact, originalTable, uuid);
            sqlContext.sql("Alter table " + table + " set location '" + destinationPath + "'");
        }
    
    }
    
    // Rebuilds <table>_compacted: original rows whose primary key matches the
    // incoming data are dropped (left anti join) and replaced by the new records,
    // so the compacted copy ends up with one row per key.
    private static void updateTable(String table, Dataset<Row> dataToCompact, Dataset<Row> originalTable, String uuid) {
        Seq<String> joinColumnSeq = getPrimaryKeyColumns();
        Dataset<Row> unModifiedRecords = originalTable.join(dataToCompact, joinColumnSeq, "leftanti");
        Dataset<Row> dataToInsert1 = dataToCompact.withColumn("uuid", functions.lit(uuid));
        Dataset<Row> dataToInsert2 = unModifiedRecords.withColumn("uuid", functions.lit(uuid));
        dataToInsert1.write().mode("append").format("parquet").insertInto(table + "_compacted");
        dataToInsert2.write().mode("append").format("parquet").insertInto(table + "_compacted");
    
    }
    
    private static String getHdfsFileLocation(String table) {
        Dataset<Row> tableDescription = sqlContext.sql("describe formatted " + table + "_compacted");
        List<Row> rows = tableDescription.collectAsList();
        String location = null;
        for (Row r : rows) {
            if (r.get(0).equals("Location")) {
                location = r.getString(1);
                break;
            }
        }
        return location;
    }
    
    private static String frameQuery(String delimiter, Map mapper) {
        StringBuilder modifiedQuery = new StringBuilder();
        int i = 1;
        for (Object key : mapper.keySet()) {
            modifiedQuery.append(key + "=");
            modifiedQuery.append(mapper.get(key));
            if (mapper.size() > i)
                modifiedQuery.append(delimiter);
            i++;
        }
        return modifiedQuery.toString();
    }
    
    private static String framePath(String location, String framedpartition, String uuid) {
        StringBuilder loc = new StringBuilder(location);
    
        loc.append("/");
        if (StringUtils.isNotEmpty(framedpartition)) {
            loc.append(framedpartition);
            loc.append("/");
        }
        loc.append("uuid=");
        loc.append(uuid);
        logger.info(loc.toString());
        return loc.toString();
    }
    
    public static Seq<String> getColumnSeq(List<String> joinColumns) {
        List<String> cols = new ArrayList<>(joinColumns.size());
        for (int i = 0; i < joinColumns.size(); i++) {
            cols.add(joinColumns.get(i).toLowerCase());
        }
        return JavaConverters.asScalaBufferConverter(cols).asScala().readOnly();
    }
    
    private static String getUUID() {
        // Not a true UUID: a timestamp plus a random suffix, unique enough to
        // name a fresh destination directory for each compaction run.
        Random rand = new Random();
        int randNum = rand.nextInt(200);
        return DateTimeFormatter.ofPattern("yyyyMMddHHmmSSS").format(LocalDateTime.now()) + randNum;
    }
    
    private static List<Map<Object, Object>> getPartitionsToCompact(Dataset<Row> filteredRecords,
            List<String> partitions) {
        Column[] columns = new Column[partitions.size()];
        int index = 0;
        for (String c : partitions) {
            columns[index] = new Column(c);
            index++;
        }
        // TODO: add a filter condition for selecting known partitions
        Dataset<Row> partitionsToCompact = filteredRecords.select(columns).distinct();
        JavaRDD<Map<Object, Object>> querywithPartitions = partitionsToCompact.toJavaRDD().map(row -> {
            return convertRowToMap(row);
        });
    
        return querywithPartitions.collect();
    }
    
    private static Map<Object, Object> convertRowToMap(Row row) {
        StructField[] fields = row.schema().fields();
        List<StructField> structFields = Arrays.asList(fields);
        Map<Object, Object> a = structFields.stream()
                .collect(Collectors.toMap(e -> ((StructField) e).name(), e -> row.getAs(e.name())));
        return a;
    }
    
    private static Seq<String> getPrimaryKeyColumns() {
        // Placeholder: populate with the table's primary key columns; the left
        // anti join in updateTable deduplicates on these.
        ArrayList<String> primaryKeyColumns = new ArrayList<String>();
        return getColumnSeq(primaryKeyColumns);
    }
    
    /*
     * public static void initSpark(String jobname) { sparkConf = new
     * SparkConf().setAppName(jobname); sparkConf.setMaster("local[3]");
     * sparkConf.set("spark.driver.allowMultipleContexts", "true"); sc = new
     * JavaSparkContext(); sqlContext = new SQLContext(sc); }
     */
    
    public static HashMap<Object, Object> getPartitionColumns() {
        HashMap<Object, Object> partitionColumnMap = new HashMap<Object, Object>();
        partitionColumnMap.put((Object) "staging.dummy_table", "trade_date,dwh_business_date,region_cd");
        return partitionColumnMap;
    }
    
    public static void initialize(String table) {
        // initSpark("Hive Table Compaction -" + table);
        partitionColumns = getPartitionColumns();
    }
    

    }

    Usage:

        String table = "staging.dummy_table";
        HiveCompaction.initialize(table);
        Dataset<Row> dataToCompact = sparkSession.sql("select * from staging.dummy_table");
        HiveCompaction.compact(table, dataToCompact);
    
        sparkSession.sql("select * from staging.dummy_table_compacted").show();
    
        System.out.println("Compaction successful");
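
    To address the "periodically" part of the question: since this compaction writes into a separate _compacted location and only re-points the table at the end, it can be driven from a scheduler outside the ingestion job. A minimal sketch using java.util.concurrent (the 6-hour interval, the table name, and the way dataToCompact is obtained are all assumptions):

        import java.util.concurrent.Executors;
        import java.util.concurrent.ScheduledExecutorService;
        import java.util.concurrent.TimeUnit;

        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        // Illustrative schedule: rebuild the compacted copy every 6 hours while the
        // ingestion job keeps appending to the source table.
        scheduler.scheduleAtFixedRate(() -> {
            String table = "staging.dummy_table";
            HiveCompaction.initialize(table);
            Dataset<Row> dataToCompact = sparkSession.sql("select * from " + table);
            HiveCompaction.compact(table, dataToCompact);
        }, 0, 6, TimeUnit.HOURS);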
    

    【Discussion】:
