【问题标题】:decompress (unzip/extract) util using spark scala使用 spark scala 解压缩(解压缩/提取)实用程序
【发布时间】:2020-06-29 18:59:53
【问题描述】:

我在 HDFS 中有 customer_input_data.tar.gz,其中有 10 个不同的 csv 文件格式的表数据。所以我需要使用 spark scala 将此文件解压缩到 /my/output/path

请建议如何使用 spark scala 解压缩 customer_input_data.tar.gz 文件

【问题讨论】:

标签: scala apache-spark pyspark apache-spark-sql


【解决方案1】:

gzip 不是 Hadoop 中的 splittable 格式。因此,文件不会真正分布在整个集群中,并且您不会从 hadoop 或 Spark 中的分布式计算/处理中获得任何好处。

更好的方法可能是,

  • 在 OS 上解压缩文件,然后将文件单独发送回 hadoop。

如果你仍然想在 scala 中解压,你可以简单地通过 java 类GZIPInputStream

new GZIPInputStream(new FileInputStream("your file path"))

【讨论】:

    【解决方案2】:

    我开发了以下代码来使用 scala 解压缩文件。您需要传递输入路径和输出路径以及Hadoop文件系统

        /*below method used for processing zip files*/
      @throws[IOException]
      private def processTargz(fullpath: String, houtPath: String, fs: FileSystem): Unit = {
        val path = new Path(fullpath)
        val gzipIn = new GzipCompressorInputStream(fs.open(path))
        try {
          val tarIn = new TarArchiveInputStream(gzipIn)
          try {
            var entry:TarArchiveEntry = null
            out.println("Tar entry")
            out.println("Tar Name entry :" + FilenameUtils.getName(fullpath))
            val fileName1 = FilenameUtils.getName(fullpath)
            val tarNamesFolder = fileName1.substring(0, fileName1.indexOf('.'))
            out.println("Folder Name : " + tarNamesFolder)
            while ( {
              (entry = tarIn.getNextEntry.asInstanceOf[TarArchiveEntry]) != null
            }) { // entity Name as tsv file name which are part of inside compressed tar file
              out.println("ENTITY NAME : " + entry.getName)
    
              /** If the entry is a directory, create the directory. **/
              out.println("While")
              if (entry.isDirectory) {
                val f = new File(entry.getName)
                val created = f.mkdir
                out.println("mkdir")
                if (!created) {
                  out.printf("Unable to create directory '%s', during extraction of archive contents.%n", f.getAbsolutePath)
                  out.println("Absolute path")
                }
              }
              else {
                var count = 0
                val slash = "/"
                val targetPath = houtPath + slash + tarNamesFolder + slash + entry.getName
                val hdfswritepath = new Path(targetPath)
                val fos = fs.create(hdfswritepath, true)
                try {
                  val dest = new BufferedOutputStream(fos, BUFFER_SIZE)
                  try {
                    val data = new Array[Byte](BUFFER_SIZE)
                    while ( {
                      (count = tarIn.read(data, 0, BUFFER_SIZE)) != -1
                    }) dest.write(data, 0, count)
                  } finally if (dest != null) dest.close()
                }
              }
            }
            out.println("Untar completed successfully!")
          } catch {
            case e: IOException =>
              out.println("catch Block")
          } finally {
            out.println("FINAL Block")
            if (tarIn != null) tarIn.close()
          }
        }
      }
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2011-09-01
      • 2019-09-30
      • 1970-01-01
      • 1970-01-01
      • 2019-08-06
      • 2012-05-22
      • 1970-01-01
      相关资源
      最近更新 更多