【问题标题】:Write the result of SQL Query to file by Apache Flink通过 Apache Flink 将 SQL Query 的结果写入文件
【发布时间】:2020-09-07 15:05:12
【问题描述】:

我有以下任务:

  1. 使用对 Hive 表的 SQL 请求创建作业;
  2. 在远程 Flink 集群上运行此作业;
  3. 在文件中收集此作业的结果(最好使用 HDFS)。

注意

因为需要在远程 Flink 集群上运行这个作业,所以我不能以简单的方式使用 TableEnvironment。这张票中提到了这个问题:https://issues.apache.org/jira/browse/FLINK-18095。对于当前的解决方案,我使用来自 http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Table-Environment-for-Remote-Execution-td35691.html 的 adivce。

代码

EnvironmentSettings batchSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();
// create remote env
StreamExecutionEnvironment streamExecutionEnvironment = StreamExecutionEnvironment.createRemoteEnvironment("localhost", 8081, "/path/to/my/jar");
// create StreamTableEnvironment
TableConfig tableConfig = new TableConfig();
ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
CatalogManager catalogManager = CatalogManager.newBuilder()
                                              .classLoader(classLoader)
                                              .config(tableConfig.getConfiguration())
                                              .defaultCatalog(
                                                  batchSettings.getBuiltInCatalogName(),
                                                  new GenericInMemoryCatalog(
                                                      batchSettings.getBuiltInCatalogName(),
                                                      batchSettings.getBuiltInDatabaseName()))
                                              .executionConfig(
                                                  streamExecutionEnvironment.getConfig())
                                              .build();
ModuleManager moduleManager = new ModuleManager();
BatchExecutor batchExecutor = new BatchExecutor(streamExecutionEnvironment);
FunctionCatalog functionCatalog = new FunctionCatalog(tableConfig, catalogManager, moduleManager);
StreamTableEnvironmentImpl tableEnv = new StreamTableEnvironmentImpl(
    catalogManager,
    moduleManager,
    functionCatalog,
    tableConfig,
    streamExecutionEnvironment,
    new BatchPlanner(batchExecutor, tableConfig, functionCatalog, catalogManager),
    batchExecutor,
    false);
// configure HiveCatalog
String name = "myhive";
String defaultDatabase = "default";
String hiveConfDir = "/path/to/hive/conf"; // a local path
HiveCatalog hive = new HiveCatalog(name, defaultDatabase, hiveConfDir);
tableEnv.registerCatalog("myhive", hive);
tableEnv.useCatalog("myhive");
// request to Hive
Table table = tableEnv.sqlQuery("select * from myhive.`default`.test");

问题

在这一步我可以调用 table.execute() 方法,然后通过 collect() 方法得到 CloseableIterator。但在我的情况下,由于我的请求,我可以获得大量行,将它收集到文件中(HDFS 中的 ORC)将是完美的。

我怎样才能达到我的目标?

【问题讨论】:

    标签: java hive apache-flink flink-sql


    【解决方案1】:

    Table.execute().collect() 将视图的结果返回到您的客户端以进行交互。在您的情况下,您可以使用文件系统连接器并使用 INSERT INTO 将视图写入文件。例如:

    // create a filesystem table
    tableEnvironment.executeSql("CREATE TABLE MyUserTable (\n" +
        "  column_name1 INT,\n" +
        "  column_name2 STRING,\n" +
        "  ..." +
        " \n" +
        ") WITH (\n" +
        "  'connector' = 'filesystem',\n" +
        "  'path' = 'hdfs://path/to/your/file',\n" +
        "  'format' = 'orc' \n" +
        ")");
    
    // submit the job
    tableEnvironment.executeSql("insert into MyUserTable select * from myhive.`default`.test");
    

    查看有关文件系统连接器的更多信息:https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/filesystem.html

    【讨论】:

    • 非常感谢,我会尽量利用这个机会。但我还有一个问题:就我而言,最好通过 HiveCatalog 创建表,但我有问题 (stackoverflow.com/questions/63719627/…)。请问您也可以帮我解答这个问题吗?
    猜你喜欢
    • 2021-12-26
    • 1970-01-01
    • 1970-01-01
    • 2016-10-08
    • 1970-01-01
    • 1970-01-01
    • 2021-03-22
    • 1970-01-01
    • 2018-07-24
    相关资源
    最近更新 更多