【发布时间】:2014-12-30 18:31:09
【问题描述】:
尝试编写使用来自 Kafka 的消息的 Spark Streaming 作业。以下是我目前所做的:
- 启动 Zookeeper
- 已启动 Kafka 服务器
-
向服务器发送了一些消息。当我运行以下命令时,我可以看到它们:
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic mytopic --from-beginning 现在尝试编写一个程序来计算 5 分钟内收到的消息数。
代码如下所示:
Map<String, Integer> map = new HashMap<String, Integer>();
map.put("mytopic", new Integer(1));
JavaStreamingContext ssc = new JavaStreamingContext(
sparkUrl, " Spark Streaming", new Duration(60 * 5 * 1000), sparkHome, new String[]{jarFile});
JavaPairReceiverInputDStream tweets = KafkaUtils.createStream(ssc, "localhost:2181", "1", map);
不确定要为第三个参数(消费者组)使用什么值。当我运行它时,我得到Unable to connect to zookeeper server。但是 Zookeeper 在端口2181 上运行;否则第 3 步将无法正常工作。
似乎我没有正确使用KafkaUtils.createStream。有什么想法吗?
【问题讨论】:
-
zookeeper 是否与 Spark 在同一个机器上运行?您是否尝试通过使用
/current/bin/zkCli.sh 连接到 Zookeeper 来验证 Zookeeper 是否已启动并运行? -
我太笨了!我将“localhost”更改为实际的机器名称并克服了这个错误。但是 - 它还没有完全起作用。有人知道 Kafka 下“消费者组”的“默认”值是多少吗?它似乎没有消耗任何消息。
-
我面临着同样的问题,我没有收到来自生产者的任何消息。我正在使用 python 生产者。而且我还能够从控制台消费者那里获得消息。我的配置中的 numofparitions 也是 1。 @DilTeam 你是怎么解决这个问题的?
标签: java apache-spark apache-zookeeper apache-kafka spark-streaming