【问题标题】:Kafka 2.9.2-0.8.1.1 no KeyedMessage parameter for producer.sendKafka 2.9.2-0.8.1.1 没有用于 producer.send 的 KeyedMessage 参数
【发布时间】:2016-11-09 05:18:24
【问题描述】:

我正在尝试用 java 向 Kafka 发送消息。我的项目是通过maven依赖使用kafka_2.9.2-0.8.1.1.jar:

<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka_2.9.2</artifactId>
  <version>0.8.1.1</version>
  <exclusions>
    <exclusion>
      <groupId>org.apache.zookeeper</groupId>
      <artifactId>zookeeper</artifactId>
    </exclusion>
    <exclusion>
      <groupId>log4j</groupId>
      <artifactId>log4j</artifactId>
    </exclusion>
  </exclusions>
</dependency>

我遇到的问题是我的制片人。我想发送一个 KeyedMessage 并通过这里的文档:http://kafka.apache.org/documentation.html#producerapi 它声明 producer.send 方法应该采用 KeyedMessage 作为其参数。当我检查 producer.send 调用可用的选项时,它只允许 ProducerData 对象作为可接受的参数。

我的代码是这样设置的:

    private String topic;   
    private String key;
    private Properties props = new Properties();
    private Producer<String,String> producer;

    props.put("serializer.class", "kafka.serializer.StringEncoder");
    props.put("zk.connect", "127.0.0.1:2181");  

    //create the producer
    producer = new Producer<String, String>(new ProducerConfig(props));   

    String topic = "test";
    String key = "test_key";
    String message = "test_msg";

    KeyedMessage<String, String> data = new KeyedMessage<String, String>(topic, key, message);
    producer.send(data)  <---- ERROR is here, send method not allowed param of KeyedMessage

实际的错误是这样的:

 The method send(ProducerData<String,String>) in the type Producer<String,String> is not applicable for the arguments (KeyedMessage<String,String>)

【问题讨论】:

    标签: java apache-kafka


    【解决方案1】:

    我认为你需要定义

     props.put("partitioner.class", "example.producer.SimplePartitioner");
    

    wiki 页面显示

    第三个属性“partitioner.class”定义了使用哪个类来确定将消息发送到主题中的哪个分区。这是可选的,但对于任何重要的实现,您都需要实现分区方案。稍后将详细介绍此类的实现。如果您包含键的值但尚未定义分区器。类 Kafka 将使用默认分区器。如果 key 为 null,则 Producer 会将消息分配给随机 Partition。

    【讨论】:

      【解决方案2】:

      如果您不添加“partitioner.class”属性,生产者将遵循该特定主题的分区之间的循环分配。

       props.put("partitioner.class", "example.producer.SimplePartitioner");
      

      如果您不是特别需要基于键的分区选择,请省略“partitioner.class”属性并按如下方式定义 KeyedMessage,

          private String topic;   
          private String key;
          private Properties props = new Properties();
          private Producer<String,String> producer;
      
          props.put("metadata.broker.list", "localhost:9092");
          props.put("producers.type", "sync");
          props.put("serializer.class", "kafka.serializer.StringEncoder");
          props.put("request_final.required.acks", "1");
      
          //create the producer
          producer = new Producer<String, String>(new ProducerConfig(props));   
      
          topic = "test";
          message = "test_msg";
      
          KeyedMessage<String, String> data = new KeyedMessage<String, String>(topic, message);
          producer.send(data) 
      

      【讨论】:

        【解决方案3】:

        使用

        import kafka.javaapi.producer.Producer; 
        

        你可能用过

        import kafka.producer.Producer;
        

        【讨论】:

        • 请在正文中正确引入代码,代码前后加反引号,让大家都能认出代码..!
        猜你喜欢
        • 1970-01-01
        • 2016-06-29
        • 2023-04-08
        • 2016-09-25
        • 1970-01-01
        • 1970-01-01
        • 2014-08-08
        • 1970-01-01
        • 1970-01-01
        相关资源
        最近更新 更多