【问题标题】:java client example for Kafka Producer, send method not accepting KeyedMessageKafka Producer 的 java 客户端示例,发送方法不接受 KeyedMessage
【发布时间】:2015-05-26 07:40:44
【问题描述】:

我正在运行 kafka 2.9.1-0.8.2.1。我在 kafka 主目录中的 libs/ 目录中包含了 jars。现在我正在尝试按照https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+Producer+Example 此处给出的内容运行一个 java 生产者示例。现在producer.send 方法似乎正在接受这种论点Seq<KeyedMessage<String, String>>。在这个例子中,KeyedMessage 的对象没有被转换成任何东西。当我尝试做同样的事情时,我得到不兼容的类型编译器错误。

这是代码

import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
import java.util.Properties;
import kafka.producer.Producer;
import scala.collection.Seq;

public class KakfaProducer {
  public static void main(String [] args) {
    Properties prop = new Properties();
    prop.put("metadata.broker.list", "localhost:9092");
    prop.put("serializer.class","kafka.serializer.StringEncoder");
    //prop.put("partitioner.class", "example.producer.SimplePartitioner");
    ProducerConfig producerConfig = new ProducerConfig(prop);
    Producer<String,String> producer = new <String,String>Producer(producerConfig);
    String topic = "test";
    KeyedMessage<String,String> message = new <String,String>KeyedMessage(topic, "Hello Test message");
    producer.send(message);
    producer.close();
  }
}

并且该注释代码给了我 class def not found 异常。我试着在网上看了很多,但没有帮助。

该 libs/ 目录中有两种 jar。一个是 kafka-client,另一个是 kafka 和版本号。我是否包括错误的罐子?我需要与哪一个合作?

【问题讨论】:

    标签: java apache-kafka


    【解决方案1】:

    对于第一个问题,不要导入 scala API,而是导入 Java 一个。 所以,不要使用:

    import kafka.producer.Producer;
    

    请使用:

    import kafka.javaapi.producer.Producer;
    

    SimplePartitioner 代码可以在下面找到。添加到对应目录:

    import kafka.producer.Partitioner;
    import kafka.utils.VerifiableProperties;
    
    public class SimplePartitioner implements Partitioner {
        public SimplePartitioner (VerifiableProperties props) {
        }
    
        public int partition(Object key, int numPartitions) {
            int partition = 0;
            String stringKey = (String) key;
            int offset = stringKey.lastIndexOf('.');
            if (offset > 0) {
               partition = Integer.parseInt( stringKey.substring(offset+1)) % numPartitions;
            }
           return partition;
      }
    }
    

    【讨论】:

    • 谢谢。然而,这解决了我的第二个问题。你能帮我解决我的第一个问题,producer.send() 的问题吗?
    • 用第一个问题的解决方案修改了答案。
    • 太棒了!非常感谢。顺便说一句,你能告诉我 kafka-client.jar 和普通的 kafka.jar 有什么区别吗?
    • 推荐的制作人不是org.apache.kafka.clients.producer.KafkaProducer&lt;K,V&gt;,也不是你这里提到的吗?我对此完全陌生,所以我可能是错的。
    【解决方案2】:

    在 Java 中运行生产者有两种方式。

    1) 使用核心 Kafka。它是你的方法。 2) 使用 Kafka 客户端。

    Kafka 0.8.2 Documentation中提到了这些之间的区别。

    这些新客户端旨在取代现有的 Scala 客户端,但为了兼容性,它们将共存一段时间。这些客户端可以在一个独立的 jar 中使用,具有最小的依赖关系,而旧的 Scala 客户端仍然与服务器打包在一起。

    这意味着新的客户端更小,可以替代原来的方法。

    在第 3.4 节中。新的 Producer 提供者一些不同的配置。

    我们正在努力替换我们现有的制作人。该代码现在可以在主干中使用,可以认为是 beta 质量。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2014-12-11
      • 1970-01-01
      • 2019-02-14
      • 1970-01-01
      • 2013-07-22
      • 1970-01-01
      • 1970-01-01
      • 2021-11-09
      相关资源
      最近更新 更多