【发布时间】:2017-02-17 07:28:29
【问题描述】:
我为 BucketingSink 创建了一个编写器。接收器和编写器正常工作,但是当编写器将 avro genericrecord 写入镶木地板时,该文件是从进行中创建的,等待完成。但是文件是空的,只有 0 个字节。谁能告诉我代码有什么问题?我尝试将 AvroParquetWriter 的初始化放在 open() 方法中,但结果仍然相同。
在调试代码时,我确认 writer.write(element) 确实执行并且元素包含 avro genericrecord 数据
流数据
BucketingSink<DataEventRecord> sink =
new BucketingSink<DataEventRecord>("hdfs://localhost:9000/tmp/");
sink.setBucketer(new DateTimeBucketer<DataEventRecord>("yyyy-MM-dd--HHmm"));
sink.setWriter(new ParquetSinkWriter<DataEventRecord>());
ParquetSinkWriter
import java.io.File;
import java.io.IOException;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.streaming.connectors.fs.StreamWriterBase;
import org.apache.flink.streaming.connectors.fs.Writer;
import org.apache.parquet.avro.AvroParquetWriter;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import com.any.DataEventRecord;
public class ParquetSinkWriter<T> extends StreamWriterBase<T> {
private transient ParquetWriter<GenericRecord> writer;
private Path path;
private FileSystem fs;
private final CompressionCodecName compressionCodecName = CompressionCodecName.SNAPPY;
private final int blockSize = 256 * 1024 * 1024;
private final int pageSize = 64 * 1024;
@Override
// workaround
public void open(FileSystem fs, Path path) throws IOException {
super.open(fs, path);
this.path = path;
this.fs = fs;
}
@Override
public void write(T event) throws IOException {
DataEventRecord element = (DataEventRecord) event;
if (writer == null) {
writer = new AvroParquetWriter<GenericRecord>(this.path, element.getSchema(), compressionCodecName, blockSize, pageSize);
}
if (writer != null) {
GenericRecord datum = element.getRecord();
writer.write(datum);
}
}
@Override
public void close() throws IOException {
if (writer != null) {
writer.close();
}
super.close();
}
@Override
public Writer<T> duplicate() {
return new ParquetSinkWriter<T>();
}
}
【问题讨论】:
-
我设法解决了这个问题。在写入过程中创建 AvroParquetWRiter 实例的同时调用 super.open(fs, path) 存在问题。 open 事件已经创建了一个文件,并且作者也在尝试创建相同的文件,但由于文件已经存在而无法创建。因此,总是有 0 条记录写入文件,因为 Avro 写入器无法写入已存在的文件。删除 super.open 会导致基类因“Writer is not open”而失败。我最终基于 BucketingSink 扩展了我自己的 sink 类,现在一切正常。
-
您能否提供一些参考代码来说明您是如何解决的?我也遇到了同样的问题
-
你不能简单地实现
Writer接口而不是使用StreamWriterBase吗?StreamWriterBase打开一个FSDataOutputStream到你不需要的文件。 -
@jlim 你能分享你的解决方案吗?我遇到了同样的问题
-
@igx 我们已经从 Flink 搬出并建立了自己的摄取管道。给我你的电子邮件,我会通过电子邮件发送我们从 Flink 扩展的代码。请记住,我们在此期间修改的那些文件是针对 Flink 1.3 的。确保将此文件与您正在使用的 Flink 版本进行比较并合并所需的内容。
标签: apache-flink flink-streaming rollingfilesink