【问题标题】:How to get consumers to work in Kafka 0.8 API如何让消费者在 Kafka 0.8 API 中工作
【发布时间】:2016-05-19 11:17:18
【问题描述】:

我即将编写一个用于发布和使用 kafka 消息的原型。 我们确实已经设置了 Cloudera 基础架构(动物园管理员、代理等),并且我已经成功地使用了 Kafka 命令行工具来生成和使用消息。

我使用[org.apache.kafka/kafka_2.10 "0.8.2.1"]作为依赖,并且已经能够使用客户端API设置KafkaProducer,它发布带有纯字符串内容的消息,并且可以被命令行消费者成功读取另一边。

我的问题是:internets 上是否有一个代码示例来显示如何初始化KafkaConsumer,并在另一侧阅读该消息,因为我已经搜索了几天,但没有一个code examples 似乎在工作:

  • 它们使用的类或方法甚至在 API 本身中都不存在(例如,它们看似将属性映射传递给 org.apache.kafka.clients.consumer.ConsumerConfig 的构造函数,但不存在这样的构造函数;
  • kafka.consumer.Consumer 类上调用 createJavaConsumerConnector 静态方法...这些东西存在于哪个宇宙中?)。

通常每个示例看起来都非常复杂。我希望消息传递框架需要几行配置来连接到代理,以及一些功能来放入/从队列或主题中取出。为 Kafka 设置 Producer 并不是一件非常复杂的事情,我希望 Consumer 也类似。

看来我也是not alone

【问题讨论】:

  • 不同的Kafka版本有API变化。跟踪您查看的示例的版本。在这里查看 0.8.2 API webhostingreviewjam.com/mirror/apache/kafka/0.8.2-beta/java-doc/… 如果这没有帮助,请提出更准确的问题...
  • @MatthiasJ.Sax 所以如果我理解正确的话,结论是 Kafka 在这个阶段绝对不是你推荐投入生产的东西。我之前用过很多消息系统,都不是太复杂,你可以从一个队列/主题中puttake,经过几行配置/认证(1或2,不是100s),仅此而已. Kafka 正在实现该功能集,但我会尽量避免将其称为没有这些非常基本功能的消息传递平台。
  • @MatthiasJ.Sax 至于具体的例子,我写了我正在使用的maven依赖,以及我链接到的代码示例(对于版本0.8.0,并基于semantic versioning的概念,我希望它可以与我的 Maven 依赖项的版本一起使用),但它甚至无法编译。您可以向我指出一个与 0.8.2.1. 一起使用的代码示例,我将不胜感激。
  • Kafka 已经完全可以生产了——已经有一段时间了。请参阅此处以获取使用它的公司列表:cwiki.apache.org/confluence/display/KAFKA/Powered+By 当然,您需要牢记您的用例,并且可能会出现其他一些系统最符合您的要求。对于 0.8.2 示例,请看这里:github.com/apache/kafka/tree/0.8.2/examples/src/main/java/kafka/…
  • 这是 Kafka 与 AMQP 的一个很好的比较(从 28:45 开始观看)infoq.com/presentations/event-streams-kafka

标签: apache-kafka cloudera


【解决方案1】:

首先我想提一下,Kafka 0.8.00.8.10.8.2 之间有一些 API 更改(0.9.00.10.0 发生了市长重写和简化)——因此,您的问题有点开放,只是要求0.8

要为 0.8.2.2 编写 Java 使用者,您需要包含依赖项:

这是针对 Scala 2.11 的——还有其他可用的 Scala 版本。

<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka_2.11</artifactId>
  <version>0.8.2.2</version>
</dependency>

不要使用 kafka-clients 作为 0.8.x 的 artifactId。

消费者接收&lt;String,String&gt;键值对消息并将它们打印到stdout的最小示例如下所示:

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

public class ConsumerExample {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("zookeeper.connect", "localhost:2181");
        props.put("group.id", "myGroup");

        final String topic = "test";

        ConsumerConnector consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(props));

        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(topic, new Integer(1)); // number of consumer threads

        KafkaStream<byte[], byte[]> stream = consumer.createMessageStreams(topicCountMap).get(topic).get(0);

        ConsumerIterator<byte[], byte[]> it = stream.iterator();

        // infinite loop
        while(it.hasNext()) {
            System.out.println(new String(it.next().message()));
        }

        // non-reachable code...
        consumer.shutdown();
    }
}

一个完整的例子——使用多个消费者线程,包括正确关闭——可以在这里找到:https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example

要对此进行测试,请遵循 quickstart 指南并通过 Kafka 的 console-producer 发送消息。

【讨论】:

猜你喜欢
  • 1970-01-01
  • 2020-03-11
  • 2017-09-19
  • 2015-01-26
  • 2019-08-27
  • 1970-01-01
  • 2015-01-26
  • 1970-01-01
  • 2017-10-16
相关资源
最近更新 更多