【问题标题】:Handle lags in Kafka S3 Connector处理 Kafka S3 连接器中的滞后
【发布时间】:2018-07-10 19:41:15
【问题描述】:

我们正在使用 Kafka Connect [分布式,confluence 4.0]。

它工作得很好,除了连接器侦听的主题中始终保留未提交的消息。该行为可能与 S3 连接器 配置 "flush.size": "20000" 有关。主题中的滞后总是低于刷新大小。

我们的数据是分批进来的,我不想等到下一批到达,也不想减少flush.size 并创建大量文件。 是否可以设置超时,即使数据未达到 20000 个事件,S3 连接器也会刷新数据?

谢谢!

"config": {
    "connector.class": "io.confluent.connect.s3.S3SinkConnector",
    "topics": "event",
    "tasks.max": "3",
    "topics.dir": "connect",
    "s3.region": "some_region",
    "s3.bucket.name": "some_bucket",
    "s3.part.size": "5242880",
    "flush.size": "20000",
    "storage.class": "io.confluent.connect.s3.storage.S3Storage",
    "format.class": "io.confluent.connect.s3.format.avro.AvroFormat",
    "schema.generator.class": "io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator",
    "schema.compatibility": "FULL",
    "partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
    "path.format": "'\''day_ts'\''=YYYYMMdd/'\''hour_ts'\''=H",
    "partition.duration.ms": "3600000",
    "locale": "en_US",
    "timezone": "UTC",
    "timestamp.extractor": "RecordField",
    "timestamp.field": "time"
  }
}

【问题讨论】:

    标签: amazon-s3 apache-kafka apache-kafka-connect confluent-platform


    【解决方案1】:

    要使用 S3 连接器定期刷新低容量主题的未完成记录,您可以使用配置属性:

    rotate.schedule.interval.ms

    (完整的配置列表here

    请记住,使用上面的属性,在重新处理或从错误中恢复时,您可能会看到重复的消息,无论您使用的是哪个分区器。

    【讨论】:

      猜你喜欢
      • 2021-02-25
      • 2020-07-26
      • 2022-11-24
      • 2021-10-06
      • 2020-12-12
      • 2020-05-26
      • 2020-04-05
      • 2023-01-06
      • 2021-02-17
      相关资源
      最近更新 更多