【发布时间】:2018-02-11 10:54:17
【问题描述】:
我正在使用 Kafka 版本 0.11.0.0 并尝试通过从 avro 文件加载数据来创建输入流。但是它无法实例化 Producer 并出现异常:
[main] INFO org.apache.kafka.clients.producer.KafkaProducer - Closing the Kafka producer with timeoutMillis = 0 ms.
Exception in thread "main" org.apache.kafka.common.KafkaException: Failed to construct kafka producer
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:415)
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:287)
at wordcount.PayloadProducer.produceInputs(PayloadProducer.java:42)
at wordcount.PayloadProducer.main(PayloadProducer.java:24)
Caused by: java.lang.NoClassDefFoundError: com/fasterxml/jackson/core/JsonProcessingException
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.<init>(CachedSchemaRegistryClient.java:47)
at io.confluent.kafka.serializers.AbstractKafkaAvroSerDe.configureClientProperties(AbstractKafkaAvroSerDe.java:73)
at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.configure(AbstractKafkaAvroSerializer.java:42)
at io.confluent.kafka.serializers.KafkaAvroSerializer.configure(KafkaAvroSerializer.java:48)
at org.apache.kafka.common.serialization.ExtendedSerializer$Wrapper.configure(ExtendedSerializer.java:60)
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:336)
... 3 more
Caused by: java.lang.ClassNotFoundException: com.fasterxml.jackson.core.JsonProcessingException
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:335)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
... 9 more
我正在关注此链接以完成我的工作:Confluent kafka stream example
实例化Producer的代码:
final Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
io.confluent.kafka.serializers.KafkaAvroSerializer.class);
props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);
GenericRecordBuilder pageViewBuilder = new GenericRecordBuilder(loadSchema("payload.avsc"));
KafkaProducer<String, GenericRecord> producer = new KafkaProducer<>(props);
【问题讨论】:
标签: apache-kafka kafka-producer-api confluent-platform