【发布时间】:2020-12-23 15:04:38
【问题描述】:
我有一个由 KSQL 查询和 inut 流创建的表,它由 Kafka 主题支持。
本主题是使用 Kafka Connect 下沉到 s3。
在这个主题中,我有大约 1k msgs/sec。
该主题有 6 个分区和 3 个副本。
我有一个奇怪的输出比率。下沉似乎很奇怪。
这是我的监控:
monitoring
您可以看到第一个图表显示了输入比率 B/s,第二个图表显示了输出比率,第三个图表显示了使用 Burrow 计算的滞后。
这是我的 s3-sink 属性文件:
{
"name": "sink-feature-static",
"config": {
"topics": "FEATURE_APP_STATIC",
"topics.dir": "users-features-stream",
"tasks.max": "6",
"consumer.override.auto.offset.reset": "latest",
"connector.class": "io.confluent.connect.s3.S3SinkConnector",
"storage.class": "io.confluent.connect.s3.storage.S3Storage",
"format.class": "io.confluent.connect.s3.format.parquet.ParquetFormat",
"parquet.codec": "snappy",
"schema.generator.class": "io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator",
"partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
"path.format": "'\'part_date\''=YYYY-MM-dd/'\'part_hour\''=HH",
"partition.duration.ms": "3600000",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://cp-schema-registry.schema-registry.svc.cluster.local:8081",
"flush.size": 1000000,
"s3.part.size": 5242880,
"rotate.interval.ms": "600000",
"rotate.schedule.interval.ms": "600000",
"locale": "fr-FR",
"timezone": "UTC",
"timestamp.extractor": "Record",
"schema.compatibility": "NONE",
"aws.secret.access.key": "secretkey",
"aws.access.key.id": "accesskey",
"s3.bucket.name": "feature-store-prod-kafka-test",
"s3.region": "eu-west-1"
}
}
这是我在 s3 存储桶中观察到的内容:s3 bucket 在这些文件中,parquet.snappy 中有少量消息。 (有时只有 1 有时更多,...)。每个分区每秒大约 2 个文件。 (当我使用记录时间戳时,这是因为它正在赶上我认为的滞后)。
我所期待的是:
每 1000000 条消息 (flush.size) 或每 10 分钟 (rotate.schedule.interval.ms) 提交一次文件。
所以我期待(作为 1M 条消息 > 10 分钟 * 1Kmsg/s):
每小时 1/ 6(每 10 分钟)* 6(nb 个分区)parquet 文件
2/ 或者如果我错了,至少里面有 1M 条消息的文件...
但是没有观察到 1/ 或 2/ ...
而且我每小时都会在 s3 文件中出现巨大的延迟和刷新/提交(请参阅监控)。
“partition.duration.ms”:“3600000”会导致这种观察吗?
我哪里错了?
为什么我没有看到数据的连续输出刷新,而是出现这样的峰值?
谢谢! 雷米
【问题讨论】:
标签: amazon-s3 apache-kafka apache-kafka-connect sink