[Question Title]: Failed to connect to Confluent Platform Schema Registry - Apache Flink SQL Confluent Avro Format
[Posted]: 2021-05-11 18:39:49
[Question Description]:

I am using a Confluent-hosted Kafka cluster with the Schema Registry service, and I am trying to process Debezium messages in a Flink job. The job is configured to use the Table & SQL Kafka connector with the Confluent Avro format, but it fails to connect to the Schema Registry and throws a 401 error.

Table connector configuration

tEnv.executeSql("CREATE TABLE flink_test_1 (\n" +
        "    ORDER_ID STRING,\n" +
        "    ORDER_TYPE STRING,\n" +
        "    USER_ID STRING,\n" +
        "    ORDER_SUM BIGINT\n" +
        ") WITH (\n" +
        "    'connector' = 'kafka',\n" +
        "    'topic'     = 'flink_test_1',\n" +
        "    'scan.startup.mode' = 'earliest-offset',\n" +
        "    'format' = 'avro-confluent',\n" + 
        "    'avro-confluent.schema-registry.url' = 'https://<SR_ENDPOINT>',\n" +
        "    'avro-confluent.schema-registry.subject' = 'flink_test_1-value',\n" +
        "    'properties.basic.auth.credentials.source' = 'USER_INFO',\n" +
        "    'properties.basic.auth.user.info' = '<SR_API_KEY>:<SR_API_SECRET>',\n" +
        "    'properties.bootstrap.servers' = '<CLOUD_BOOTSTRAP_SERVER_ENDPOINT>:9092',\n" +
        "    'properties.security.protocol' = 'SASL_SSL',\n" +
        "    'properties.ssl.endpoint.identification.algorithm' = 'https',\n" +
        "    'properties.sasl.mechanism' = 'PLAIN',\n" +
        "    'properties.sasl.jaas.config' = 'org.apache.kafka.common.security.plain.PlainLoginModule   required username=\"<CLUSTER_API_KEY>\"   password=\"<CLUSTER_API_SECRET>\";'\n" +
        ")"); 

Error message

Caused by: java.io.IOException: Failed to deserialize Avro record.
    at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:101)
    at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:44)
    at org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:82)
    at org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaDeserializationSchema.deserialize(DynamicKafkaDeserializationSchema.java:113)
    at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:179)
    at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.runFetchLoop(KafkaFetcher.java:142)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:826)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:241)
Caused by: java.io.IOException: Could not find schema with id 100256 in registry
    at org.apache.flink.formats.avro.registry.confluent.ConfluentSchemaRegistryCoder.readSchema(ConfluentSchemaRegistryCoder.java:77)
    at org.apache.flink.formats.avro.RegistryAvroDeserializationSchema.deserialize(RegistryAvroDeserializationSchema.java:70)
    at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:98)
    ... 9 more
Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Unauthorized; error code: 401
    at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:292)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:352)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:660)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:642)
    at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getSchemaByIdFromRegistry(CachedSchemaRegistryClient.java:217)
    at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getSchemaBySubjectAndId(CachedSchemaRegistryClient.java:291)
    at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getSchemaById(CachedSchemaRegistryClient.java:276)
    at io.confluent.kafka.schemaregistry.client.SchemaRegistryClient.getById(SchemaRegistryClient.java:64)
    at org.apache.flink.formats.avro.registry.confluent.ConfluentSchemaRegistryCoder.readSchema(ConfluentSchemaRegistryCoder.java:74)
    ... 11 more

I successfully tested the connection to the Schema Registry with:

curl -u <SR_API_KEY>:<SR_API_SECRET> https://<SR_ENDPOINT>

The error message "io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Unauthorized; error code: 401" seems to indicate clearly that the <SR_API_KEY>:<SR_API_SECRET> credentials are not being passed to the Confluent Schema Registry.

I checked the documentation at https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connectors/formats/avro-confluent.html, which describes only three format options: ["format", "avro-confluent.schema-registry.url", "avro-confluent.schema-registry.subject"]. There is no option for specifying the SR_API_KEY and SR_API_SECRET.
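(Note for later readers: the `properties.*` keys in the DDL above are forwarded only to the Kafka client, not to the Schema Registry client, which is why the basic-auth settings never reach the registry. Newer Flink releases added dedicated basic-auth options to the avro-confluent format. A sketch of the WITH clause under that assumption; the `avro-confluent.basic-auth.*` option names are taken from the Flink 1.13+ documentation, not from the 1.12 release used here:)

```sql
CREATE TABLE flink_test_1 (
    ORDER_ID STRING,
    ORDER_TYPE STRING,
    USER_ID STRING,
    ORDER_SUM BIGINT
) WITH (
    'connector' = 'kafka',
    'topic'     = 'flink_test_1',
    'scan.startup.mode' = 'earliest-offset',
    'format' = 'avro-confluent',
    'avro-confluent.schema-registry.url' = 'https://<SR_ENDPOINT>',
    'avro-confluent.schema-registry.subject' = 'flink_test_1-value',
    -- basic-auth options for the Schema Registry client (Flink 1.13+):
    'avro-confluent.basic-auth.credentials-source' = 'USER_INFO',
    'avro-confluent.basic-auth.user-info' = '<SR_API_KEY>:<SR_API_SECRET>',
    -- the 'properties.*' keys below configure only the Kafka client:
    'properties.bootstrap.servers' = '<CLOUD_BOOTSTRAP_SERVER_ENDPOINT>:9092',
    'properties.security.protocol' = 'SASL_SSL',
    'properties.sasl.mechanism' = 'PLAIN',
    'properties.sasl.jaas.config' = 'org.apache.kafka.common.security.plain.PlainLoginModule required username="<CLUSTER_API_KEY>" password="<CLUSTER_API_SECRET>";'
)
```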

I have not found a way to connect to a secured Schema Registry from a Flink program. Does Flink support this type of connection? Does anyone know what the correct connection configuration should look like?

Thank you.

[Comments]:

  • If additional properties can be passed, you might get a more direct answer by opening an issue with the Flink developers or by searching the source code
  • Not sure whether this helps (it works for Kafka Connect, not the Flink Table API, but the two may be consistent to some degree): put your credentials into the schema.registry.basic.auth.user.info property, in the same colon-separated format
  • Same problem here. Did you ever figure it out?

Tags: apache-kafka apache-flink confluent-platform confluent-schema-registry flink-sql


[Solution 1]:

I ran into the same problem. After some investigation, I found a Jira ticket about this issue. If you cannot upgrade your Flink version, you can consume the data with the DataStream API first and then convert the stream into a table.
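A minimal sketch of that workaround, assuming Flink 1.12 with the flink-avro-confluent-registry and flink-connector-kafka dependencies on the classpath. As far as I can tell, `ConfluentRegistryAvroDeserializationSchema.forGeneric` has an overload that accepts a map of Schema Registry client properties; that map is where the basic-auth credentials go. The topic, reader schema fields, and placeholder credentials below are taken from the question; the record name `Order` is a hypothetical stand-in for whatever Debezium registered:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroDeserializationSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class SecureRegistryConsumer {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Reader schema; must be compatible with the schema registered
        // under the flink_test_1-value subject.
        Schema schema = new Schema.Parser().parse(
            "{\"type\":\"record\",\"name\":\"Order\",\"fields\":["
            + "{\"name\":\"ORDER_ID\",\"type\":\"string\"},"
            + "{\"name\":\"ORDER_TYPE\",\"type\":\"string\"},"
            + "{\"name\":\"USER_ID\",\"type\":\"string\"},"
            + "{\"name\":\"ORDER_SUM\",\"type\":\"long\"}]}");

        // These properties go to the Schema Registry client -- the part that
        // the Flink 1.12 SQL connector cannot be configured to authenticate.
        Map<String, Object> registryConfigs = new HashMap<>();
        registryConfigs.put("basic.auth.credentials.source", "USER_INFO");
        registryConfigs.put("basic.auth.user.info", "<SR_API_KEY>:<SR_API_SECRET>");

        ConfluentRegistryAvroDeserializationSchema<GenericRecord> deserializer =
            ConfluentRegistryAvroDeserializationSchema.forGeneric(
                schema, "https://<SR_ENDPOINT>", registryConfigs);

        // Kafka client properties, equivalent to the 'properties.*' keys in the DDL.
        Properties kafkaProps = new Properties();
        kafkaProps.setProperty("bootstrap.servers", "<CLOUD_BOOTSTRAP_SERVER_ENDPOINT>:9092");
        kafkaProps.setProperty("security.protocol", "SASL_SSL");
        kafkaProps.setProperty("sasl.mechanism", "PLAIN");
        kafkaProps.setProperty("sasl.jaas.config",
            "org.apache.kafka.common.security.plain.PlainLoginModule required "
            + "username=\"<CLUSTER_API_KEY>\" password=\"<CLUSTER_API_SECRET>\";");

        env.addSource(new FlinkKafkaConsumer<>("flink_test_1", deserializer, kafkaProps))
           .print();
        env.execute("secure-registry-consumer");
    }
}
```

From there the resulting DataStream of GenericRecord can be mapped to Row and registered with the table environment (e.g. via tEnv.fromDataStream) to continue in SQL.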

[Discussion]:
