【问题标题】:How to convert .txt file to Hadoop's sequence file format如何将 .txt 文件转换为 Hadoop 的序列文件格式
【发布时间】:2023-04-09 13:49:01
【问题描述】:

为了有效利用 Hadoop 中的 map-reduce 作业,我需要将数据存储在 hadoop's sequence file format 中。但是,目前数据只有平面 .txt 格式。谁能建议我可以将 .txt 文件转换为序列文件的方法?

【问题讨论】:

    标签: java file hadoop type-conversion hive


    【解决方案1】:

    这取决于 TXT 文件的格式。每条记录一行吗?如果是这样,您可以简单地使用 TextInputFormat 为每一行创建一个记录。在您的映射器中,您可以解析该行并以您选择的任何方式使用它。

    如果不是每条记录一行,您可能需要编写自己的 InputFormat 实现。请查看this tutorial 了解更多信息。

    【讨论】:

      【解决方案2】:

      因此,更简单的答案只是具有 SequenceFile 输出的“身份”作业。

      在java中看起来像这样:

          public static void main(String[] args) throws IOException,
              InterruptedException, ClassNotFoundException {
      
          Configuration conf = new Configuration();
          Job job = new Job(conf);
          job.setJobName("Convert Text");
          job.setJarByClass(Mapper.class);
      
          job.setMapperClass(Mapper.class);
          job.setReducerClass(Reducer.class);
      
          // increase if you need sorting or a special number of files
          job.setNumReduceTasks(0);
      
          job.setOutputKeyClass(LongWritable.class);
          job.setOutputValueClass(Text.class);
      
          job.setOutputFormatClass(SequenceFileOutputFormat.class);
          job.setInputFormatClass(TextInputFormat.class);
      
          TextInputFormat.addInputPath(job, new Path("/lol"));
          SequenceFileOutputFormat.setOutputPath(job, new Path("/lolz"));
      
          // submit and wait for completion
          job.waitForCompletion(true);
         }
      

      【讨论】:

      • 所以,如果我有 100 个 .txt 文件,这会给我 100 个 .seq 文件,对吗?如果我想要 1 个大的 .seq 文件怎么办?
      • 我猜:job.setNumReduceTasks(1);
      • 我在尝试编写此内容时遇到错误:addInputPaths 参数必须是 Conf 或 JobConf 类型,而不是 Job。如果我将 Job 更改为 JobConf,则 setMapperClass 和 setReducerClass 方法不可用。
      【解决方案3】:

      如果您的数据不在 HDFS 上,则需要将其上传到 HDFS。两种选择:

      i) hdfs - 放在你的 .txt 文件上,一旦你把它放到 HDFS 上,你就可以把它转换成 seq 文件。

      ii) 您将文本文件作为 HDFS 客户端框中的输入,并使用序列文件 API 通过创建 SequenceFile.Writer 并将 (key,values) 附加到它来转换为 SeqFile。

      如果你不关心key,你可以将行号作为key,将完整的文本作为value。

      【讨论】:

      • 我需要使用第一个选项。我该怎么做?
      【解决方案4】:

      您也可以只创建一个中间表,将 csv 内容直接加载到其中,然后创建第二个表作为序列文件(分区、集群等)并从中间表中插入 select。您还可以设置压缩选项,例如,

      set hive.exec.compress.output = true;
      set io.seqfile.compression.type = BLOCK;
      set mapred.output.compression.codec = org.apache.hadoop.io.compress.SnappyCodec;
      
      create table... stored as sequencefile;
      
      insert overwrite table ... select * from ...;
      

      MR 框架会为您处理繁重的工作,省去您编写 Java 代码的麻烦。

      【讨论】:

        【解决方案5】:
        import java.io.IOException;
        import java.net.URI;
        
        import org.apache.hadoop.conf.Configuration;
        import org.apache.hadoop.fs.FileSystem;
        import org.apache.hadoop.fs.Path;
        import org.apache.hadoop.io.IOUtils;
        import org.apache.hadoop.io.IntWritable;
        import org.apache.hadoop.io.SequenceFile;
        import org.apache.hadoop.io.Text;
        
        //White, Tom (2012-05-10). Hadoop: The Definitive Guide (Kindle Locations 5375-5384). OReilly Media - A. Kindle Edition. 
        
        public class SequenceFileWriteDemo { 
        
            private static final String[] DATA = { "One, two, buckle my shoe", "Three, four, shut the door", "Five, six, pick up sticks", "Seven, eight, lay them straight", "Nine, ten, a big fat hen" };
        
            public static void main( String[] args) throws IOException { 
                String uri = args[ 0];
                Configuration conf = new Configuration();
                FileSystem fs = FileSystem.get(URI.create( uri), conf);
                Path path = new Path( uri);
                IntWritable key = new IntWritable();
                Text value = new Text();
                SequenceFile.Writer writer = null;
                try { 
                    writer = SequenceFile.createWriter( fs, conf, path, key.getClass(), value.getClass());
                    for (int i = 0; i < 100; i ++) { 
                        key.set( 100 - i);
                        value.set( DATA[ i % DATA.length]);
                        System.out.printf("[% s]\t% s\t% s\n", writer.getLength(), key, value); 
                        writer.append( key, value); } 
                } finally 
                { IOUtils.closeStream( writer); 
                } 
            } 
        }
        

        【讨论】:

        • 这里的uri是什么?
        【解决方案6】:

        如果你安装了 Mahout - 它有一个叫做 : seqdirectory 的东西——它可以做到

        【讨论】:

          【解决方案7】:

          注意格式说明符:

          例如(注意%s之间的空格),System.out.printf("[% s]\t% s\t% s\n", writer.getLength(), key, value);会给我们java.util.FormatFlagsConversionMismatchException: Conversion = s, Flags =

          相反,我们应该使用:

          System.out.printf("[%s]\t%s\t%s\n", writer.getLength(), key, value); 
          

          【讨论】:

            猜你喜欢
            • 1970-01-01
            • 1970-01-01
            • 2014-01-20
            • 1970-01-01
            • 2021-12-11
            • 1970-01-01
            • 1970-01-01
            • 2011-01-25
            • 1970-01-01
            相关资源
            最近更新 更多