【问题标题】:Glue Job Writes Multiple Partitions to the Same File胶水作业将多个分区写入同一个文件
【发布时间】:2020-07-23 18:46:38
【问题描述】:

我正在尝试编写一个粘合作业,将多个 csv 文件转换为单独的 json 文件,使用 csv 的每一行作为文件。作业完成后,s3 中会显示正确数量的文件,但有些是空的,有些在同一个文件中有多个 json 对象。

应用映射后,这就是我创建分区和写入文件的方式:

numEntities = applyMapping1.toDF().count()
partitions = applymapping1.repartition(numEntities)
partitions.toDF().write.mode("ignore").format("json").option("header", "true").save("s3://location/test")

使用这个,一些文件被创建为一个 json 文件,其中一个接一个地有 2 个对象,有些是正确的,有些是空的。

有什么方法可以确保每个分区创建一个仅包含其数据的单独文件?

【问题讨论】:

    标签: pyspark aws-glue


    【解决方案1】:

    我认为repartition 后面的Partitioner 确实不完全 符合您的意图:

    它根据您的要求创建了尽可能多的分区 - 到目前为止一切顺利。但它并没有将行分配到每个分区中的一个。这可能是因为HashPartitioner 中的逻辑为不止一行计算了相同的哈希值。

    作为repartition.save... 的替代方案,您可以使用foreachPartition,然后遍历每一行,将其保存到文件(例如/tmp 下)并将其上传到S3。在这样做之前我不会repartition,因为将从foreachPartition 执行的UDF 相当昂贵,因此您应该尽量减少UDF 调用的次数。

    这是一个对我有用的例子。不过,它是用 Scala 编写的:

    dynamicFrame.
      repartition(1).
      toDF().
      foreachPartition(p => {
        val out = new BufferedOutputStream(new GZIPOutputStream(new FileOutputStream("/tmp/temp.xsv.gz")))
        p.foreach(r => {
          val row = ...
          out.write(row)
        })
        val s3 = AmazonS3ClientBuilder.standard().withRegion(Regions.EU_CENTRAL_1).build()
        val tm = TransferManagerBuilder.standard().withS3Client(s3).build()
        val rq = new PutObjectRequest(bucket, key, new File("/tmp/temp.xsv.gz"))
        tm.upload(rq).waitForCompletion()
      })
    

    【讨论】:

    • 查看它,不确定如何在传递给 foreach 的函数中写入 s3。我已经尝试过 spark 上下文和 boto3,但都无法序列化以使用它。
    【解决方案2】:

    好的,看来我已经成功了。从rowing-ghoul 的回答我最终使用了foreach 来处理数据,但是由于spark 的工作原理,我不得不在之后将数据发送到s3。我还必须使用累加器将 json 字符串存储在 foreach 中。

    class ArrayAccumulator(AccumulatorParam):
      def zero(self, value):
        return []
      def addInPlace(self, val1, val2):
        val1.extend(val2)
        return val1
    jsonAccumulator = sc.accumulator([], ArrayAccumulator())
    
    def write_to_json(row):
      # Process json
      jsonAccumulator += [json]
    
    mappedDF = applymapping1.toDF()
    mappedDF.foreach(write_to_json)
    
    count = 0
    for x in jsonAccumulator.value:
      s3.Object('bucket-name', 'test/' + str(count) + '.json').put(Body=x)
      count += 1
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2013-01-26
      • 1970-01-01
      • 2017-01-26
      相关资源
      最近更新 更多