【问题标题】:How can i write avro files into S3 in Flink?如何在 Flink 中将 avro 文件写入 S3?
【发布时间】:2019-11-20 14:03:03
【问题描述】:

我想从 kafka 主题中读取流数据并以 avro 或 parquet 格式写入 S3。数据流看起来像 json 字符串,但我无法以 avro 或 parquet 格式转换并写入 S3。

我找到了一些代码 sn-ps 并尝试了

val sink = StreamingFileSink .forBulkFormat(新路径(outputS3Path),ParquetAvroWriters.forReflectRecord(classOf[myClass])) .build()

但我在 addSink 处得到“类型不匹配,预期 SinkFunction[String],实际:StreamingFileSink[TextOut]”

val 流 = 环境 .addSource(myConsumerSource) .addSink(sink)

请帮忙,谢谢!

【问题讨论】:

    标签: apache-flink


    【解决方案1】:
    【解决方案2】:

    这是我用来将 Parquet 文件存储到本地系统的代码。

    import org.apache.avro.generic.GenericRecord
    import org.apache.avro.{Schema, SchemaBuilder}
    import org.apache.flink.core.fs.Path
    import org.apache.flink.formats.parquet.avro.ParquetAvroWriters
    import org.apache.flink.streaming.api.datastream.DataStreamSource
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
    import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
    
    val env = StreamExecutionEnvironment.getExecutionEnvironment()
    env.enableCheckpointing(100)
    val schema = SchemaBuilder
      .record("record")
      .fields()
      .requiredString("message")
      .endRecord()
    
    val stream: DataStreamSource[GenericRecord] = env.fromCollection(genericRecordList)
    val path = new Path(s"/tmp/flink-parquet-${System.currentTimeMillis()}")
    val sink: StreamingFileSink[GenericRecord] = StreamingFileSink
      .forBulkFormat(path, ParquetAvroWriters.forGenericRecord(schema))
      .build()
    
    stream.addSink(sink)
    env.execute()
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2022-10-04
      • 2019-08-19
      • 1970-01-01
      • 2018-07-01
      • 2021-09-20
      • 1970-01-01
      • 2020-05-06
      • 1970-01-01
      相关资源
      最近更新 更多