【问题标题】:Kafka Streams creates avro topic without schemaKafka Streams 创建没有模式的 avro 主题
【发布时间】:2017-10-05 09:05:43
【问题描述】:

我开发了一个 java 应用程序,它使用 Schema Registry 从 avro 主题读取数据,然后进行简单的转换并在控制台中打印结果。默认情况下,我使用 GenericAvroSerde 类作为键和值。一切都很好,除了我必须为每个 serde 定义额外的配置,比如

    final Map<String, String> serdeConfig = Collections.singletonMap("schema.registry.url", kafkaStreamsConfig.getProperty("schema.registry.url"));
    final Serde<GenericRecord> keyGenericAvroSerde = new GenericAvroSerde();
    final Serde<GenericRecord> valueGenericAvroSerde = new GenericAvroSerde();
    keyGenericAvroSerde.configure(serdeConfig, true);
    valueGenericAvroSerde.configure(serdeConfig, false);

否则我总是会收到如下错误:

Exception in thread "NTB27821-StreamThread-1" org.apache.kafka.streams.errors.StreamsException: Failed to deserialize value for record. topic=CH-PGP-LP2_S20-002_agg, partition=0, offset=4482940
at org.apache.kafka.streams.processor.internals.SourceNodeRecordDeserializer.deserialize(SourceNodeRecordDeserializer.java:46)
at org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:84)
at org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords(PartitionGroup.java:117)
at org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.java:474)
at org.apache.kafka.streams.processor.internals.StreamThread.addRecordsToTasks(StreamThread.java:642)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:548)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:519)
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id 69
Caused by: java.lang.NullPointerException
    at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:122)
    at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:93)
    at io.confluent.kafka.serializers.KafkaAvroDeserializer.deserialize(KafkaAvroDeserializer.java:55)
    at io.confluent.kafka.streams.serdes.avro.GenericAvroDeserializer.deserialize(GenericAvroDeserializer.java:63)
    at io.confluent.kafka.streams.serdes.avro.GenericAvroDeserializer.deserialize(GenericAvroDeserializer.java:39)
    at org.apache.kafka.common.serialization.ExtendedDeserializer$Wrapper.deserialize(ExtendedDeserializer.java:65)
    at org.apache.kafka.common.serialization.ExtendedDeserializer$Wrapper.deserialize(ExtendedDeserializer.java:55)
    at org.apache.kafka.streams.processor.internals.SourceNode.deserializeValue(SourceNode.java:56)
    at org.apache.kafka.streams.processor.internals.SourceNodeRecordDeserializer.deserialize(SourceNodeRecordDeserializer.java:44)
    at org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:84)
    at org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords(PartitionGroup.java:117)
    at org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.java:474)
    at org.apache.kafka.streams.processor.internals.StreamThread.addRecordsToTasks(StreamThread.java:642)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:548)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:519)

嗯,这很不寻常,但是很好,在那之后(当我添加了上面发布的配置调用时)-它起作用了,我的应用程序能够执行所有操作并打印出结果。

但是! 当我尝试使用 call through() - 只是将数据发布到新主题时 - 我遇到了我要问的问题:主题是在没有架构的情况下创建的。 怎么可能???

有趣的事实是正在写入数据,但它是: a)二进制格式,所以简单的消费者无法读取它 b) 它没有架构 - 所以 avro 消费者也无法读取它:

    Processed a total of 1 messages
[2017-10-05 11:25:53,241] ERROR Unknown error when running consumer:  (kafka.tools.ConsoleConsumer$:105)
org.apache.kafka.common.errors.SerializationException: Error retrieving Avro schema for id 0
Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Schema not found; error code: 40403
        at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:182)
        at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:203)
        at io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:379)
        at io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:372)
        at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getSchemaByIdFromRegistry(CachedSchemaRegistryClient.java:65)
        at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getBySubjectAndId(CachedSchemaRegistryClient.java:131)
        at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:122)
        at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:93)
        at io.confluent.kafka.formatter.AvroMessageFormatter.writeTo(AvroMessageFormatter.java:122)
        at io.confluent.kafka.formatter.AvroMessageFormatter.writeTo(AvroMessageFormatter.java:114)
        at kafka.tools.ConsoleConsumer$.process(ConsoleConsumer.scala:140)
        at kafka.tools.ConsoleConsumer$.run(ConsoleConsumer.scala:78)
        at kafka.tools.ConsoleConsumer$.main(ConsoleConsumer.scala:53)
        at kafka.tools.ConsoleConsumer.main(ConsoleConsumer.scala)
[2017-10-05 11:25:53,241] ERROR Unknown error when running consumer:  (kafka.tools.ConsoleConsumer$:105)
org.apache.kafka.common.errors.SerializationException: Error retrieving Avro schema for id 0
Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Schema not found; error code: 40403
        at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:182)
        at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:203)
        at io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:379)
        at io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:372)
        at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getSchemaByIdFromRegistry(CachedSchemaRegistryClient.java:65)
        at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getBySubjectAndId(CachedSchemaRegistryClient.java:131)
        at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:122)
        at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:93)
        at io.confluent.kafka.formatter.AvroMessageFormatter.writeTo(AvroMessageFormatter.java:122)
        at io.confluent.kafka.formatter.AvroMessageFormatter.writeTo(AvroMessageFormatter.java:114)
        at kafka.tools.ConsoleConsumer$.process(ConsoleConsumer.scala:140)
        at kafka.tools.ConsoleConsumer$.run(ConsoleConsumer.scala:78)
        at kafka.tools.ConsoleConsumer$.main(ConsoleConsumer.scala:53)
        at kafka.tools.ConsoleConsumer.main(ConsoleConsumer.scala)

当然,我检查了该主题的架构注册表:

curl -X GET http://localhost:8081/subjects/agg_value_9-value/versions
{"error_code":40401,"message":"Subject not found."}

但是对 Java App 编写的另一个主题的相同调用 - 初始数据的生产者表明架构存在:

curl -X GET http://localhost:8081/subjects/CH-PGP-LP2_S20-002_agg-value/versions
[1]

两个应用程序使用相同的“schema.registry.url”配置 总结一下——创建了主题,写入了数据,可以用简单的消费者读取,但它是二进制的,架构不存在。

我还尝试使用 Landoop 创建架构,以某种方式匹配数据,但没有成功 - 顺便说一下,这不是使用 kafka 流的正确方法 - 一切都应该即时完成。

请帮忙!

【问题讨论】:

  • 您使用哪个版本?另外,您将 StreamsConfig 中的 AvroSerde 设置为默认值,还是单独设置每个运算符?您是否在启动应用程序之前手动创建了使用的主题?另请查看此示例:github.com/confluentinc/kafka-streams-examples/blob/3.3.0-post/…
  • 我使用 Confluent 3.3.0、java 1.8、kafka 0.11.0.0-cp1、avro 版本 1.7.7。我将 GenericAvroSerde 设置为默认值,但对于简单类型,我会逐个覆盖这些设置(Serdes.Long、Serdes.String、Serdes.Float)。我打算使用的主题不存在,但它是在我在开始帖子中写入数据时创建的。
  • 关于这个例子 - 我做的都是一样的,除了在例子中键被解码为一个字节数组,而我使用 stringSerde (因为键的 avro 模式只是“字符串”)。 Schema regstry url 可用,如果不是,我将无法读取 kafka 流应用程序中的初始数据。但是当我执行 finalStream.print() 时,我能够正确打印最终流
  • 如果您在 StreamsConfig 中将 avro 设置为默认值,我想知道为什么您需要调用 configure(即,您问题中的整个第一个代码块)。 Streams 将自动配置您的 Serde——如果没有,您的设置似乎不正确。您可以分享您的 StreamsConfig 代码吗?你看过我分享的例子吗?

标签: java apache-kafka avro apache-kafka-streams confluent-platform


【解决方案1】:

当调用through 时,将使用通过StreamsConfig 定义的默认serde,除非用户明确覆盖它。您使用了哪个默认 serde?正确地说,您应该使用 AbstractKafkaAvroSerializer 它将通过主题自动注册该模式。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2021-10-27
    • 1970-01-01
    • 1970-01-01
    • 2018-10-27
    • 1970-01-01
    • 1970-01-01
    • 2020-01-08
    • 1970-01-01
    相关资源
    最近更新 更多