【Posted】: 2022-01-16 19:24:56
【Problem Description】:
I have a Confluent sink connector that takes data from a Kafka topic and ingests it into an S3 bucket.
The ingestion works fine and all is well, but now I need to compress the Avro data before landing it in the bucket.
I tried the following configuration:
{
"name":"--private-v1-s3-sink",
"connector.class":"io.confluent.connect.s3.S3SinkConnector",
"tasks.max": "1",
"s3.region":"eu-west-1",
"partition.duration.ms":"3600000",
"rotate.schedule.interval.ms": "3600000",
"topics.dir":"svs",
"flush.size":"2500",
"schema.compatibility":"FULL",
"file.delim":"_",
"topics":"--connect.s3.format.avro.AvroFormat",
"key.converter":"org.apache.kafka.connect.storage.StringConverter",
"value.converter":"io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url":"--systems",
"schema.generator.class":"io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator",
"partitioner.class":"io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
"storage.class":"io.confluent.connect.s3.storage.S3Storage",
"s3.bucket.name": "${S3_BUCKET}",
"s3.acl.canned":"bucket-owner-full-control",
"avro.codec": "snappy",
"locale":"en-GB",
"timezone": "GMT",
"errors.tolerance": "all",
"path.format":"'ingest_date'=yyyy-MM-dd",
"timestamp.extractor":"Record"
I assumed "avro.codec" would compress the data, but it did not. Instead I also tried "s3.compression.type": "snappy", but still no luck! That setting does work with JSON and GZIP, though.
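For contrast, a sketch of the combination that does work for me (same placeholder names as above): the JSON format writer with GZIP, which is what s3.compression.type is documented to apply to (the JSON and ByteArray formats):

# Working contrast (placeholder names): JSON output, gzip-compressed.
# s3.compression.type applies to the JSON and ByteArray writers; the
# Avro writer instead compresses inside the Avro container file,
# which is what avro.codec is supposed to control.
name=example-json-sink
connector.class=io.confluent.connect.s3.S3SinkConnector
topics=example-topic
storage.class=io.confluent.connect.s3.storage.S3Storage
format.class=io.confluent.connect.s3.format.json.JsonFormat
s3.compression.type=gzip
s3.bucket.name=example-bucket
s3.region=eu-west-1
flush.size=2500

One way to check whether an Avro output file is actually compressed is to download it and inspect the container header; for example, Avro's avro-tools "getmeta" command prints the file's avro.codec entry.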
I'm not quite sure what's going wrong here?
【Discussion】:
Tags: amazon-s3 apache-kafka apache-kafka-connect confluent-platform