【Posted】: 2021-10-23 10:51:17
【Question】:
Unable to consume Confluent Kafka data with Flink SQL.
Flink version: 1.12-csadh1.3.0.0; Cluster: Cloudera (CDP); Kafka: Confluent Kafka
SQL:
CREATE TABLE consumer_session_created (
***
) WITH (
'connector' = 'kafka',
'topic' = '***',
'scan.startup.mode' = 'earliest-offset',
'properties.bootstrap.servers' = '***:9092',
'properties.group.id' = '***',
'properties.security.protocol' = 'SASL_SSL',
'properties.sasl.mechanism' = 'PLAIN',
'properties.sasl.jaas.config' = 'org.apache.kafka.common.security.plain.PlainLoginModule required username="***" password="***"',
'properties.avro-confluent.basic-auth.credentials-source' = '***',
'properties.avro-confluent.basic-auth.user-info' = '***',
'value.format' = 'avro-confluent',
'value.fields-include' = 'EXCEPT_KEY',
'value.avro-confluent.schema-registry.url' = 'https://***',
'value.avro-confluent.schema-registry.subject' = '***'
)
Error message:
java.io.IOException: Failed to deserialize Avro record.
at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:101)
at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:44)
at org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:82)
at org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaDeserializationSchema.deserialize(DynamicKafkaDeserializationSchema.java:113)
at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:179)
at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.runFetchLoop(KafkaFetcher.java:142)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:826)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66)
at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:241)
Caused by: java.io.IOException: Could not find schema with id 79 in registry
at org.apache.flink.formats.avro.registry.confluent.ConfluentSchemaRegistryCoder.readSchema(ConfluentSchemaRegistryCoder.java:77)
at org.apache.flink.formats.avro.RegistryAvroDeserializationSchema.deserialize(RegistryAvroDeserializationSchema.java:70)
at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:98)
... 9 more
Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Unauthorized; error code: 401
at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:292)
at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:352)
at io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:660)
at io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:642)
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getSchemaByIdFromRegistry(CachedSchemaRegistryClient.java:217)
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getSchemaBySubjectAndId(CachedSchemaRegistryClient.java:291)
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getSchemaById(CachedSchemaRegistryClient.java:276)
at io.confluent.kafka.schemaregistry.client.SchemaRegistryClient.getById(SchemaRegistryClient.java:64)
at org.apache.flink.formats.avro.registry.confluent.ConfluentSchemaRegistryCoder.readSchema(ConfluentSchemaRegistryCoder.java:74)
... 11 more
Based on the Flink documentation, I assumed I had used the wrong option name avro-confluent.basic-auth.*, so I removed the properties. prefix:
WITH (
'connector',
***
'avro-confluent.basic-auth.credentials-source' = '***',
'avro-confluent.basic-auth.user-info' = '***',
***
)
However, this raised a different exception:
org.apache.flink.table.api.ValidationException: Unsupported options found for connector 'kafka'.
Unsupported options:
avro-confluent.basic-auth.credentials-source
avro-confluent.basic-auth.user-info
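Note: We can consume and deserialize the Kafka data correctly with the DataStream API using the same parameters, and this topic has been consumed by other teams for a long time.
【Discussion】: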
-
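This is a known bug in certain Flink versions: SSL configuration cannot be passed to the Confluent Schema Registry format. It is fixed in versions 1.14.0, 1.12.5 and 1.13.2. The Jira ticket is here: [issues.apache.org/jira/browse/FLINK-21229].
Tags: apache-kafka apache-flink confluent-schema-registry flink-sql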
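A minimal sketch of what the DDL might look like on a build that includes the FLINK-21229 fix. Whether the Cloudera 1.12-csadh1.3.0.0 build contains it is not confirmed here. The option names follow the Flink avro-confluent format documentation.

Two things explain both errors in the question:

- `properties.*` keys are forwarded only to the Kafka client. The Schema Registry client therefore never received the credentials, which produced the 401.
- Registry credentials are format options. Because the table uses `value.format`, they need the `value.` prefix, just like the existing `value.avro-confluent.schema-registry.url`. Without any prefix, the Kafka connector treats them as its own options and rejects them, which produced the ValidationException.

The column names and all `<...>` values below are hypothetical placeholders.

```sql
-- Sketch only: assumes a Flink build that contains the FLINK-21229 fix.
-- Column names and all <...> values are hypothetical placeholders.
CREATE TABLE consumer_session_created (
  session_id STRING,   -- hypothetical column
  created_at BIGINT    -- hypothetical column
) WITH (
  'connector' = 'kafka',
  'topic' = '<topic>',
  'scan.startup.mode' = 'earliest-offset',
  'properties.bootstrap.servers' = '<broker>:9092',
  'properties.group.id' = '<group-id>',
  -- 'properties.*' keys are passed to the Kafka client only (broker auth)
  'properties.security.protocol' = 'SASL_SSL',
  'properties.sasl.mechanism' = 'PLAIN',
  'properties.sasl.jaas.config' = 'org.apache.kafka.common.security.plain.PlainLoginModule required username="<api-key>" password="<api-secret>";',
  'value.format' = 'avro-confluent',
  -- format options carry the same 'value.' prefix as 'value.format'
  'value.avro-confluent.schema-registry.url' = 'https://<registry-host>',
  'value.avro-confluent.schema-registry.subject' = '<subject>',
  -- Schema Registry basic auth; USER_INFO expects '<user>:<password>'
  'value.avro-confluent.basic-auth.credentials-source' = 'USER_INFO',
  'value.avro-confluent.basic-auth.user-info' = '<sr-key>:<sr-secret>'
);
```

On a build that predates the fix, the `value.avro-confluent.basic-auth.*` keys would presumably still be reported as unsupported options, so upgrading (or obtaining a patched build) would come first.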