【Question Title】: Reading an ORC file in Java
【Posted】: 2015-12-19 04:46:01
【Question】:

How can I read an ORC file in Java? I want to read in a small file to verify some unit-test output, but I can't find a solution.

【Question Comments】:

    Tags: java hadoop orc


    【Solution 1】:

    A test case that reads an ORC file:

    
        @Test
        public void read_orc() throws Exception {
            //todo do kerberos auth
            String orcPath = "hdfs://user/hive/warehouse/demo.db/orc_path";
            //load hdfs conf
            Configuration conf = new Configuration();
            conf.addResource(getClass().getResource("/hdfs-site.xml"));
            conf.addResource(getClass().getResource("/core-site.xml"));
            FileSystem fs = FileSystem.get(conf);
            // custom read column
            List<String> columns = Arrays.asList("id", "title");
            final List<Map<String, Object>> maps = OrcUtil.readOrcFile(fs, orcPath, columns);
            System.out.println(new Gson().toJson(maps));
        }
    

    OrcUtil reads an ORC path, returning only the selected columns:

    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.Iterator;
    import java.util.List;
    import java.util.Map;
    import java.util.Properties;
    import java.util.stream.Collectors;
    import org.apache.commons.lang.StringUtils;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.fs.PathFilter;
    import org.apache.hadoop.hive.ql.io.orc.OrcFile;
    import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
    import org.apache.hadoop.hive.ql.io.orc.OrcSerde;
    import org.apache.hadoop.hive.ql.io.orc.OrcSplit;
    import org.apache.hadoop.hive.ql.io.orc.OrcStruct;
    import org.apache.hadoop.hive.ql.io.orc.Reader;
    import org.apache.hadoop.hive.serde2.SerDeException;
    import org.apache.hadoop.hive.serde2.objectinspector.StructField;
    import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.mapred.FileInputFormat;
    import org.apache.hadoop.mapred.InputFormat;
    import org.apache.hadoop.mapred.InputSplit;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.RecordReader;
    import org.apache.hadoop.mapred.Reporter;
    
    public class OrcUtil {
        
        public static List<Map<String, Object>> readOrcFile(FileSystem fs, String orcPath, List<String> readColumns)
                throws IOException, SerDeException {
            JobConf jobConf = new JobConf();
            for (Map.Entry<String, String> entry : fs.getConf()) {
                jobConf.set(entry.getKey(), entry.getValue());
            }
            
            FileInputFormat.setInputPaths(jobConf, orcPath);
            FileInputFormat.setInputPathFilter(jobConf, ((PathFilter) path1 -> true).getClass());
            
            InputSplit[] splits = new OrcInputFormat().getSplits(jobConf, 1);
            InputFormat<NullWritable, OrcStruct> orcInputFormat = new OrcInputFormat();
            
            List<Map<String, Object>> rows = new ArrayList<>();
            for (InputSplit split : splits) {
                OrcSplit orcSplit = (OrcSplit) split;
                System.out.printf("read orc split %s%n", ((OrcSplit) split).getPath());
                StructObjectInspector inspector = getStructObjectInspector(orcSplit.getPath(), jobConf, fs);
                List<? extends StructField> readFields = inspector.getAllStructFieldRefs()
                        .stream().filter(e -> readColumns.contains(e.getFieldName())).collect(Collectors.toList());
                // an ORC file with no rows is ~49 bytes (header/footer only), so skip such splits
                if (orcSplit.getLength() > 49) {
                    RecordReader<NullWritable, OrcStruct> recordReader = orcInputFormat.getRecordReader(orcSplit, jobConf, Reporter.NULL);
                    NullWritable key = recordReader.createKey();
                    OrcStruct value = recordReader.createValue();
                    while (recordReader.next(key, value)) {
                        Map<String, Object> entity = new HashMap<>();
                        for (StructField field : readFields) {
                            entity.put(field.getFieldName(), inspector.getStructFieldData(value, field));
                        }
                        rows.add(entity);
                    }
                }
            }
            return rows;
        }
        
        private static StructObjectInspector getStructObjectInspector(Path path, JobConf jobConf, FileSystem fs)
                throws IOException, SerDeException {
            OrcFile.ReaderOptions readerOptions = OrcFile.readerOptions(jobConf);
            readerOptions.filesystem(fs);
            Reader reader = OrcFile.createReader(path, readerOptions);
            String typeStruct = reader.getObjectInspector().getTypeName();
            System.out.println(typeStruct);
            List<String> columnList = parseColumnAndType(typeStruct);
            String[] fullColNames = new String[columnList.size()];
            String[] fullColTypes = new String[columnList.size()];
            for (int i = 0; i < columnList.size(); ++i) {
                String[] temp = columnList.get(i).split(":");
                fullColNames[i] = temp[0];
                fullColTypes[i] = temp[1];
            }
            Properties p = new Properties();
            p.setProperty("columns", StringUtils.join(fullColNames, ","));
            p.setProperty("columns.types", StringUtils.join(fullColTypes, ":"));
            OrcSerde orcSerde = new OrcSerde();
            orcSerde.initialize(jobConf, p);
            return (StructObjectInspector) orcSerde.getObjectInspector();
        }
        
        private static List<String> parseColumnAndType(String typeStruct) {
            int startIndex = typeStruct.indexOf("<") + 1;
            int endIndex = typeStruct.lastIndexOf(">");
            typeStruct = typeStruct.substring(startIndex, endIndex);
            
            List<String> columnList = new ArrayList<>();
            List<String> splitList = Arrays.asList(typeStruct.split(","));
            Iterator<String> it = splitList.iterator();
            while (it.hasNext()) {
                StringBuilder current = new StringBuilder(it.next());
                String currentStr = current.toString();
                boolean left = currentStr.contains("(");
                boolean right = currentStr.contains(")");
                if (!left && !right) {
                    columnList.add(currentStr);
                    continue;
                }
                if (left && right) {
                    columnList.add(currentStr);
                    continue;
                }
                if (left && !right) {
                    while (it.hasNext()) {
                        String next = it.next();
                        current.append(",").append(next);
                        if (next.contains(")")) {
                            break;
                        }
                    }
                    columnList.add(current.toString());
                }
            }
            return columnList;
        }
    }
    
    

    【Comments】:

      【Solution 2】:

      I ran into this recently and implemented a reader myself:

      import org.apache.hadoop.fs.Path;
      import org.apache.hadoop.hive.ql.io.orc.OrcFile;
      import org.apache.hadoop.hive.ql.io.orc.Reader;
      import org.apache.hadoop.hive.ql.io.orc.RecordReader;
      
      import org.apache.hadoop.hive.serde2.objectinspector.StructField;
      import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
      
      import java.util.List;
      
      public class OrcFileDirectReaderExample {
          public static void main(String[] argv)
          {
              try {
                  Reader reader = OrcFile.createReader(HdfsFactory.getFileSystem(), new Path("/user/hadoop/000000_0"));
                  StructObjectInspector inspector = (StructObjectInspector)reader.getObjectInspector();
                  System.out.println(reader.getMetadata());
                  RecordReader records = reader.rows();
                  Object row = null;
                  //These objects are the metadata for each column. They give you the type of each column
                  //and can parse the values for you, unless you want to parse each column yourself
                  List fields = inspector.getAllStructFieldRefs();
                  for(int i = 0; i < fields.size(); ++i) {
                      System.out.print(((StructField)fields.get(i)).getFieldObjectInspector().getTypeName() + '\t');
                  }
      
                  while(records.hasNext())
                  {
                      row = records.next(row);
                      List value_lst = inspector.getStructFieldsDataAsList(row);
                      StringBuilder builder = new StringBuilder();
                      //iterate over the fields
                      //Also fields can be null if a null was passed as the input field when processing wrote this file
                      for(Object field : value_lst) {
                          if(field != null)
                              builder.append(field.toString());
                          builder.append('\t');
                      }
                      //this writes out the row as it would be if this were a Text tab separated file
                      System.out.println(builder.toString());
                  }
              }catch (Exception e)
              {
                  e.printStackTrace();
              }
      
          }
      }
      

      【Comments】:

      • This saved me! Works perfectly!
      • What is the dependency for the HdfsFactory class? (see the note below)
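
      HdfsFactory is not a Hadoop or Hive class; it looks like the answerer's own helper for obtaining a FileSystem. A minimal hypothetical stand-in, assuming the cluster configuration files (core-site.xml, hdfs-site.xml) are on the classpath, might look like this:

      import java.io.IOException;
      import org.apache.hadoop.conf.Configuration;
      import org.apache.hadoop.fs.FileSystem;

      public class HdfsFactory {
          // Hypothetical helper: builds a FileSystem from whatever Hadoop
          // configuration files are visible on the classpath.
          public static FileSystem getFileSystem() throws IOException {
              Configuration conf = new Configuration();
              return FileSystem.get(conf);
          }
      }
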
      【Solution 3】:

      Try this to get the row count of an ORC file...

      // imports (assuming the same Hive ql.io.orc API used in the other answers)
      import org.apache.hadoop.fs.FileSystem;
      import org.apache.hadoop.fs.Path;
      import org.apache.hadoop.hive.ql.io.orc.OrcFile;
      import org.apache.hadoop.hive.ql.io.orc.Reader;
      import org.apache.hadoop.hive.ql.io.orc.StripeInformation;
      import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;

      private long getRowCount(FileSystem fs, String fName) throws Exception {
          long tempCount = 0;
          Reader rdr = OrcFile.createReader(fs, new Path(fName));
          StructObjectInspector insp = (StructObjectInspector) rdr.getObjectInspector();
          // sum the row counts recorded in each stripe's metadata; no rows are actually read
          Iterable<StripeInformation> iterable = rdr.getStripes();
          for (StripeInformation stripe : iterable) {
              tempCount = tempCount + stripe.getNumberOfRows();
          }
          return tempCount;
      }
      
      //fName is hdfs path to file.
      long rowCount = getRowCount(fs,fName);
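
      If only the total row count matters, the Reader can also report it directly; a one-line sketch, assuming Reader.getNumberOfRows() is available in your Hive version:

          long totalRows = OrcFile.createReader(fs, new Path(fName)).getNumberOfRows();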
      

      【Comments】:

      • Could you mention the imports and the external libraries? Without those, a standalone function is hard to reuse.
      【Solution 4】:

      According to the Apache wiki, the ORC file format was introduced in Hive 0.11.

      So you will need the Hive packages on your project's classpath to read ORC files. The relevant classes are:

      org.apache.hadoop.hive.ql.io.orc.Reader;
      org.apache.hadoop.hive.ql.io.orc.OrcFile
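
      A minimal sketch of how those two classes fit together, assuming the Hive 1.x ql.io.orc API used in the other answers and a placeholder file path (the standalone org.apache.orc library has slightly different method signatures):

          import org.apache.hadoop.conf.Configuration;
          import org.apache.hadoop.fs.Path;
          import org.apache.hadoop.hive.ql.io.orc.OrcFile;
          import org.apache.hadoop.hive.ql.io.orc.Reader;
          import org.apache.hadoop.hive.ql.io.orc.RecordReader;

          public class OrcDumpExample {
              public static void main(String[] args) throws Exception {
                  Configuration conf = new Configuration();
                  // "/tmp/example.orc" is a placeholder; point it at your own small test file
                  Reader reader = OrcFile.createReader(new Path("/tmp/example.orc"), OrcFile.readerOptions(conf));
                  RecordReader rows = reader.rows();
                  Object row = null;
                  while (rows.hasNext()) {
                      row = rows.next(row);      // each row comes back as an OrcStruct
                      System.out.println(row);   // OrcStruct.toString() prints the column values
                  }
                  rows.close();
              }
          }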
      

      【Comments】:

      • I have already tried to implement a solution using these, without much luck.