【问题标题】:Kafka consumer in Spark StreamingSpark Streaming 中的 Kafka 消费者
【发布时间】:2014-12-30 18:31:09
【问题描述】:

尝试编写使用来自 Kafka 的消息的 Spark Streaming 作业。以下是我目前所做的:

  1. 启动 Zookeeper
  2. 已启动 Kafka 服务器
  3. 向服务器发送了一些消息。当我运行以下命令时,我可以看到它们:

    bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic mytopic --from-beginning
    
  4. 现在尝试编写一个程序来计算 5 分钟内收到的消息数。

代码如下所示:

Map<String, Integer> map = new HashMap<String, Integer>();
map.put("mytopic", new Integer(1));
JavaStreamingContext ssc = new JavaStreamingContext(
        sparkUrl, " Spark Streaming", new Duration(60 * 5 * 1000), sparkHome, new String[]{jarFile});
JavaPairReceiverInputDStream tweets = KafkaUtils.createStream(ssc, "localhost:2181", "1", map);

不确定要为第三个参数(消费者组)使用什么值。当我运行它时,我得到Unable to connect to zookeeper server。但是 Zookeeper 在端口2181 上运行;否则第 3 步将无法正常工作。

似乎我没有正确使用KafkaUtils.createStream。有什么想法吗?

【问题讨论】:

  • zookeeper 是否与 Spark 在同一个机器上运行?您是否尝试通过使用 /current/bin/zkCli.sh 连接到 Zookeeper 来验证 Zookeeper 是否已启动并运行?
  • 我太笨了!我将“localhost”更改为实际的机器名称并克服了这个错误。但是 - 它还没有完全起作用。有人知道 Kafka 下“消费者组”的“默认”值是多少吗?它似乎没有消耗任何消息。
  • 我面临着同样的问题,我没有收到来自生产者的任何消息。我正在使用 python 生产者。而且我还能够从控制台消费者那里获得消息。我的配置中的 numofparitions 也是 1。 @DilTeam 你是怎么解决这个问题的?

标签: java apache-spark apache-zookeeper apache-kafka spark-streaming


【解决方案1】:

没有默认消费者组这样的东西。您可以在那里使用任意非空字符串。如果你只有一个消费者,那么它的消费者群体并不重要。如果有两个或多个消费者,他们可以是同一个消费者组的一部分,也可以属于不同的消费者组。

来自http://kafka.apache.org/documentation.html

消费者

...

如果所有消费者实例具有相同的消费者组,那么这个 就像传统的队列负载平衡消费者一样工作。

如果所有的消费者实例都有不同的消费者组,那么 这就像发布订阅一样,所有消息都广播到 所有消费者。

我认为问题可能出在“主题”参数中。 来自Spark docs

要使用的 (topic_name -> numPartitions) 映射。每个分区都在自己的线程中消耗

您只为主题指定了一个分区,即“1”。根据代理的设置(num.partitions),可能有多个分区,并且您的消息可能会发送到您的程序未读取的其他分区。

此外,我相信 partitionIds 是基于 0 的。所以如果你只有一个分区,它的 id 会等于 0。

【讨论】:

  • 不确定分区 ID 是否如您所建议的那样基于 0。当我使用: map.put("mytopic", new Integer(0));我收到此错误: ERROR ReceiverTracker: Deregistered receiver for stream 0: Error started receiver 0 - java.lang.AssertionError: assertion failed
  • 不应该按照代码打印一些东西吗? JavaDStream statuses = tweets.map(new Function() { public String call(String status) { System.out.println(status); return status; } } );
【解决方案2】:

我认为你应该为 zookeeper 指定 ip 而不是 localhost。此外,第三个参数是消费者组名称。它可以是任何你喜欢的名字。当您有多个消费者绑定到同一组时,主题分区会相应分配。您的推文应该是:

JavaPairReceiverInputDStream tweets = KafkaUtils.createStream(ssc, "x.x.x.x", "dummy-group", map);

【讨论】:

    【解决方案3】:

    我遇到了同样的问题。这是对我有用的解决方案。

    • 分配给 Spark Streaming 应用程序的核心数量必须大于接收器的数量。否则系统将接收数据,但无法处理它。因此 Spark Streaming 至少需要两个核心。所以在我的 spark-submit 中,我应该提到至少两个核心。
    • kafka-clients-version.jar 应包含在 spark-submit 的依赖 jar 列表中。

    【讨论】:

      【解决方案4】:

      如果 zookeeper 与您的流应用程序在同一台机器上运行,则“localhost:2181”将起作用。否则,您必须提及运行 zookeeper 的主机地址,并确保运行流式应用程序的机器能够与端口 2181 上的 zookeeper 主机通信。

      【讨论】:

      • 不要发表评论作为答案。这应该是评论
      【解决方案5】:

      我认为,在您的代码中,调用的第二个参数 KafkaUtils.createStream,应该是kafka服务器的host:port,而不是zookeeper主机和端口。检查一次。

      编辑: Kafka Utils API Documentation

      根据上面的文档,它应该是 zookeeper quorum 。所以应该使用 Zookeeper 主机名和端口。

      zkQuorum Zookeeper 仲裁(主机名:端口,主机名:端口,..)。

      【讨论】:

      • 如果我给 host:port 那么连接失败。它只是 zookeeper 主机和端口。
      • 只是zookeeper的列表,即zookeeperQorum
      猜你喜欢
      • 2017-02-23
      • 2018-12-18
      • 1970-01-01
      • 2017-05-04
      • 1970-01-01
      • 2020-06-13
      • 1970-01-01
      • 2017-10-30
      • 2016-07-30
      相关资源
      最近更新 更多