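【Posted】: 2017-10-05 09:05:43
【Question】:
I have developed a Java application that reads data from an Avro topic using Schema Registry, applies a simple transformation, and prints the result to the console. By default, I use the GenericAvroSerde class for both keys and values. Everything works fine, except that I have to define extra configuration for each serde, like this: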
// Pass the Schema Registry URL explicitly to each serde instance created here
final Map<String, String> serdeConfig = Collections.singletonMap("schema.registry.url", kafkaStreamsConfig.getProperty("schema.registry.url"));
final Serde<GenericRecord> keyGenericAvroSerde = new GenericAvroSerde();
final Serde<GenericRecord> valueGenericAvroSerde = new GenericAvroSerde();
keyGenericAvroSerde.configure(serdeConfig, true);    // isKey = true: configure as key serde
valueGenericAvroSerde.configure(serdeConfig, false); // isKey = false: configure as value serde
Otherwise I always get an error like this:
Exception in thread "NTB27821-StreamThread-1" org.apache.kafka.streams.errors.StreamsException: Failed to deserialize value for record. topic=CH-PGP-LP2_S20-002_agg, partition=0, offset=4482940
at org.apache.kafka.streams.processor.internals.SourceNodeRecordDeserializer.deserialize(SourceNodeRecordDeserializer.java:46)
at org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:84)
at org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords(PartitionGroup.java:117)
at org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.java:474)
at org.apache.kafka.streams.processor.internals.StreamThread.addRecordsToTasks(StreamThread.java:642)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:548)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:519)
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id 69
Caused by: java.lang.NullPointerException
at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:122)
at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:93)
at io.confluent.kafka.serializers.KafkaAvroDeserializer.deserialize(KafkaAvroDeserializer.java:55)
at io.confluent.kafka.streams.serdes.avro.GenericAvroDeserializer.deserialize(GenericAvroDeserializer.java:63)
at io.confluent.kafka.streams.serdes.avro.GenericAvroDeserializer.deserialize(GenericAvroDeserializer.java:39)
at org.apache.kafka.common.serialization.ExtendedDeserializer$Wrapper.deserialize(ExtendedDeserializer.java:65)
at org.apache.kafka.common.serialization.ExtendedDeserializer$Wrapper.deserialize(ExtendedDeserializer.java:55)
at org.apache.kafka.streams.processor.internals.SourceNode.deserializeValue(SourceNode.java:56)
at org.apache.kafka.streams.processor.internals.SourceNodeRecordDeserializer.deserialize(SourceNodeRecordDeserializer.java:44)
at org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:84)
at org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords(PartitionGroup.java:117)
at org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.java:474)
at org.apache.kafka.streams.processor.internals.StreamThread.addRecordsToTasks(StreamThread.java:642)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:548)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:519)
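Well, that's unusual, but fine: after that (once I added the configure calls posted above) it worked, and my application was able to perform all operations and print the results.
But! When I call through() to simply publish the data to a new topic, I run into the problem I'm asking about: the topic is created without a schema. How is that possible???
Interestingly, data is being written, but: a) it is in binary format, so a simple consumer cannot read it; b) it has no schema, so an Avro consumer cannot read it either: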
Processed a total of 1 messages
[2017-10-05 11:25:53,241] ERROR Unknown error when running consumer: (kafka.tools.ConsoleConsumer$:105)
org.apache.kafka.common.errors.SerializationException: Error retrieving Avro schema for id 0
Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Schema not found; error code: 40403
at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:182)
at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:203)
at io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:379)
at io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:372)
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getSchemaByIdFromRegistry(CachedSchemaRegistryClient.java:65)
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getBySubjectAndId(CachedSchemaRegistryClient.java:131)
at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:122)
at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:93)
at io.confluent.kafka.formatter.AvroMessageFormatter.writeTo(AvroMessageFormatter.java:122)
at io.confluent.kafka.formatter.AvroMessageFormatter.writeTo(AvroMessageFormatter.java:114)
at kafka.tools.ConsoleConsumer$.process(ConsoleConsumer.scala:140)
at kafka.tools.ConsoleConsumer$.run(ConsoleConsumer.scala:78)
at kafka.tools.ConsoleConsumer$.main(ConsoleConsumer.scala:53)
at kafka.tools.ConsoleConsumer.main(ConsoleConsumer.scala)
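The message "Error retrieving Avro schema for id 0" means the Avro formatter read a schema ID of 0 from the record header. Confluent's Avro serializer frames every message as a magic byte 0x0, a 4-byte big-endian schema ID, and then the Avro-encoded body, so a minimal sketch like the one below can be used to inspect raw record bytes. The class name WireFormatCheck and both sample payloads are hypothetical. The second payload is the same eight big-endian bytes Kafka's LongSerializer writes for 42L, shown only as one way a record that was never Avro-serialized can decode to schema ID 0; the question does not show which serde through() actually used.

```java
import java.nio.ByteBuffer;

class WireFormatCheck {
    // First byte Confluent's Avro serializer writes in front of every message.
    static final byte MAGIC_BYTE = 0x0;

    // Returns the schema ID stored in bytes 1..4, or -1 when the payload is
    // too short or does not start with the magic byte.
    static int schemaId(byte[] payload) {
        if (payload == null || payload.length < 5 || payload[0] != MAGIC_BYTE) {
            return -1;
        }
        // ByteBuffer is big-endian by default, matching the 4-byte ID field.
        return ByteBuffer.wrap(payload, 1, 4).getInt();
    }

    public static void main(String[] args) {
        // Hypothetical Avro-framed value: magic byte, schema ID 69 (0x45), then Avro body bytes.
        byte[] framed = {0x0, 0x0, 0x0, 0x0, 0x45, 0x02, 0x61};
        // Hypothetical non-Avro value: the 8 big-endian bytes of the long 42.
        byte[] plainLong = ByteBuffer.allocate(8).putLong(42L).array();

        System.out.println("framed    -> schema id " + schemaId(framed));    // 69
        System.out.println("plainLong -> schema id " + schemaId(plainLong)); // 0
    }
}
```

Applied to bytes fetched with a plain (non-Avro) consumer, a result of 0 or -1 indicates the record was not written by the Confluent Avro serializer.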
Of course, I checked the Schema Registry for this topic:
curl -X GET http://localhost:8081/subjects/agg_value_9-value/versions
{"error_code":40401,"message":"Subject not found."}
But the same call for another topic, written by the Java app that produces the initial data, shows that the schema exists:
curl -X GET http://localhost:8081/subjects/CH-PGP-LP2_S20-002_agg-value/versions
[1]
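Both curl calls query a subject derived from the topic name. With the default naming used by Confluent's Avro serializer, a value schema is registered under "<topic>-value" (and a key schema under "<topic>-key"), and by default the serializer registers the schema the first time it writes, so a 40401 for agg_value_9-value suggests that no value on that topic was ever serialized by the Avro serializer. The minimal sketch below (class and method names are hypothetical) only builds the same REST path the curl commands use, assuming that default naming.

```java
class SubjectUrls {
    // Assumes the default subject naming: "<topic>-key" for keys, "<topic>-value" for values.
    static String subject(String topic, boolean isKey) {
        return topic + (isKey ? "-key" : "-value");
    }

    // Builds the Schema Registry REST path that lists all registered versions of a subject.
    static String versionsUrl(String registryUrl, String topic, boolean isKey) {
        // Drop a trailing slash so the path separator is not doubled.
        String base = registryUrl.endsWith("/")
                ? registryUrl.substring(0, registryUrl.length() - 1)
                : registryUrl;
        return base + "/subjects/" + subject(topic, isKey) + "/versions";
    }

    public static void main(String[] args) {
        // Prints http://localhost:8081/subjects/agg_value_9-value/versions
        System.out.println(versionsUrl("http://localhost:8081", "agg_value_9", false));
        // Prints http://localhost:8081/subjects/CH-PGP-LP2_S20-002_agg-value/versions
        System.out.println(versionsUrl("http://localhost:8081/", "CH-PGP-LP2_S20-002_agg", false));
    }
}
```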
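Both applications use the same "schema.registry.url" configuration. To summarize: the topic is created and the data is written and can be read with a simple consumer, but it is binary and no schema exists for it.
I also tried creating a schema with Landoop that would somehow match the data, but without success. Besides, that isn't the right way to use Kafka Streams anyway: everything should happen on the fly.
Please help!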
【Comments】:
-
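Which version are you using? Also, do you set AvroSerde as the default in StreamsConfig, or do you set it individually for each operator? Did you create the topics you use manually before starting the application? Also have a look at this example: github.com/confluentinc/kafka-streams-examples/blob/3.3.0-post/… -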
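I use Confluent 3.3.0, Java 1.8, Kafka 0.11.0.0-cp1, and Avro 1.7.7. I set GenericAvroSerde as the default, but for simple types I override it case by case (Serdes.Long, Serdes.String, Serdes.Float). The topic I intend to use did not exist beforehand; it was created when I wrote data to it, as described in the opening post.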
-
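About the example: I do everything the same way, except that in the example the key is decoded as a byte array, while I use stringSerde (because the Avro schema for the key is just "string"). The Schema Registry URL is reachable; if it weren't, I wouldn't be able to read the initial data in the Kafka Streams application. And when I call finalStream.print(), the final stream is printed correctly.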
-
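If you set Avro as the default in StreamsConfig, I wonder why you need to call configure() at all (i.e., the entire first code block in your question). Streams configures your default Serdes automatically; if it doesn't, your setup seems to be incorrect. Can you share your StreamsConfig code? Did you look at the example I shared?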
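To illustrate the commenter's point: when GenericAvroSerde is set as the default serde in the Streams configuration together with schema.registry.url, Kafka Streams calls configure() on the default serdes with the full configuration, so those serdes need no manual configure() call. The minimal sketch below uses only java.util.Properties with plain string keys so that it stands alone; the helper name and the sample values are hypothetical. The serde key names are version-dependent: older Streams releases read "key.serde"/"value.serde", newer ones read "default.key.serde"/"default.value.serde", and the StreamsConfig constants of the version in use are authoritative.

```java
import java.util.Properties;

class AvroStreamsProps {
    // Hypothetical helper: Streams settings with GenericAvroSerde as the default key and value serde.
    static Properties build(String appId, String bootstrapServers, String registryUrl) {
        Properties props = new Properties();
        props.put("application.id", appId);
        props.put("bootstrap.servers", bootstrapServers);
        // Older releases read "key.serde"/"value.serde"; newer ones read
        // "default.key.serde"/"default.value.serde". Use the names matching your version.
        props.put("key.serde", "io.confluent.kafka.streams.serdes.avro.GenericAvroSerde");
        props.put("value.serde", "io.confluent.kafka.streams.serdes.avro.GenericAvroSerde");
        // Streams passes the whole configuration to configure() on the default serdes,
        // so this URL reaches them without a manual configure() call.
        props.put("schema.registry.url", registryUrl);
        return props;
    }

    public static void main(String[] args) {
        Properties props = build("avro-through-demo", "localhost:9092", "http://localhost:8081");
        props.list(System.out);
    }
}
```

These properties would then be handed to the KafkaStreams constructor. Serde instances created by hand and passed to individual operators, such as through(), are not configured this way in the 0.11 line and still need their own configure() call, as in the question's first code block.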
Tags: java apache-kafka avro apache-kafka-streams confluent-platform