【Question Title】: Failed to deserialize Confluent Avro record using Flink-SQL
【Posted】: 2021-10-23 10:51:17
【Description】:

Unable to consume Confluent Kafka data using Flink SQL.

Flink version: 1.12-csadh1.3.0.0; Cluster: Cloudera (CDP); Kafka: Confluent Kafka

SQL:

CREATE TABLE consumer_session_created (
***
) WITH (
  'connector' = 'kafka',
  'topic' = '***', 
  'scan.startup.mode' = 'earliest-offset', 
  'properties.bootstrap.servers' = '***:9092', 
  'properties.group.id' = '***', 
  'properties.security.protocol' = 'SASL_SSL', 
  'properties.sasl.mechanism' = 'PLAIN', 
  'properties.sasl.jaas.config' = 'org.apache.kafka.common.security.plain.PlainLoginModule required username="***" password="***"', 
  'properties.avro-confluent.basic-auth.credentials-source' = '***', 
  'properties.avro-confluent.basic-auth.user-info' = '***', 
  'value.format' = 'avro-confluent', 
  'value.fields-include' = 'EXCEPT_KEY', 
  'value.avro-confluent.schema-registry.url' = 'https://***', 
  'value.avro-confluent.schema-registry.subject' = '***'
)

Error message:

java.io.IOException: Failed to deserialize Avro record.
    at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:101)
    at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:44)
    at org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:82)
    at org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaDeserializationSchema.deserialize(DynamicKafkaDeserializationSchema.java:113)
    at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:179)
    at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.runFetchLoop(KafkaFetcher.java:142)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:826)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:241)
Caused by: java.io.IOException: Could not find schema with id 79 in registry
    at org.apache.flink.formats.avro.registry.confluent.ConfluentSchemaRegistryCoder.readSchema(ConfluentSchemaRegistryCoder.java:77)
    at org.apache.flink.formats.avro.RegistryAvroDeserializationSchema.deserialize(RegistryAvroDeserializationSchema.java:70)
    at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:98)
    ... 9 more
Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Unauthorized; error code: 401
    at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:292)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:352)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:660)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:642)
    at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getSchemaByIdFromRegistry(CachedSchemaRegistryClient.java:217)
    at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getSchemaBySubjectAndId(CachedSchemaRegistryClient.java:291)
    at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getSchemaById(CachedSchemaRegistryClient.java:276)
    at io.confluent.kafka.schemaregistry.client.SchemaRegistryClient.getById(SchemaRegistryClient.java:64)
    at org.apache.flink.formats.avro.registry.confluent.ConfluentSchemaRegistryCoder.readSchema(ConfluentSchemaRegistryCoder.java:74)
    ... 11 more

Based on the Flink docs here, I assumed I was using the wrong parameter names for avro-confluent.basic-auth.*, so I removed the properties. prefix:

WITH (
  'connector' = 'kafka',
  *** 
  'avro-confluent.basic-auth.credentials-source' = '***', 
  'avro-confluent.basic-auth.user-info' = '***', 
  ***
)

However, this raised a different exception:

org.apache.flink.table.api.ValidationException: Unsupported options found for connector 'kafka'.
Unsupported options:
avro-confluent.basic-auth.credentials-source 
avro-confluent.basic-auth.user-info

Note: we can consume and deserialize this Kafka data correctly with the DataStream API using the same parameters, and this topic has been consumed by others for a long time.

【Comments】:

  • This is a known bug in certain Flink versions: SSL/auth configuration could not be passed through to the Confluent Schema Registry format. It was fixed in 1.14.0, 1.12.5, and 1.13.2. The Jira ticket is here: [issues.apache.org/jira/browse/FLINK-21229].
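On a Flink version that includes the FLINK-21229 fix (1.12.5+, 1.13.2+, or 1.14.0+), the basic-auth settings are format options, so in the Kafka connector they take the `value.avro-confluent.` prefix rather than `properties.`. A minimal sketch of the relevant lines, assuming a registry secured with HTTP basic auth (the user-info value is a placeholder; columns and other options are elided as in the question):

```sql
CREATE TABLE consumer_session_created (
  -- columns elided
) WITH (
  'connector' = 'kafka',
  -- Kafka client settings keep the 'properties.' prefix:
  'properties.security.protocol' = 'SASL_SSL',
  -- format options carry the format name as their prefix, not 'properties.':
  'value.format' = 'avro-confluent',
  'value.avro-confluent.schema-registry.url' = 'https://***',
  'value.avro-confluent.basic-auth.credentials-source' = 'USER_INFO',
  'value.avro-confluent.basic-auth.user-info' = 'registry-user:registry-password'
)
```

On versions without the fix, these options never reach the Schema Registry client, which is consistent with the 401 Unauthorized in the stack trace above.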

Tags: apache-kafka apache-flink confluent-schema-registry flink-sql


【Solution 1】:

I'm not sure about the Flink bundled with the Cloudera distribution, but in plain (vanilla) Flink this is how I connect to Kafka with SQL:

CREATE TABLE my_flink_table (
     event_date AS TO_TIMESTAMP(eventtime_string_field, 'yyyyMMddHHmmssX') 
    ,field1
    ,field2
    ...
    ,WATERMARK FOR event_date AS event_date - INTERVAL '10' MINUTE
) WITH (
    'connector' = 'kafka',
    'topic' = 'my_events_topic',
    'scan.startup.mode' = 'earliest-offset',
    'format' = 'avro-confluent',
    'avro-confluent.schema-registry.url' = 'http://my_confluent_schma_reg_host:8081/',
    'properties.group.id' = 'flink-test-001',
    'properties.bootstrap.servers' = 'kafka_host01:9092'
);

【Discussion】:
