【问题标题】:How to create partition using FieldPartitioner from Kafka S3 Sink Connector如何使用来自 Kafka S3 Sink Connector 的 FieldPartitioner 创建分区
【发布时间】:2020-06-03 17:46:02
【问题描述】:

我正在寻找正确的格式,以使用 partition.class FieldPartitioner 应用于 Kafka S3 Sink 连接器的 s3-sink.properties。 我需要根据我的 kafka 消息中的字段名称创建 3 个分区(另一个分区的一个子分区,例如 field1=value/field2=value/field3=value) .

我真的需要一个例子来说明 partition.field.name 的用法。它收到一个列表,但我还没有找到示例。

这是我目前拥有的,也是我需要的:

storage.class=io.confluent.connect.s3.storage.S3Storage
format.class=io.confluent.connect.s3.format.parquet.ParquetFormat
schema.generator.class=io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator
schema.compatibility=NONE
partitioner.class=io.confluent.connect.storage.partitioner.FieldPartitioner
partition.field.name=fieldName #Wrong so far, it can't even find my first one with this format. And I need two more, like: partition.field.name=field1,field2,field3

谢谢。

【问题讨论】:

    标签: amazon-s3 apache-kafka apache-kafka-connect confluent-platform


    【解决方案1】:

    它接收一个列表

    这是一个 顶级 字段名称的 CSV 列表(就像 bootstrap.servers)。

    并且您的数据必须是结构体(Avro 或 JSON,带有 schema.enabled=true

    您可以在单元测试中找到示例 - https://github.com/confluentinc/kafka-connect-storage-common/blob/v5.3.2/partitioner/src/test/java/io/confluent/connect/storage/partitioner/FieldPartitionerTest.java#L140

    【讨论】:

    • 之前提到,connect-distributed 是首选,用于运行 Kafka Connect,因此您将 使用 JSON 而不是属性文件来配置连接器
    猜你喜欢
    • 2021-02-02
    • 2021-03-16
    • 1970-01-01
    • 2020-12-11
    • 2022-07-12
    • 2021-07-15
    • 2017-09-13
    • 2021-10-01
    • 2021-08-07
    相关资源
    最近更新 更多