【问题标题】:Connect to Hive from Spark without using "hive-site.xml"不使用“hive-site.xml”从 Spark 连接到 Hive
【发布时间】:2019-06-09 08:24:29
【问题描述】:

有什么方法可以在不使用“hive-site.xml”的情况下从 Spark 连接到 Hive?

 SparkLauncher sl = new SparkLauncher(evnProps);
        sl.addSparkArg("--verbose");
        sl.addAppArgs(appArgs);
        sl.addFile(evnProps.get(KEY_YARN_CONF_DIR) + "/hive-site.xml");

我们正在将“hive-site.xml”传递给 SparkLauncher。我想删除对“hive-site.xml”的依赖在此处输入代码。

【问题讨论】:

    标签: java apache-spark hadoop hive


    【解决方案1】:

    Spark SQL 支持读取和写入存储在 Apache Hive 中的数据。但是,由于 Hive 具有大量依赖项,因此这些依赖项不包含在默认的 Spark 分发中。如果可以在类路径中找到 Hive 依赖项,Spark 将自动加载它们。请注意,这些 Hive 依赖项还必须存在于所有工作节点上,因为它们需要访问 Hive 序列化和反序列化库 (SerDes) 才能访问存储在 Hive 中的数据。

    通过将 hive-site.xml、core-site.xml(用于安全配置)和 hdfs-site.xml(用于 HDFS 配置)文件放在 conf/ 中来完成 Hive 的配置。

    使用 Hive 时,必须使用 Hive 支持实例化 SparkSession,包括与持久 Hive 元存储的连接、对 Hive serdes 的支持以及 Hive 用户定义的函数。没有现有 Hive 部署的用户仍然可以启用 Hive 支持。 hive-site.xml未配置时,上下文自动在当前目录创建metastore_db,并创建spark.sql.warehouse.dir配置的目录,默认为Spark应用当前目录下的spark-warehouse目录已启动。请注意,自 Spark 2.0.0 以来,hive-site.xml 中的 hive.metastore.warehouse.dir 属性已被弃用。相反,使用 spark.sql.warehouse.dir 指定仓库中数据库的默认位置。您可能需要向启动 Spark 应用程序的用户授予写入权限。

    import java.io.File;
    import java.io.Serializable;
    import java.util.ArrayList;
    import java.util.List;
    
    import org.apache.spark.api.java.function.MapFunction;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Encoders;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;
    
    public static class Record implements Serializable {
      private int key;
      private String value;
    
      public int getKey() {
        return key;
      }
    
      public void setKey(int key) {
        this.key = key;
      }
    
      public String getValue() {
        return value;
      }
    
      public void setValue(String value) {
        this.value = value;
      }
    }
    
    // warehouseLocation points to the default location for managed databases and tables
    String warehouseLocation = new File("spark-warehouse").getAbsolutePath();
    SparkSession spark = SparkSession
      .builder()
      .appName("Java Spark Hive Example")
      .config("spark.sql.warehouse.dir", warehouseLocation)
      .enableHiveSupport()
      .getOrCreate();
    
    spark.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING) USING hive");
    spark.sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src");
    
    // Queries are expressed in HiveQL
    spark.sql("SELECT * FROM src").show();
    // +---+-------+
    // |key|  value|
    // +---+-------+
    // |238|val_238|
    // | 86| val_86|
    // |311|val_311|
    // ...
    
    // Aggregation queries are also supported.
    spark.sql("SELECT COUNT(*) FROM src").show();
    // +--------+
    // |count(1)|
    // +--------+
    // |    500 |
    // +--------+
    
    // The results of SQL queries are themselves DataFrames and support all normal functions.
    Dataset<Row> sqlDF = spark.sql("SELECT key, value FROM src WHERE key < 10 ORDER BY key");
    
    // The items in DataFrames are of type Row, which lets you to access each column by ordinal.
    Dataset<String> stringsDS = sqlDF.map(
        (MapFunction<Row, String>) row -> "Key: " + row.get(0) + ", Value: " + row.get(1),
        Encoders.STRING());
    stringsDS.show();
    // +--------------------+
    // |               value|
    // +--------------------+
    // |Key: 0, Value: val_0|
    // |Key: 0, Value: val_0|
    // |Key: 0, Value: val_0|
    // ...
    
    // You can also use DataFrames to create temporary views within a SparkSession.
    List<Record> records = new ArrayList<>();
    for (int key = 1; key < 100; key++) {
      Record record = new Record();
      record.setKey(key);
      record.setValue("val_" + key);
      records.add(record);
    }
    Dataset<Row> recordsDF = spark.createDataFrame(records, Record.class);
    recordsDF.createOrReplaceTempView("records");
    
    // Queries can then join DataFrames data with data stored in Hive.
    spark.sql("SELECT * FROM records r JOIN src s ON r.key = s.key").show();
    // +---+------+---+------+
    // |key| value|key| value|
    // +---+------+---+------+
    // |  2| val_2|  2| val_2|
    // |  2| val_2|  2| val_2|
    // |  4| val_4|  4| val_4|
    // ...
    

    【讨论】:

    • 谢谢。将尝试启用配置单元支持。
    • 它对您有帮助吗?
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2018-05-06
    • 2014-02-12
    • 2016-08-22
    • 1970-01-01
    • 2019-11-02
    • 1970-01-01
    • 2017-09-29
    相关资源
    最近更新 更多