【问题标题】:Kafka Spark Streaming Consumer will not receive any messages from Kafka Console Producer?Kafka Spark Streaming Consumer 不会收到来自 Kafka Console Producer 的任何消息?
【发布时间】:2018-02-09 11:32:08
【问题描述】:

我正在尝试集成 spark 和 Kafka 以使用来自 Kafka 的消息。我也有生产者代码来发送关于“临时”主题的消息。另外,我正在使用 Kafka 的控制台生产者来生产关于“临时”主题的消息。

我创建了下面的代码来使用来自同一“临时”主题的消息,但它也不会收到一条消息。

计划:

import java.util.Arrays;
import java.util.Map;
import java.util.HashMap;
import static org.apache.commons.lang3.StringUtils.SPACE;

import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;
import scala.Tuple2;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaSparkContext;
import scala.collection.immutable.ListSet;
import scala.collection.immutable.Set;

public class ConsumerDemo {

    public void main() {
        String zkGroup = "localhost:2181";
        String group = "test";
        String[] topics = {"temp"};
        int numThreads = 1;

        SparkConf sparkConf = new SparkConf().setAppName("JavaKafkaWordCount").setMaster("local[4]").set("spark.ui.port‌​", "7077").set("spark.executor.memory", "1g");
        JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(2000));
        Map<String, Integer> topicMap = new HashMap<>();
        for (String topic : topics) {
            topicMap.put(topic, numThreads);
        }
        System.out.println("topics : " + Arrays.toString(topics));
        JavaPairReceiverInputDStream<String, String> messages
                = KafkaUtils.createStream(jssc, zkGroup, group, topicMap);

        messages.print();

        JavaDStream<String> lines = messages.map(Tuple2::_2);

        //lines.print();
        JavaDStream<String> words = lines.flatMap(x -> Arrays.asList(SPACE.split(x)).iterator());

        JavaPairDStream<String, Integer> wordCounts = words.mapToPair(s -> new Tuple2<>(s, 1))
                .reduceByKey((i1, i2) -> i1 + i2);

        //wordCounts.print();
        jssc.start();
        jssc.awaitTermination();
    }

    public static void main(String[] args) {
        System.out.println("Started...");
        new ConsumerDemo().main();
        System.out.println("Ended...");
    }
}

我在 pom.xml 文件中添加了以下依赖项:

    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka_2.10</artifactId>
        <version>0.9.0.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>0.11.0.0</version>
    </dependency>

    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.11</artifactId>
        <version>2.2.0</version>
        <scope>provided</scope>
    </dependency>

    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.10</artifactId>
        <version>0.9.0-incubating</version>
        <type>jar</type>
    </dependency>

    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.10</artifactId>
        <version>1.6.3</version>
    </dependency>

    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-kafka_2.10</artifactId>
        <version>1.6.3</version>
        <type>jar</type>
    </dependency>

    <dependency>
        <groupId>log4j</groupId>
        <artifactId>log4j</artifactId>
        <version>1.2.17</version>
    </dependency>

    <dependency>
        <groupId>org.anarres.lzo</groupId>
        <artifactId>lzo-core</artifactId>
        <version>1.0.5</version>
        <type>jar</type>
    </dependency>

    <dependency> 
        <groupId>com.fasterxml.jackson.core</groupId> 
        <artifactId>jackson-databind</artifactId> 
        <version>2.8.2</version> 
    </dependency> 
    <dependency> 
        <groupId>com.fasterxml.jackson.module</groupId> 
        <artifactId>jackson-module-scala_2.10</artifactId> 
        <version>2.8.2</version> 
    </dependency>
    <dependency>
        <groupId>com.msiops.footing</groupId>
        <artifactId>footing-tuple</artifactId>
        <version>0.2</version>
    </dependency>

我是否缺少某些依赖项或代码中的问题?为什么这段代码不会收到任何消息?

【问题讨论】:

  • 您是否能够使用基于控制台的消费者来消费消息?如果不是,那么生产者可能有问题。另外,请检查您的端口号是否正确。我认为 POM 中不应该有任何问题,如果有问题,它应该不允许您构建/编译项目。
  • @NileshPhrate- 是的,我可以使用 Kafka 的控制台消费者来消费消息,所以我们可以说这个问题与 kafka 或 zookeeper 无关,即我用于控制台方法的相同 ip 和端口。

标签: java apache-spark apache-kafka spark-streaming


【解决方案1】:

当使用 Kafka 消费者时,特别是当我们在开发环境中进行测试和调试时,生产者可能不会持续向 Kafka 推送消息。 在这种情况下,我们需要处理这个 Kafka 消费者参数auto.offset.reset,它决定了消费者开始运行后是否只读取写入到主题的新消息?或从题目开始阅读

这里是Kafka documentation官方给出的解释:

auto.offset.reset
当 Kafka 中没有初始偏移量或服务器上不再存在当前偏移量时该怎么办 (例如,因为该数据已被删除):

  1. earliest:自动将偏移量重置为最早的偏移量
  2. latest:自动将偏移量重置为最新的偏移量
  3. none:如果没有为消费者组找到先前的偏移量,则向消费者抛出异常
  4. 其他:向消费者抛出异常。

关于如何使用 kafkaParams 创建 KafkaDStream 的示例代码如下:

    Map<String,String> kafkaParams = new HashMap<>();
    kafkaParams.put("zookeeper.connect", "localhost:2181");
    kafkaParams.put("group.id", "test02");  //While you are testing the codein develeopment system, change this groupid each time you run the consumer
    kafkaParams.put("auto.offset.reset", "earliest");
    kafkaParams.put("metadata.broker.list", "localhost:9092");
    kafkaParams.put("bootstrap.servers", "localhost:9092");
    Map<String, Integer> topics = new HashMap<String, Integer>();
    topics.put("temp", 1);
    StorageLevel storageLevel = StorageLevel.MEMORY_AND_DISK_SER();

    JavaPairDStream<String, String> messages =
        KafkaUtils.createStream(jssc,
                String.class,
                String.class,
                StringDecoder.class,
                StringDecoder.class,
                kafkaParams,
                topics,
                storageLevel)    
        ;
    messages.print();

【讨论】:

  • @remisharoon- 我只收到带有时间戳的消息(以毫秒为单位)。那是什么意思?下面是示例输出--------------------------------------------------------- 时间: 1504785338000 毫秒 -------------------------------------------- ---- --------------------------------------- 时间:1504785340000 毫秒 -------- ------------------------------------
  • @kit ,意思是“空的 DStream”。 IE。它没有从 Kafka 读取任何记录。请在启动 SparkStreming 作业后尝试写入 Kafka 主题
  • @remisharoon- 我正在从 kafka 的控制台生产者向同一个 kafka 主题发送消息......它仍然在打印空 DStream......这是什么原因?
  • @kit 我相信您正在尝试在没有任何地图的情况下按原样打印 DStream 或减少到它,在这种情况下,它应该打印这些消息(假设您正在向触发 Spark Streaming 后的 kafka)。我没有看到这个问题
【解决方案2】:

您没有调用您有代码来连接和使用来自 Kafka 的消息的方法。在 public static void main() 中编写该逻辑或调用您编写此逻辑的方法。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2016-11-03
    • 1970-01-01
    • 1970-01-01
    • 2022-12-15
    • 2018-01-09
    • 2019-11-20
    • 2017-02-15
    • 2017-03-27
    相关资源
    最近更新 更多