[Title]: Flink BucketingSink with custom AvroParquetWriter creates an empty file
[Posted]: 2017-02-17 07:28:29
[Question]:

I created a writer for BucketingSink. The sink and the writer run without errors, but when the writer writes the Avro GenericRecord to Parquet, the file is created and moves from the in-progress state through pending to finished. However, the file is empty, just 0 bytes. Can anyone tell me what is wrong with the code? I tried putting the AvroParquetWriter initialization in the open() method, but the result was the same.

While debugging, I confirmed that writer.write(element) does execute and that element contains the Avro GenericRecord data.

Streaming data

BucketingSink<DataEventRecord> sink =
    new BucketingSink<DataEventRecord>("hdfs://localhost:9000/tmp/");

sink.setBucketer(new DateTimeBucketer<DataEventRecord>("yyyy-MM-dd--HHmm"));
sink.setWriter(new ParquetSinkWriter<DataEventRecord>());
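For reference, the DateTimeBucketer above derives the bucket directory from the current time using the pattern passed to it. The resulting path shape can be previewed with plain SimpleDateFormat (illustration only, not Flink code; the epoch timestamp is used just to make the output deterministic):

```java
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.TimeZone;

public class BucketPathPreview {
    public static void main(String[] args) {
        // Same pattern as the DateTimeBucketer above; buckets roll every minute.
        SimpleDateFormat fmt = new SimpleDateFormat("yyyy-MM-dd--HHmm");
        fmt.setTimeZone(TimeZone.getTimeZone("UTC"));
        String bucket = fmt.format(new Date(0L)); // epoch, for a deterministic example
        System.out.println("hdfs://localhost:9000/tmp/" + bucket);
        // -> hdfs://localhost:9000/tmp/1970-01-01--0000
    }
}
```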

ParquetSinkWriter

import java.io.File;
import java.io.IOException;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.streaming.connectors.fs.StreamWriterBase;
import org.apache.flink.streaming.connectors.fs.Writer;
import org.apache.parquet.avro.AvroParquetWriter;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import com.any.DataEventRecord;

public class ParquetSinkWriter<T> extends StreamWriterBase<T> {

  private transient ParquetWriter<GenericRecord> writer;

  private Path path;
  private FileSystem fs;
  private final CompressionCodecName compressionCodecName = CompressionCodecName.SNAPPY;
  private final int blockSize = 256 * 1024 * 1024;
  private final int pageSize = 64 * 1024;


  @Override
  // workaround
  public void open(FileSystem fs, Path path) throws IOException {
    super.open(fs, path);
    this.path = path;
    this.fs = fs;
  }

  @Override
  public void write(T event) throws IOException {
    DataEventRecord element = (DataEventRecord) event;

    if (writer == null) {
      writer = new AvroParquetWriter<GenericRecord>(this.path, element.getSchema(), compressionCodecName, blockSize, pageSize);
    }

    if (writer != null) {
      GenericRecord datum = element.getRecord();
      writer.write(datum);
    }
  }

  @Override
  public void close() throws IOException {
    if (writer != null) {
      writer.close();
    }
    super.close();
  }

  @Override
  public Writer<T> duplicate() {
    return new ParquetSinkWriter<T>();
  }

}

[Comments]:

  • I managed to solve this. The problem was calling super.open(fs, path) while the AvroParquetWriter instance was created during write. The open call had already created the file, and the writer then tried to create the same file, which failed because it already existed. As a result, 0 records were ever written, since the Avro writer cannot write to a file that already exists. Removing super.open makes the base class fail with "Writer is not open". I ended up extending my own sink class based on BucketingSink, and everything works fine now.
  • Could you share some reference code for how you solved it? I am hitting the same problem.
  • Couldn't you simply implement the Writer interface instead of using StreamWriterBase? StreamWriterBase opens an FSDataOutputStream to the file, which you don't need.
  • @jlim could you share your solution? I ran into the same problem.
  • @igx We have since moved off Flink and built our own ingestion pipeline. Send me your email and I will mail you the code we extended from Flink. Keep in mind that the files we modified back then were against Flink 1.3; make sure to compare them with the Flink version you are using and merge what you need.
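The failure mode described in the first comment can be reproduced with plain JDK file APIs: a create-new style open fails when the path already exists, which is exactly what happens when StreamWriterBase.open() has already created the in-progress file and AvroParquetWriter then tries to create it again. A minimal local-filesystem sketch (java.nio standing in for the HDFS create() semantics):

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

public class CreateNewDemo {
    public static void main(String[] args) throws IOException {
        // Simulate StreamWriterBase.open() having already created the file.
        Path path = Files.createTempFile("part-0-0", ".in-progress");

        boolean failed = false;
        try {
            // CREATE_NEW refuses to open a path that already exists,
            // mirroring the writer's second attempt to create the same file.
            Files.newOutputStream(path, StandardOpenOption.CREATE_NEW).close();
        } catch (IOException e) { // FileAlreadyExistsException
            failed = true;
        }
        System.out.println(failed); // true
        Files.deleteIfExists(path);
    }
}
```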

Tags: apache-flink flink-streaming rollingfilesink


[Solution 1]:

Implementing the Writer interface directly should look like this:

import org.apache.flink.streaming.connectors.fs.Writer;
import org.apache.flink.util.Preconditions;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.avro.AvroParquetWriter;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;

import java.io.IOException;

/**
 * Parquet writer.
 *
 * @param <T>
 */
public class ParquetSinkWriter<T extends GenericRecord> implements Writer<T> {

    private static final long serialVersionUID = -975302556515811398L;

    private final CompressionCodecName compressionCodecName = CompressionCodecName.SNAPPY;
    private final int pageSize = 64 * 1024;

    private final String schemaRepresentation;

    private transient Schema schema;
    private transient ParquetWriter<GenericRecord> writer;
    private transient Path path;

    private int position;

    public ParquetSinkWriter(String schemaRepresentation) {
        this.schemaRepresentation = Preconditions.checkNotNull(schemaRepresentation);
    }

    @Override
    public void open(FileSystem fs, Path path) throws IOException {
        this.position = 0;
        this.path = path;

        if (writer != null) {
            writer.close();
        }

        writer = createWriter();
    }

    @Override
    public long flush() throws IOException {
        Preconditions.checkNotNull(writer);
        position += writer.getDataSize();
        writer.close();
        writer = createWriter();

        return position;
    }

    @Override
    public long getPos() throws IOException {
        Preconditions.checkNotNull(writer);
        return position + writer.getDataSize();
    }

    @Override
    public void close() throws IOException {
        if (writer != null) {
            writer.close();
            writer = null;
        }
    }

    @Override
    public void write(T element) throws IOException {
        Preconditions.checkNotNull(writer);
        writer.write(element);
    }

    @Override
    public Writer<T> duplicate() {
        return new ParquetSinkWriter<>(schemaRepresentation);
    }

    private ParquetWriter<GenericRecord> createWriter() throws IOException {
        if (schema == null) {
            schema = new Schema.Parser().parse(schemaRepresentation);
        }

        return AvroParquetWriter.<GenericRecord>builder(path)
            .withSchema(schema)
            .withDataModel(new GenericData())
            .withCompressionCodec(compressionCodecName)
            .withPageSize(pageSize)
            .build();
    }
}
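The flush()/getPos() pair above keeps a running byte count across writer rotations: flush() folds the closed writer's size into position before replacing it, and getPos() reports position plus the current writer's in-flight size. A minimal sketch of that accounting, with a hypothetical SizedWriter standing in for ParquetWriter's getDataSize():

```java
// SizedWriter is a hypothetical stand-in for ParquetWriter's getDataSize().
class SizedWriter {
    private long dataSize = 0;
    void write(long bytes) { dataSize += bytes; }
    long getDataSize() { return dataSize; }
}

public class PositionAccounting {
    private long position = 0;                 // bytes in already-rolled writers
    private SizedWriter writer = new SizedWriter();

    void write(long bytes) { writer.write(bytes); }

    long flush() {                             // mirrors Writer#flush: roll the writer
        position += writer.getDataSize();
        writer = new SizedWriter();            // createWriter() equivalent
        return position;
    }

    long getPos() {                            // rolled bytes + current writer's size
        return position + writer.getDataSize();
    }

    public static void main(String[] args) {
        PositionAccounting acc = new PositionAccounting();
        acc.write(100);
        System.out.println(acc.getPos());      // 100
        acc.flush();
        acc.write(50);
        System.out.println(acc.getPos());      // 150
    }
}
```

This is why getPos() stays monotonic for the BucketingSink's valid-length bookkeeping even though the underlying ParquetWriter is replaced on every flush.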

[Comments]:

  • Caused by: org.apache.hadoop.fs.FileAlreadyExistsException: /clicks-json/partitionkey=2018-01-12--16-30/_part-6-0.in-progress for client 127.0.0.1 already exists — at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFileInternal(FSNamesystem.java:2563), at org.apache.parquet.hadoop.ParquetWriter$Builder.build(ParquetWriter.java:495), at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:90), at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:357)