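【Posted】: 2021-05-11 18:39:49
【Question】:
I am using a Confluent-managed Kafka cluster and Schema Registry service, and I am trying to process Debezium messages in a Flink job. The job is configured to use the Table & SQL connector with the Confluent Avro format. However, the job cannot connect to Schema Registry and fails with a 401 error.
Table connector configuration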
tEnv.executeSql("CREATE TABLE flink_test_1 (\n" +
" ORDER_ID STRING,\n" +
" ORDER_TYPE STRING,\n" +
" USER_ID STRING,\n" +
" ORDER_SUM BIGINT\n" +
") WITH (\n" +
" 'connector' = 'kafka',\n" +
" 'topic' = 'flink_test_1',\n" +
" 'scan.startup.mode' = 'earliest-offset',\n" +
" 'format' = 'avro-confluent',\n" +
" 'avro-confluent.schema-registry.url' = 'https://<SR_ENDPOINT>',\n" +
" 'avro-confluent.schema-registry.subject' = 'flink_test_1-value',\n" +
" 'properties.basic.auth.credentials.source' = 'USER_INFO',\n" +
" 'properties.basic.auth.user.info' = '<SR_API_KEY>:<SR_API_SECRET>',\n" +
" 'properties.bootstrap.servers' = '<CLOUD_BOOTSTRAP_SERVER_ENDPOINT>:9092',\n" +
" 'properties.security.protocol' = 'SASL_SSL',\n" +
" 'properties.ssl.endpoint.identification.algorithm' = 'https',\n" +
" 'properties.sasl.mechanism' = 'PLAIN',\n" +
" 'properties.sasl.jaas.config' = 'org.apache.kafka.common.security.plain.PlainLoginModule required username=\"<CLUSTER_API_KEY>\" password=\"<CLUSTER_API_SECRET>\";'\n" +
")");
Error message
Caused by: java.io.IOException: Failed to deserialize Avro record.
at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:101)
at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:44)
at org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:82)
at org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaDeserializationSchema.deserialize(DynamicKafkaDeserializationSchema.java:113)
at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:179)
at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.runFetchLoop(KafkaFetcher.java:142)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:826)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66)
at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:241)
Caused by: java.io.IOException: Could not find schema with id 100256 in registry
at org.apache.flink.formats.avro.registry.confluent.ConfluentSchemaRegistryCoder.readSchema(ConfluentSchemaRegistryCoder.java:77)
at org.apache.flink.formats.avro.RegistryAvroDeserializationSchema.deserialize(RegistryAvroDeserializationSchema.java:70)
at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:98)
... 9 more
Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Unauthorized; error code: 401
at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:292)
at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:352)
at io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:660)
at io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:642)
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getSchemaByIdFromRegistry(CachedSchemaRegistryClient.java:217)
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getSchemaBySubjectAndId(CachedSchemaRegistryClient.java:291)
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getSchemaById(CachedSchemaRegistryClient.java:276)
at io.confluent.kafka.schemaregistry.client.SchemaRegistryClient.getById(SchemaRegistryClient.java:64)
at org.apache.flink.formats.avro.registry.confluent.ConfluentSchemaRegistryCoder.readSchema(ConfluentSchemaRegistryCoder.java:74)
... 11 more
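For context on where "schema with id 100256" in the trace comes from: messages written with Confluent's serializers carry a 5-byte header (a magic byte of 0 followed by a 4-byte big-endian schema id) ahead of the Avro payload, and the deserializer looks that id up in Schema Registry. A minimal sketch of reading the header is below; the class and method names are hypothetical and not part of Flink or the Confluent client.

```java
import java.nio.ByteBuffer;

// Hypothetical helper, for illustration only.
final class ConfluentWireFormat {

    /** Returns the schema id embedded in a Confluent-framed message. */
    static int schemaId(byte[] message) {
        if (message == null || message.length < 5) {
            throw new IllegalArgumentException("Message too short for the 5-byte header");
        }
        ByteBuffer buf = ByteBuffer.wrap(message); // ByteBuffer is big-endian by default
        byte magic = buf.get();
        if (magic != 0) {
            throw new IllegalArgumentException("Unknown magic byte: " + magic);
        }
        return buf.getInt(); // the next 4 bytes are the schema id
    }

    public static void main(String[] args) {
        // Magic byte 0, then 0x000187A0; the Avro payload would follow.
        byte[] framed = {0, 0x00, 0x01, (byte) 0x87, (byte) 0xA0};
        System.out.println(schemaId(framed)); // prints 100256
    }
}
```

Read this way, the record itself was parsed far enough to extract the id; it is the subsequent registry lookup that fails with 401.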
I successfully tested the connection to Schema Registry with:
curl -u <SR_API_KEY>:<SR_API_SECRET> https://<SR_ENDPOINT>
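The `curl -u` check authenticates with HTTP Basic auth, that is, an `Authorization: Basic <base64(key:secret)>` header. The sketch below shows how that header value is formed, which can help when comparing what curl sends with what a client library sends. The class name is hypothetical; only JDK classes are used.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

// Hypothetical helper, for illustration only.
final class BasicAuthHeader {

    /** Builds the value of the Authorization header for HTTP Basic auth. */
    static String of(String apiKey, String apiSecret) {
        String userInfo = apiKey + ":" + apiSecret; // same "key:secret" form as curl -u
        String encoded = Base64.getEncoder()
                .encodeToString(userInfo.getBytes(StandardCharsets.UTF_8));
        return "Basic " + encoded;
    }

    public static void main(String[] args) {
        // Placeholders from the question; substitute real credentials locally.
        System.out.println(of("<SR_API_KEY>", "<SR_API_SECRET>"));
    }
}
```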
The error message "io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Unauthorized; error code: 401" seems to clearly indicate that the request to Schema Registry was not authenticated.
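I checked the documentation at https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connectors/formats/avro-confluent.html, which describes only three format options ("format", "avro-confluent.schema-registry.url", "avro-confluent.schema-registry.subject") and has no option for specifying SR_API_KEY and SR_API_SECRET.
I don't know how to connect to a secured Schema Registry from a Flink program. Does Flink support this kind of connection? Does anyone know what the correct connection configuration should look like?
Thanks.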
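One possible direction, offered as an unverified sketch: the `properties.*` keys are handed to the Kafka client, so the `properties.basic.auth.*` entries in the configuration above probably never reach the format's Schema Registry client. Documentation for Flink releases later than 1.12 (1.13 onward, as far as I know) lists format-level options named `avro-confluent.basic-auth.credentials-source` and `avro-confluent.basic-auth.user-info`. Treat both option names and the version as assumptions to check against the documentation of the Flink release in use. The class and method names below are hypothetical.

```java
// Hypothetical helper that only builds the DDL string; it has no Flink dependency.
final class SchemaRegistryAuthDdl {

    /** Builds the CREATE TABLE statement; srUserInfo is "<SR_API_KEY>:<SR_API_SECRET>". */
    static String createTableDdl(String srUserInfo) {
        return "CREATE TABLE flink_test_1 (\n"
            + "  ORDER_ID STRING,\n"
            + "  ORDER_TYPE STRING,\n"
            + "  USER_ID STRING,\n"
            + "  ORDER_SUM BIGINT\n"
            + ") WITH (\n"
            + "  'connector' = 'kafka',\n"
            + "  'topic' = 'flink_test_1',\n"
            + "  'scan.startup.mode' = 'earliest-offset',\n"
            + "  'format' = 'avro-confluent',\n"
            + "  'avro-confluent.schema-registry.url' = 'https://<SR_ENDPOINT>',\n"
            + "  'avro-confluent.schema-registry.subject' = 'flink_test_1-value',\n"
            // Assumed format-level options (not in the 1.12 docs cited above):
            + "  'avro-confluent.basic-auth.credentials-source' = 'USER_INFO',\n"
            + "  'avro-confluent.basic-auth.user-info' = '" + srUserInfo + "',\n"
            // Kafka client settings, unchanged from the question:
            + "  'properties.bootstrap.servers' = '<CLOUD_BOOTSTRAP_SERVER_ENDPOINT>:9092',\n"
            + "  'properties.security.protocol' = 'SASL_SSL',\n"
            + "  'properties.ssl.endpoint.identification.algorithm' = 'https',\n"
            + "  'properties.sasl.mechanism' = 'PLAIN',\n"
            + "  'properties.sasl.jaas.config' = 'org.apache.kafka.common.security.plain.PlainLoginModule"
            + " required username=\"<CLUSTER_API_KEY>\" password=\"<CLUSTER_API_SECRET>\";'\n"
            + ")";
    }

    public static void main(String[] args) {
        System.out.println(createTableDdl("<SR_API_KEY>:<SR_API_SECRET>"));
    }
}
```

The resulting string would be passed to `tEnv.executeSql(...)` exactly as in the original configuration. If the Flink version in use rejects the two `basic-auth` options as unsupported, that would suggest the format in that release has no credentials support.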
【Comments】:
-
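You may get a more direct answer on whether additional properties can be passed by opening an issue with the Flink developers or by searching the source code.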
-
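Not entirely sure whether this helps (it works for Kafka Connect rather than the Flink Table API, but I suspect the two may be somewhat consistent): put your credentials in the property schema.registry.basic.auth.user.info, using the same colon-separated format.
-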
Same problem here. Have you figured it out in the meantime?
Tags: apache-kafka apache-flink confluent-platform confluent-schema-registry flink-sql