【发布时间】:2019-08-06 23:21:13
【问题描述】:
我没有看到如何使用 camel-avro 组件生成和使用 kafka avro 消息的示例?目前我的骆驼路线是这样的。为了与 schema-registry 和其他类似使用 camel-kafka-avro 消费者和生产者的道具一起工作,应该进行哪些更改。
props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true);
public void configure() {
PropertiesComponent pc = getContext().getComponent("properties", PropertiesComponent.class);
pc.setLocation("classpath:application.properties");
log.info("About to start route: Kafka Server -> Log ");
from("kafka:{{consumer.topic}}?brokers={{kafka.host}}:{{kafka.port}}"
+ "&maxPollRecords={{consumer.maxPollRecords}}"
+ "&consumersCount={{consumer.consumersCount}}"
+ "&seekTo={{consumer.seekTo}}"
+ "&groupId={{consumer.group}}"
+"&valueDeserializer="+KafkaAvroDeserializer.class
+"&keyDeserializer="+StringDeserializer.class
)
.routeId("FromKafka")
.log("${body}");
【问题讨论】:
-
为什么会被否决?他们能解释一下吗?即使使用 thsi 配置 -> 它也不起作用 value.deserializer = class io.confluent.kafka.serializers.KafkaAvroDeserializer.
-
你能澄清一下发生了什么吗?
-
@cricket_007 :应用程序启动并设置 kafka-consumer 看看这个值,AvroDeserializer 是我自己的类,我无法从 confluent 的 avro 消费者建立 kafka 连接。 -> value.deserializer = class org.apache.camel.example.kafka.AvroDeserializer 2019-03-18 07:56:40,663 [nsumer[avro-t1]]警告 KafkaConsumer - KafkaException 正在消耗来自主题 avro-t1 的 avro-t1-Thread 0。将在下次运行时尝试重新连接 -----这是例外情况,应用程序不断重新连接到 kafka 代理,因此失败。
-
嗯。我不承认这个
org.apache.camel.example.kafka.AvroDeserializer。理论上,类路径上的任何反序列化器都应该工作。您能否启用调试日志记录? -
@cricket_007:感谢您在思考过程中帮助我。
标签: java apache-kafka apache-camel avro confluent-schema-registry