【问题标题】:How to use camel-avro-consumer & producer?如何使用camel-avro-consumer & producer?
【发布时间】:2019-08-06 23:21:13
【问题描述】:

我没有看到如何使用 camel-avro 组件生成和使用 kafka avro 消息的示例?目前我的骆驼路线是这样的。为了与 schema-registry 和其他类似使用 camel-kafka-avro 消费者和生产者的道具一起工作,应该进行哪些更改。

props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);              
props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true); 

public void configure() {
        PropertiesComponent pc = getContext().getComponent("properties", PropertiesComponent.class); 
        pc.setLocation("classpath:application.properties");

        log.info("About to start route: Kafka Server -> Log ");

        from("kafka:{{consumer.topic}}?brokers={{kafka.host}}:{{kafka.port}}"
                + "&maxPollRecords={{consumer.maxPollRecords}}"
                + "&consumersCount={{consumer.consumersCount}}"
                + "&seekTo={{consumer.seekTo}}"
                + "&groupId={{consumer.group}}"
                +"&valueDeserializer="+KafkaAvroDeserializer.class
                +"&keyDeserializer="+StringDeserializer.class
                )
                .routeId("FromKafka")
            .log("${body}");

【问题讨论】:

  • 为什么会被否决?他们能解释一下吗?即使使用 thsi 配置 -> 它也不起作用 value.deserializer = class io.confluent.kafka.serializers.KafkaAvroDeserializer.
  • 你能澄清一下发生了什么吗?
  • @cricket_007 :应用程序启动并设置 kafka-consumer 看看这个值,AvroDeserializer 是我自己的类,我无法从 confluent 的 avro 消费者建立 kafka 连接。 -> value.deserializer = class org.apache.camel.example.kafka.AvroDeserializer 2019-03-18 07:56:40,663 [nsumer[avro-t1]]警告 KafkaConsumer - KafkaException 正在消耗来自主题 avro-t1 的 avro-t1-Thread 0。将在下次运行时尝试重新连接 -----这是例外情况,应用程序不断重新连接到 kafka 代理,因此失败。
  • 嗯。我不承认这个org.apache.camel.example.kafka.AvroDeserializer。理论上,类路径上的任何反序列化器都应该工作。您能否启用调试日志记录?
  • @cricket_007:感谢您在思考过程中帮助我。

标签: java apache-kafka apache-camel avro confluent-schema-registry


【解决方案1】:

我正在回答我自己的问题,因为我在这个问题上坐了几天。我希望这个答案对其他人有帮助。

我尝试使用 io.confluent.kafka.serializers.KafkaAvroDeserializer 反序列化器并得到 kafka 异常。所以我不得不编写自己的反序列化器来做以下事情:

  1. 设置架构注册表
  2. 使用特定的avro阅读器(这意味着不是默认的stringDeserializer)

然后我们必须访问“schemaRegistry”、“useSpecificAvroReader”并设置 AbstractKafkaAvroDeserializer(io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer) 的这些字段

这里是解决方案...

CAMEL-KAFKA-AVRO-ROUTE-BUILDER

public static void main(String[] args) throws Exception {
    LOG.info("About to run Kafka-camel integration...");
    CamelContext camelContext = new DefaultCamelContext();
    // Add route to send messages to Kafka
    camelContext.addRoutes(new RouteBuilder() {
        public void configure() throws Exception {                
            PropertiesComponent pc = getContext().getComponent("properties", 
                                      PropertiesComponent.class);
            pc.setLocation("classpath:application.properties");

            log.info("About to start route: Kafka Server -> Log ");

            from("kafka:{{consumer.topic}}?brokers={{kafka.host}}:{{kafka.port}}"
                    + "&maxPollRecords={{consumer.maxPollRecords}}"
                    + "&consumersCount={{consumer.consumersCount}}"
                    + "&seekTo={{consumer.seekTo}}"
                    + "&groupId={{consumer.group}}"
                    + "&keyDeserializer="+ StringDeserializer.class.getName() 
                    + "&valueDeserializer="+CustomKafkaAvroDeserializer.class.getName()
                    )
                    .routeId("FromKafka")
                .log("${body}");

        }
    });
    camelContext.start();
    // let it run for 5 minutes before shutting down
    Thread.sleep(5 * 60 * 1000);
    camelContext.stop();
}

DESERIALIZER CLASSS - 这将 schema.registry.url 和 use.specific.avro.reader 设置在抽象 AbstractKafkaAvroDeserializer 级别。如果我不设置这个,我会得到 kafka-config-exception。

package com.example.camel.kafka.avro;

import java.util.Collections;
import java.util.List;
import java.util.Map;


import io.confluent.common.config.ConfigException;
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer;
import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;
import org.apache.kafka.common.serialization.Deserializer;



public class CustomKafkaAvroDeserializer extends AbstractKafkaAvroDeserializer
    implements Deserializer<Object> {

    private static final String SCHEMA_REGISTRY_URL = "http://localhost:8081";

    @Override
    public void configure(KafkaAvroDeserializerConfig config) {

     try {
          final List<String> schemas = 
                              Collections.singletonList(SCHEMA_REGISTRY_URL);
          this.schemaRegistry = new CachedSchemaRegistryClient(schemas, 
                                  Integer.MAX_VALUE);
          this.useSpecificAvroReader = true;

       } catch (ConfigException e) {
              throw new org.apache.kafka.common.config.ConfigException(e.getMessage());
     }
   }

  @Override
  public void configure(Map<String, ?> configs, boolean isKey) {
    configure(null);
  }

  @Override
  public Object deserialize(String s, byte[] bytes) {
    return deserialize(bytes);
  }

  @Override
  public void close() {
  }
}

【讨论】:

【解决方案2】:

使用camel-kafka-starter(用于spring boot)version: 3.6.0,您不需要定义CustomKafkaAvroDeserializer。相反,将以下配置详细信息添加到您的 application.yamlapplication.properties 文件中,camel-kafka 组件(生产者和消费者)会将适当的 SerDe 应用于正在处理的对象/字节。

camel:
  springboot:
    main-run-controller: true # to keep the JVM running
  component:
    kafka:
      brokers: http://localhost:9092
      schema-registry-u-r-l: http://localhost:8081

      #Consumer
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer

      #Producer
      key-serializer-class: org.apache.kafka.common.serialization.StringSerializer
      serializer-class: io.confluent.kafka.serializers.KafkaAvroSerializer

      specific-avro-reader: true

您还需要确保在运行生产者/消费者之前将各自的 avro-schema-json 文件上传到您的架构注册表服务器,例如 confluent-schema-registry。

【讨论】:

  • 能否分享一个对应的示例路线?
【解决方案3】:

我在同一个问题上挣扎了一段时间。我用 confluent 的 camel-quarkus 和 schema-registry 做了完整的例子: https://github.com/tstuber/camel-quarkus-kafka-schema-registry

【讨论】:

    猜你喜欢
    • 2019-04-04
    • 2018-03-23
    • 1970-01-01
    • 1970-01-01
    • 2017-05-02
    • 1970-01-01
    • 1970-01-01
    • 2019-04-20
    • 1970-01-01
    相关资源
    最近更新 更多