【问题标题】:How do I Combine or Merge Small ORC files into Larger ORC file?如何将小 ORC 文件合并或合并为更大的 ORC 文件?
【发布时间】:2018-10-07 02:35:55
【问题描述】:

SO 和网络上的大多数问题/答案都在讨论使用 Hive 将一堆小的 ORC 文件组合成一个更大的文件,但是,我的 ORC 文件是按天分隔的日志文件,我需要将它们分开。我只想每天“汇总”ORC 文件(它们是 HDFS 中的目录)。

我很可能需要用 Java 编写解决方案,并且遇到过OrcFileMergeOperator,这可能是我需要使用的,但现在说还为时过早。

解决此问题的最佳方法是什么?

【问题讨论】:

    标签: java hive hdfs orc


    【解决方案1】:

    这是一个使用PyORC 将小的ORC 文件连接在一起的Python 小脚本。我知道它不会直接回答您的问题,因为它不是在 Java 中,但我发现它比当前的解决方案或使用 Hive 更简单。

    import pyorc
    import argparse
    
    
    def main():
        parser = argparse.ArgumentParser()
        parser.add_argument('-o', '--output', type=argparse.FileType(mode='wb'))
        parser.add_argument('files', type=argparse.FileType(mode='rb'), nargs='+')
        args = parser.parse_args()
    
        schema = str(pyorc.Reader(args.files[0]).schema)
    
        with pyorc.Writer(args.output, schema) as writer:
            for i, f in enumerate(args.files):
                reader = pyorc.Reader(f)
                if str(reader.schema) != schema:
                    raise RuntimeError(
                        "Inconsistent ORC schemas.\n"
                        "\tFirst file schema: {}\n"
                        "\tFile #{} schema: {}"
                        .format(schema, i, str(reader.schema))
                    )
                for line in reader:
                    writer.write(line)
    
    
    if __name__ == '__main__':
        main()
    

    【讨论】:

      【解决方案2】:

      您无需重新发明轮子。

      ALTER TABLE table_name [PARTITION partition_spec] CONCATENATE 可用于将小的 ORC 文件合并到更大的文件中,因为Hive 0.14.0. 合并发生在条带级别,这避免了对数据进行解压缩和解码。它工作得很快。我建议创建一个按天分区的外部表(分区是目录),然后将它们全部合并,指定PARTITION (day_column) 作为分区规范。

      请看这里:LanguageManual+ORC

      【讨论】:

      • 不幸的是,我没有“day_column”,因为数据是“journald -o json”的输出,它被转换为 ORC 并存储在 HDFS 的外部表中,目录结构为 yyyy/ mm/dd/file1.orc file2.orc file3.orc等。所有时间戳都以纪元时间为准。
      • @ChrisC 但是您可以在数据根文件夹顶部创建带有附加分区列的新外部表,然后将分区挂载到 yyyy/mm/dd 位置,然后使用 hive 连接所有分区。 1)通过load_day创建外部表+分区。 2)更改表添加分区等以将它们全部挂载。 3) 使用 Hive 连接
      • CONCATENATE 也适用于外部表吗?我知道它没有。
      • @OmarAli 当然,它有效。外部或管理,没关系。外部和托管之间的唯一区别是 DROP 表行为。托管表 DROP 也会删除数据。外部表删除将仅删除表定义。您还可以一次在 HDFS 的同一目录上创建几个不同的表。
      • @FoxanNg 是的,它是在issues.apache.org/jira/browse/HIVE-17403Hive 的 3.0.0、2.4.0 版本中添加的
      【解决方案3】:

      这里有很好的答案,但没有一个允许我运行 cron 作业以便我可以每天汇总。我们每天都将日志文件写入 HDFS,我不想每天进来时都在 Hive 中运行查询。

      我最终做的事情对我来说似乎更直接。我编写了一个 Java 程序,它使用 ORC 库扫描目录中的所有文件并创建这些文件的列表。然后打开一个新的 Writer,它是“组合”文件(以“.”开头,因此它对 Hive 隐藏,否则 Hive 将失败)。然后程序打开列表中的每个文件并读取内容并写出组合文件。读取所有文件后,它会删除文件。我还添加了一次运行一个目录的功能,以备不时之需。

      注意:您将需要一个架构文件。 Journald 日志可以以 json "journalctl -o json" 的形式输出,然后您可以使用 Apache ORC 工具生成模式文件,也可以手动生成。 ORC 的自动生成很好,但手动总是更好。

      注意:要按原样使用此代码,您需要一个有效的密钥表并在类路径中添加 -Dkeytab=。

      import java.io.FileNotFoundException;
      import java.io.IOException;
      import java.io.InputStream;
      import java.net.InetAddress;
      import java.util.ArrayList;
      import java.util.List;
      
      import org.apache.commons.io.IOUtils;
      import org.apache.hadoop.conf.Configuration;
      import org.apache.hadoop.fs.FileStatus;
      import org.apache.hadoop.fs.FileSystem;
      import org.apache.hadoop.fs.Path;
      import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
      import org.apache.hadoop.security.UserGroupInformation;
      import org.apache.orc.OrcFile;
      import org.apache.orc.Reader;
      import org.apache.orc.RecordReader;
      import org.apache.orc.TypeDescription;
      import org.apache.orc.Writer;
      
      import com.cloudera.org.joda.time.LocalDate;
      
      public class OrcFileRollUp {
      
        private final static String SCHEMA = "journald.schema";
        private final static String UTF_8 = "UTF-8";
        private final static String HDFS_BASE_LOGS_DIR = "/<baseDir>/logs";
        private static final String keytabLocation = System.getProperty("keytab");
        private static final String kerberosUser = "<userName>";
        private static Writer writer;
      
        public static void main(String[] args) throws IOException {
      
          Configuration conf = new Configuration();
          conf.set("hadoop.security.authentication", "Kerberos");
      
          InetAddress myHost = InetAddress.getLocalHost();
          String kerberosPrincipal = String.format("%s/%s", kerberosUser, myHost.getHostName());
          UserGroupInformation.setConfiguration(conf);
          UserGroupInformation.loginUserFromKeytab(kerberosPrincipal, keytabLocation);
      
          int currentDay = LocalDate.now().getDayOfMonth();
          int currentMonth = LocalDate.now().getMonthOfYear();
          int currentYear = LocalDate.now().getYear();
      
          Path path = new Path(HDFS_BASE_LOGS_DIR);
      
          FileSystem fileSystem = path.getFileSystem(conf);
          System.out.println("The URI is: " + fileSystem.getUri());
      
      
          //Get Hosts:
          List<String> allHostsPath = getHosts(path, fileSystem);
      
          TypeDescription schema = TypeDescription.fromString(getSchema(SCHEMA)
              .replaceAll("\n", ""));
      
          //Open each file for reading and write contents
          for(int i = 0; i < allHostsPath.size(); i++) {
      
            String outFile = "." + currentYear + "_" + currentMonth + "_" + currentDay + ".orc.working";            //filename:  .2018_04_24.orc.working
      
            //Create list of files from directory and today's date OR pass a directory in via the command line in format 
            //hdfs://<namenode>:8020/HDFS_BASE_LOGS_DIR/<hostname>/2018/4/24/
            String directory = "";
            Path outFilePath;
            Path argsPath;
            List<String> orcFiles;
      
            if(args.length == 0) {
              directory = currentYear + "/" + currentMonth + "/" + currentDay;
              outFilePath = new Path(allHostsPath.get(i) + "/" + directory + "/" + outFile);
              try {
                orcFiles = getAllFilePath(new Path(allHostsPath.get(i) + "/" + directory), fileSystem);
              } catch (Exception e) {
                continue;
              }
            } else {
              outFilePath = new Path(args[0] + "/" + outFile);
              argsPath = new Path(args[0]);
              try {
                orcFiles = getAllFilePath(argsPath, fileSystem);
              } catch (Exception e) {
                continue;
              }
            }
      
            //Create List of files in the directory
      
            FileSystem fs = outFilePath.getFileSystem(conf);
      
            //Writer MUST be below ^^ or the combination file will be deleted as well.
            if(fs.exists(outFilePath)) {
              System.out.println(outFilePath + " exists, delete before continuing.");
            } else {
             writer = OrcFile.createWriter(outFilePath, OrcFile.writerOptions(conf)
                  .setSchema(schema));
            }
      
            for(int j = 0; j < orcFiles.size(); j++ ) { 
              Reader reader = OrcFile.createReader(new Path(orcFiles.get(j)), OrcFile.readerOptions(conf));
      
              VectorizedRowBatch batch = reader.getSchema().createRowBatch();
              RecordReader rows = reader.rows();
      
              while (rows.nextBatch(batch)) {
                if (batch != null) {
                   writer.addRowBatch(batch);
                }
              }
              rows.close();
              fs.delete(new Path(orcFiles.get(j)), false);
            }
            //Close File
            writer.close();
      
            //Remove leading "." from ORC file to make visible to Hive
            outFile = fileSystem.getFileStatus(outFilePath)
                                            .getPath()
                                            .getName();
      
            if (outFile.startsWith(".")) {
              outFile = outFile.substring(1);
      
              int lastIndexOf = outFile.lastIndexOf(".working");
              outFile = outFile.substring(0, lastIndexOf);
            }
      
            Path parent = outFilePath.getParent();
      
            fileSystem.rename(outFilePath, new Path(parent, outFile));
      
            if(args.length != 0)
              break;
          }
        }
      
        private static String getSchema(String resource) throws IOException {
          try (InputStream input = OrcFileRollUp.class.getResourceAsStream("/" + resource)) {
            return IOUtils.toString(input, UTF_8);
          }
        }
      
        public static List<String> getHosts(Path filePath, FileSystem fs) throws FileNotFoundException, IOException {
          List<String> hostsList = new ArrayList<String>();
          FileStatus[] fileStatus = fs.listStatus(filePath);
          for (FileStatus fileStat : fileStatus) {
            hostsList.add(fileStat.getPath().toString());
          }
          return hostsList;
        }
      
        private static List<String> getAllFilePath(Path filePath, FileSystem fs) throws FileNotFoundException, IOException {
          List<String> fileList = new ArrayList<String>();
          FileStatus[] fileStatus = fs.listStatus(filePath);
          for (FileStatus fileStat : fileStatus) {
            if (fileStat.isDirectory()) {
              fileList.addAll(getAllFilePath(fileStat.getPath(), fs));
            } else {
              fileList.add(fileStat.getPath()
                                   .toString());
            }
          }
          for(int i = 0; i< fileList.size(); i++) {
            if(!fileList.get(i).endsWith(".orc"))
              fileList.remove(i);
          }
      
          return fileList;
        }
      
      }
      

      【讨论】:

      • 说真的,有人否决了我自己的解决方案?然后提供一个更好的选择,不要只是投反对票并继续前进。
      • 如果不需要,您是否知道如何禁用身份验证 (Kerberos)?
      • @Ryan 你知道如何禁用身份验证(kerberos)吗?
      • @Riddle 我有一个可以分享的代码库。你能提醒我星期二吗?现在就打倒新冠病毒。
      • @Riddle 让我知道这个 sn-p 是否对您有帮助 goonlinetools.com/snapshot/code/#ktak5lzugmoyapeyipuy8e
      猜你喜欢
      • 2018-04-10
      • 2018-05-27
      • 2019-02-11
      • 2018-06-09
      • 2018-06-10
      • 2020-03-02
      • 2017-07-23
      • 2020-03-11
      • 2020-02-19
      相关资源
      最近更新 更多