【问题标题】:Configuring Spark Structured Streaming with authenticated Confluent Schema Registry使用经过身份验证的 Confluent Schema Registry 配置 Spark 结构化流
【发布时间】:2021-06-18 10:15:29
【问题描述】:

我在 Spark Streaming 中使用 Kafka Source 来接收使用 Confluent Cloud 中的 Datagen 生成的记录。我打算使用 Confluent Schema Registry,

目前,这是我面临的例外: *

线程“main”中的异常 io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: 未经授权;错误代码:401

confluent cloud的schema registry需要传递一些我不知道如何输入的认证数据:

basic.auth.credentials.source=USER_INFO
schema.registry.basic.auth.user.info=secret: secret

我认为我必须将此身份验证数据传递给 CachedSchemaRegistryClient,但我不确定是否如此以及如何。

// Setup the Avro deserialization UDF
   schemaRegistryClient = new CachedSchemaRegistryClient(schemaRegistryUrl, 128)
    kafkaAvroDeserializer = new AvroDeserializer(schemaRegistryClient)
    spark.udf.register("deserialize", (bytes: Array[Byte]) =>
      kafkaAvroDeserializer.deserialize(bytes)

如果我尝试将身份验证发送到架构注册表

val restService = new RestService(schemaRegistryURL)

  val props = Map(
    "basic.auth.credentials.source" -> "USER_INFO",
    "schema.registry.basic.auth.user.info" -> "secret:secret"
  ).asJava

  var schemaRegistryClient = new CachedSchemaRegistryClient(restService, 100, props)

我明白了 Cannot resolve overloaded constructor CachedSchemaRegistryClient,似乎只有2个参数要发送到CachedSchemaRegistryClient。

我该如何解决这个问题?

我遇到了这个post,但在这里他们没有对融合云中的架构注册表应用任何身份验证。

【问题讨论】:

  • 不清楚你用的是什么版本的Registry客户端……你肯定可以传三个参数-github.com/confluentinc/schema-registry/blob/master/client/src/…
  • @OneCricketeer 这是目前使用的 kafka-schema-registry-client-3.3.1.jar ..在这种情况下应该使用哪个版本的任何建议?
  • @OneCricketeer 感谢您的意见。我现在能够以融合的方式连接到我的模式注册表。使用 spark readStream 我可以从 kafka 主题中读取并查看架构。但是在执行 from_avro(value) 并在控制台上尝试 writeStream 时,我得到 Caused by: org.apache.avro.AvroRuntimeException: Malformed data. Length is negative: -1 ..??任何输入或参考都会有所帮助。
  • @OneCricketeer 这段代码对我有用:private val schemaRegistryUrl = "<schemaregistryURL>" val props = Map( "basic.auth.credentials.source" -> "USER_INFO", "schema.registry.basic.auth.user.info" -> "<api-key>:<api-secret>" ).asJava private val schemaRegistryClient = new CachedSchemaRegistryClient(schemaRegistryUrl, 100,props) 我们需要确保在转换为 JAVA 时进行正确的导入:import scala.collection.JavaConverters.mapAsJavaMapConverter

标签: scala apache-spark apache-kafka confluent-schema-registry confluent-cloud


【解决方案1】:

这段代码对我有用:

private val schemaRegistryUrl = "<schemaregistryURL>"   
val props = Map("basic.auth.credentials.source" -> "USER_INFO",
 "schema.registry.basic.auth.user.info" -> "<api-key>:<api-secret>").asJava

 private val schemaRegistryClient = new CachedSchemaRegistryClient(schemaRegistryUrl, 100,props)

我们需要确保在转换为 JAVA 时进行正确的导入:

 import scala.collection.JavaConverters.mapAsJavaMapConverter

【讨论】:

    猜你喜欢
    • 2018-07-30
    • 2020-03-09
    • 2020-08-12
    • 2020-06-12
    • 1970-01-01
    • 2021-10-04
    相关资源
    最近更新 更多