【发布时间】:2021-10-06 04:45:42
【问题描述】:
我正在使用 Protobuf 生成主题事件。我可以使用 Parquet 格式的 S3 Sink 连接器成功地将我的主题事件接收到 S3 存储桶中。现在我的 S3 存储桶中有 .parquet 和 .key.parquet 类型的对象。使用以下配置,所有这些都按预期工作:
{
"name": "s3-sink",
"config": {
"name": "s3-sink",
"connector.class": "io.confluent.connect.s3.S3SinkConnector",
"storage.class": "io.confluent.connect.s3.storage.S3Storage",
"format.class": "io.confluent.connect.s3.format.parquet.ParquetFormat",
"keys.format.class": "io.confluent.connect.s3.format.parquet.ParquetFormat",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "io.confluent.connect.protobuf.ProtobufConverter",
"value.converter.schema.registry.url": "https://my-schema-registry",
"value.converter.basic.auth.credentials.source": "USER_INFO",
"value.converter.basic.auth.user.info": "MY_SR_API_KEY:MY_SR_API_SECRET",
"store.kafka.keys": true,
"parquet.codec": "none",
"partitioner.class": "io.confluent.connect.storage.partitioner.HourlyPartitioner",
"locale": "en-US",
"s3.bucket.name": "my-bucket-123",
"s3.region": "eu-west-1",
"time.interval": "HOURLY",
"flush.size": 1000,
"tasks.max": 1,
"topics.regex": "test-topic.*",
"confluent.license": "",
"confluent.topic.bootstrap.servers": "my-bootstrap-server",
"confluent.topic.replication.factor": 3,
"confluent.license.topic.replication.factor": 1,
"confluent.topic.security.protocol": "SASL_SSL",
"confluent.topic.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"MY_API_KEY\" password=\"MY_API_SECRET\";",
"confluent.topic.sasl.mechanism": "PLAIN",
"confluent.topic.ssl.endpoint.identification.algorithm": "https"
}
}
}
现在我想使用 Protobuf 将 my-bucket-123(parquet 格式)的键和值放回 Kafka 主题中。为此,我使用以下配置通过 Confluent 设置了一个新的 S3 源连接器 (confluentinc/kafka-connect-s3-source:1.4.5):
{
"name": "s3-source",
"config": {
"name": "s3-source",
"dest.kafka.bootstrap.servers": "my-bootstrap-server",
"dest.topic.replication.factor": 1,
"dest.kafka.security.protocol": "SASL_SSL",
"dest.kafka.sasl.mechanism": "PLAIN",
"dest.kafka.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"MY_API_KEY\" password=\"MY_API_SECRET\";",
"tasks.max": 1,
"connector.class": "io.confluent.connect.s3.source.S3SourceConnector",
"format.class": "io.confluent.connect.s3.format.parquet.ParquetFormat",
"confluent.license": "",
"confluent.topic.bootstrap.servers": "my-bootstrap-server",
"confluent.topic.replication.factor": 3,
"confluent.license.topic.replication.factor": 1,
"confluent.topic.security.protocol": "SASL_SSL",
"confluent.topic.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"MY_API_KEY\" password=\"MY_API_SECRET\";",
"confluent.topic.sasl.mechanism": "PLAIN",
"confluent.topic.ssl.endpoint.identification.algorithm": "https",
"transforms": "AddPrefix",
"transforms.AddPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.AddPrefix.regex": ".*",
"transforms.AddPrefix.replacement": "copy_of_$0",
"s3.region": "eu-west-1",
"s3.bucket.name": "my-bucket-123"
}
}
通过使用上述配置,我无法启动我的 S3 源连接器。如果我使用上述配置和命令验证配置:
curl -X PUT -d @config.json --header "content-Type:application/json" http://localhost:8083/connector-plugins/S3SourceConnector/config/validate
我在format.class 属性中收到以下错误:
"errors":[
"Invalid value io.confluent.connect.s3.format.parquet.ParquetFormat for configuration format.class: Class io.confluent.connect.s3.format.parquet.ParquetFormat could not be found.",
"Invalid value null for configuration format.class: Class must extend: io.confluent.connect.cloud.storage.source.StorageObjectFormat"
]
我开始认为这个 S3 源连接器不支持 Parquet 格式。我尝试针对 JSON、AVRO 和 BYTE 格式对其进行验证,所有这些都可以。
深入研究 S3 源连接器 jar 文件 (1.4.5),我没有找到 Parquet 格式的文件:
有人知道这里发生了什么吗?有没有其他方法可以将数据从 S3 - Parquet 格式放回我的 Kafka 集群?
谢谢!
【问题讨论】:
-
S3 连接器主要用于分析,而不是备份/恢复。话虽如此,Parquet 是一种列格式,因此无法轻松读取单个记录,例如基于行的 Avro/JSON/text-lines 可以
标签: amazon-s3 apache-kafka parquet apache-kafka-connect confluent-platform