【问题标题】:Unable to send GenericRecord data from Kafka Producer in AVRO format无法从 Kafka Producer 以 AVRO 格式发送 GenericRecord 数据
【发布时间】:2019-01-17 08:05:54
【问题描述】:

使用 confluent-oss-5.0.0-2.11 我的 Kafka Producer 代码是

public class AvroProducer {
 public static void main(String[] args) throws ExecutionException, InterruptedException {

        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("ZOOKEEPER_HOST", "localhost");
        //props.put("acks", "all");
        props.put("retries", 0);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
        props.put("schema.registry.url", "http://localhost:8081");
        String topic = "confluent-new";

        Schema.Parser parser = new Schema.Parser();
// I will get below schema string from SCHEMA REGISTRY
        Schema schema = parser.parse("{\"type\":\"record\",\"name\":\"User\",\"fields\":[{\"name\":\"userName\",\"type\":\"string\"},{\"name\":\"uID\",\"type\":\"string\"},{\"name\":\"company\",\"type\":\"string\",\"default\":\"ABC\"},{\"name\":\"age\",\"type\":\"int\",\"default\":0},{\"name\":\"location\",\"type\":\"string\",\"default\":\"Noida\"}]}");

        Producer<String, GenericRecord> producer = new KafkaProducer<String, GenericRecord>(props);
        GenericRecord record = new GenericData.Record(schema);
        record.put("uID", "06080000");
        record.put("userName", "User data10");
        record.put("company", "User data10");
        record.put("age", 12);
        record.put("location", "User data10");

        ProducerRecord<String, GenericRecord> recordData = new ProducerRecord<String, GenericRecord>(topic, "ip", record);
        producer.send(recordData);

        System.out.println("Message Sent");
    }

}

似乎 Producer 代码没问题,并且能够在控制台上看到 Message Sent

Kafka Consumer代码为:

public class AvroConsumer {
public static void main(String[] args) throws ExecutionException, InterruptedException {

    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("ZOOKEEPER_HOST", "localhost");
    props.put("acks", "all");
    props.put("retries", 0);
    props.put("group.id", "consumer1");
    props.put("auto.offset.reset", "latest");
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer");
    props.put("schema.registry.url", "http://localhost:8081");
    String topic = "confluent-new";

    KafkaConsumer<String, GenericRecord> consumer = new KafkaConsumer<String, GenericRecord>(props);
    consumer.subscribe(Arrays.asList(topic));
    while(true){
        ConsumerRecords<String, GenericRecord> recs = consumer.poll(10000);
        for (ConsumerRecord<String, GenericRecord> rec : recs) {
            System.out.printf("{AvroUtilsConsumerUser}: Recieved [key= %s, value= %s]\n", rec.key(), rec.value());
        }
    }
}

}

我无法在 Kafka 消费者端看到消息(数据)。我还检查了 confluent_new 主题的偏移量/状态,它没有更新。好像生产者代码有问题。 任何指针都会有所帮助。

同时下面的生产者代码正在工作,这里 POJO 即 User 是 avro-tools 生成的 POJO。

public class AvroProducer {
 public static void main(String[] args) throws ExecutionException, InterruptedException {

        Properties props = new Properties();
        kafkaParams.put("auto.offset.reset", "smallest");
        kafkaParams.put("ZOOKEEPER_HOST", "bihdp01");*/
        props.put("bootstrap.servers", "localhost:9092");
        props.put("ZOOKEEPER_HOST", "localhost");
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
        props.put("schema.registry.url", "http://localhost:8081");
        String topic = "confluent-new";

        Producer<String, User> producer = new KafkaProducer<String, User>(props);
        User user = new User();
        user.setUID("0908");
        user.setUserName("User data10");
        user.setCompany("HCL");
        user.setAge(20);
        user.setLocation("Noida");
        ProducerRecord<String, User> record = new ProducerRecord<String, User>(topic, (String) user.getUID(), user);
        producer.send(record).get();
        System.out.println("Sent");
    }

}

P.S. 我的要求是以 AVRO 格式将接收到的 JSON 数据从源 KAFKA 主题发送到目标 KAFKA 主题。首先,我使用 AVRO4S 从收到的 JSON 数据中推断出 AVRO 模式,并将模式注册到 SCHEMA REGISTRY。接下来是从接收到的 JSON 中提取数据并填充到 GenericRecord 实例中,然后使用 KafkaAvroSerializer 将此 GenericRecord 实例发送到 Kafka 主题。在消费者端,我将使用 KafkaAvroDeserializer 对接收到的 AVRO 数据进行反序列化。

【问题讨论】:

  • kafkaParams.put("auto.offset.reset", "smallest"); 应该是“最早的”。
  • Producer 会在本地缓冲一小段时间。您应该在您的应用终止之前producer.close() 以确保数据已发送。
  • 正是@Hitobat 我也想通了。非常感谢朋友。
  • ZOOKEEPER_HOST 不是必需的,也不是有效的 Kafka 属性
  • @cricket_007 是的。我现在已经清理了代码。非常感谢。

标签: java apache-kafka avro kafka-producer-api confluent-schema-registry


【解决方案1】:

请尝试在第一个 Producer 中添加 get()

producer.send(recordData).get();

【讨论】:

    【解决方案2】:

    在寻找解决方案的过程中,我尝试了 Thread.sleep(1000),它解决了我的问题。我也尝试了 producer.send(record).get() ,这也解决了问题。经过Documentation之后,我遇到了下面的代码sn-p,它提示了解决方案。

    // When you're finished producing records, you can 
       flush the producer to ensure it has all been `written` to Kafka and
       // then close the producer to free its resources.
    
    finally {
      producer.flush();
      producer.close();
      }
    

    这是解决此问题的最佳方法。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2021-11-25
      • 2019-04-04
      • 1970-01-01
      • 2021-11-09
      • 1970-01-01
      • 1970-01-01
      • 2016-11-16
      • 2018-03-23
      相关资源
      最近更新 更多