【发布时间】:2021-06-18 10:15:29
【问题描述】:
我在 Spark Streaming 中使用 Kafka Source 来接收使用 Confluent Cloud 中的 Datagen 生成的记录。我打算使用 Confluent Schema Registry,
目前,这是我面临的例外: *
线程“main”中的异常 io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: 未经授权;错误代码:401
confluent cloud的schema registry需要传递一些我不知道如何输入的认证数据:
basic.auth.credentials.source=USER_INFO
schema.registry.basic.auth.user.info=secret: secret
我认为我必须将此身份验证数据传递给 CachedSchemaRegistryClient,但我不确定是否如此以及如何。
// Setup the Avro deserialization UDF
schemaRegistryClient = new CachedSchemaRegistryClient(schemaRegistryUrl, 128)
kafkaAvroDeserializer = new AvroDeserializer(schemaRegistryClient)
spark.udf.register("deserialize", (bytes: Array[Byte]) =>
kafkaAvroDeserializer.deserialize(bytes)
如果我尝试将身份验证发送到架构注册表
val restService = new RestService(schemaRegistryURL)
val props = Map(
"basic.auth.credentials.source" -> "USER_INFO",
"schema.registry.basic.auth.user.info" -> "secret:secret"
).asJava
var schemaRegistryClient = new CachedSchemaRegistryClient(restService, 100, props)
我明白了
Cannot resolve overloaded constructor CachedSchemaRegistryClient,似乎只有2个参数要发送到CachedSchemaRegistryClient。
我该如何解决这个问题?
我遇到了这个post,但在这里他们没有对融合云中的架构注册表应用任何身份验证。
【问题讨论】:
-
不清楚你用的是什么版本的Registry客户端……你肯定可以传三个参数-github.com/confluentinc/schema-registry/blob/master/client/src/…
-
@OneCricketeer 这是目前使用的 kafka-schema-registry-client-3.3.1.jar ..在这种情况下应该使用哪个版本的任何建议?
-
@OneCricketeer 感谢您的意见。我现在能够以融合的方式连接到我的模式注册表。使用 spark readStream 我可以从 kafka 主题中读取并查看架构。但是在执行 from_avro(value) 并在控制台上尝试 writeStream 时,我得到
Caused by: org.apache.avro.AvroRuntimeException: Malformed data. Length is negative: -1..??任何输入或参考都会有所帮助。 -
@OneCricketeer 这段代码对我有用:
private val schemaRegistryUrl = "<schemaregistryURL>" val props = Map( "basic.auth.credentials.source" -> "USER_INFO", "schema.registry.basic.auth.user.info" -> "<api-key>:<api-secret>" ).asJava private val schemaRegistryClient = new CachedSchemaRegistryClient(schemaRegistryUrl, 100,props)我们需要确保在转换为 JAVA 时进行正确的导入:import scala.collection.JavaConverters.mapAsJavaMapConverter
标签: scala apache-spark apache-kafka confluent-schema-registry confluent-cloud