【Question Title】: Read Avro parquet file from inside JAR
【Posted】: 2022-01-08 09:43:14
【Question】:

I am trying to read a parquet file that is bundled as a resource inside a JAR, ideally as a stream.

Does anyone have a working example that doesn't involve first writing the resource out as a temporary file?

Here is the code I'm using to read the file, which works fine when run in the IDE, before packaging as a JAR:

import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.avro.AvroParquetReader;
import org.apache.parquet.hadoop.ParquetReader;
import org.apache.parquet.hadoop.util.HadoopInputFile;

import java.io.IOException;
import java.net.URISyntaxException;

try {
    Path path = new Path(classLoader.getResource(pattern_id).toURI());

    Configuration conf = new Configuration();

    try (ParquetReader<GenericRecord> r = AvroParquetReader.<GenericRecord>builder(
                    HadoopInputFile.fromPath(path, conf))
            .disableCompatibility()
            .build()) {
        patternsFound.add(pattern_id);

        GenericRecord record;
        while ((record = r.read()) != null) {
            // Do some work
        }
    } catch (IOException e) {
        e.printStackTrace();
    }
} catch (NullPointerException | URISyntaxException e) {
    e.printStackTrace();
}

When running this code from the JAR file, I get this error:

org.apache.hadoop.fs.UnsupportedFileSystemException: No FileSystem for scheme "jar"
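The error occurs because a resource located inside a JAR is addressed with a `jar:` URL, a scheme Hadoop's `FileSystem` has no implementation for, whereas in the IDE the same resource resolves to a plain `file:` URL. A minimal illustration of the difference (the paths are hypothetical examples, not taken from the question):

```java
import java.net.URI;

public class JarUriDemo {
    public static void main(String[] args) {
        // In the IDE, getResource() typically yields a file: URL.
        URI ideUri = URI.create("file:/project/target/classes/data.parquet");
        // Packaged in a JAR, the same call yields a jar: URL.
        URI jarUri = URI.create("jar:file:/project/app.jar!/data.parquet");

        System.out.println(ideUri.getScheme()); // file
        System.out.println(jarUri.getScheme()); // jar  <- no Hadoop FileSystem for this
    }
}
```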

I thought I could get around this with:

InputStream inputFile = classLoader.getResourceAsStream(pattern_id);

but I don't know how to make AvroParquetReader work with an InputStream.

【Comments】:

    Tags: java hadoop jar avro parquet


    【Solution 1】:

    I was eventually able to read the parquet file as a resource stream by adapting the solution from here: https://stackoverflow.com/a/58261488/3112960

    import org.apache.commons.io.IOUtils;
    import org.apache.parquet.io.DelegatingSeekableInputStream;
    import org.apache.parquet.io.InputFile;
    import org.apache.parquet.io.SeekableInputStream;
    
    import java.io.ByteArrayInputStream;
    import java.io.IOException;
    import java.io.InputStream;
    
    // An org.apache.parquet.io.InputFile backed by an in-memory copy of the
    // resource, so no Hadoop FileSystem is needed at all.
    public class ParquetStream implements InputFile {
        private final String streamId;
        private final byte[] data;
    
        // ByteArrayInputStream tracks its position in the protected `pos`
        // field; exposing it makes the stream seekable.
        private static class SeekableByteArrayInputStream extends ByteArrayInputStream {
            public SeekableByteArrayInputStream(byte[] buf) {
                super(buf);
            }
    
            public void setPos(int pos) {
                this.pos = pos;
            }
    
            public int getPos() {
                return this.pos;
            }
        }
    
        public ParquetStream(String streamId, InputStream stream) throws IOException {
            this.streamId = streamId;
    
            this.data = IOUtils.toByteArray(stream);
        }
    
        @Override
        public long getLength()  {
            return this.data.length;
        }
    
        @Override
        public SeekableInputStream newStream() throws IOException {
            return new DelegatingSeekableInputStream(new SeekableByteArrayInputStream(this.data)) {
                @Override
                public void seek(long newPos) {
                    ((SeekableByteArrayInputStream) this.getStream()).setPos((int) newPos);
                }
    
                @Override
                public long getPos() {
                    return ((SeekableByteArrayInputStream) this.getStream()).getPos();
                }
            };
        }
    
        @Override
        public String toString() {
            return "ParquetStream[" + streamId + "]";
        }
    }
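    The core trick above is that `ByteArrayInputStream` already keeps its read offset in a protected `pos` field, so a subclass can expose it and get random access for free. A self-contained sketch of just that part, using only the JDK (class and variable names here are illustrative, not from the answer):

    ```java
    import java.io.ByteArrayInputStream;

    public class SeekDemo {
        // Same idea as the answer's inner class: surface the protected `pos`
        // field so callers can seek within the buffer.
        static class SeekableByteArrayInputStream extends ByteArrayInputStream {
            SeekableByteArrayInputStream(byte[] buf) { super(buf); }
            void setPos(int pos) { this.pos = pos; }
            int getPos() { return this.pos; }
        }

        public static void main(String[] args) {
            SeekableByteArrayInputStream in =
                    new SeekableByteArrayInputStream(new byte[] {10, 20, 30, 40});
            in.read();                        // consume one byte; pos advances to 1
            in.setPos(3);                     // "seek" to offset 3
            System.out.println(in.read());    // 40
            System.out.println(in.getPos());  // 4
        }
    }
    ```

    Note that this only works because the whole file is buffered in memory, which is fine for small bundled resources but worth keeping in mind for large parquet files.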
    

    Then I can do this:

    InputStream in = classLoader.getResourceAsStream(pattern_id);

    try {
        ParquetStream parquetStream = new ParquetStream(pattern_id, in);

        // try-with-resources so the reader is closed when done
        try (ParquetReader<GenericRecord> r = AvroParquetReader.<GenericRecord>builder(parquetStream)
                .disableCompatibility()
                .build()) {
            GenericRecord record;
            while ((record = r.read()) != null) {
                // do some work
            }
        }
    } catch (IOException e) {
        e.printStackTrace();
    }
    

    Maybe this will help someone in the future, since I couldn't find any direct answer to this.

    【Discussion】:
