【问题标题】:Spark streaming applications subscribing to same kafka topic订阅相同 kafka 主题的 Spark 流应用程序
【发布时间】:2018-02-06 18:27:55
【问题描述】:

我是 spark 和 kafka 的新手,我对使用 kafka 的 spark 流的使用模式略有不同。 我正在使用

spark-core_2.10 - 2.1.1
spark-streaming_2.10 - 2.1.1
spark-streaming-kafka-0-10_2.10 - 2.0.0
kafka_2.10 - 0.10.1.1

连续事件数据正在流式传输到我需要从多个 Spark 流应用程序处理的 kafka 主题。但是当我运行 spark 流应用程序时,只有其中一个接收到数据。

     Map<String, Object> kafkaParams = new HashMap<String, Object>();

     kafkaParams.put("bootstrap.servers", "localhost:9092");
     kafkaParams.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
     kafkaParams.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); 
     kafkaParams.put("auto.offset.reset", "latest");
     kafkaParams.put("group.id", "test-consumer-group");
     kafkaParams.put("enable.auto.commit", "true");
     kafkaParams.put("auto.commit.interval.ms", "1000");
     kafkaParams.put("session.timeout.ms", "30000");

     Collection<String> topics =  Arrays.asList("4908100105999_000005");;
     JavaInputDStream<ConsumerRecord<String, String>> stream =  org.apache.spark.streaming.kafka010.KafkaUtils.createDirectStream(
                    ssc,
                    LocationStrategies.PreferConsistent(),
                    ConsumerStrategies.<String, String> Subscribe(topics, kafkaParams) );

      ... //spark processing

我有两个 spark 流应用程序,通常我提交的第一个使用 kafka 消息。第二个应用程序只是等待消息并且永远不会继续。 正如我所读到的,可以从多个消费者订阅 kafka 主题,火花流不是这样吗?或者我在 kafka 主题及其配置方面缺少什么?

提前致谢。

【问题讨论】:

    标签: apache-spark apache-kafka spark-streaming spark-streaming-kafka


    【解决方案1】:

    您可以使用相同的 groupid 创建不同的流。以下是 0.8 集成在线文档中的更多详细信息,有两种方法:

    方法 1:基于接收者的方法

    可以使用不同的组创建多个 Kafka 输入 DStream,并且 使用多个接收器并行接收数据的主题。

    方法 2:直接方法(无接收者)

    无需创建多个输入 Kafka 流并将它们合并。和 directStream,Spark Streaming 将创建尽可能多的 RDD 分区 有 Kafka 分区可供消费,它们都将从中读取数据 卡夫卡并行。所以 Kafka 和 RDD 分区,更容易理解和调优。

    您可以在Spark Streaming + Kafka Integration Guide 0.8阅读更多内容

    从您的代码看来,您使用的是 0.10,请参考 Spark Streaming + Kafka Integration Guide (Kafka broker version 0.10.0

    即使它使用的是 spark 流 api,一切都由 kafka 属性控制,因此取决于您在属性文件中指定的组 id,您可以使用不同的组 id 启动多个流。

    干杯!

    【讨论】:

    • 我在两个消费者中使用了相同的组 ID,所以只有一个消费者在接收消息。具有不同 group.id 订阅同一主题的消费者,分别/并行接收消息。
    • 是的,如果您使用相同的组 id,那么只有一个人会收到消息。
    【解决方案2】:

    消费者数量[在一个消费者组下],不能超过主题中的分区数量。如果要并行消费消息,则需要引入合适数量的分区并创建接收器来处理每个分区。

    【讨论】:

    • 拥有两个消费者组与在同一个消费者组下拥有两个分区有什么区别?
    • 我的意思是 Kafka 分区。如果你的Kafka topic有两个partition,并且想并行处理消息,那么可以引入一组consumer【这个consumer group的consumer个数不能超过topic中正在消费的partition个数】 Consumer groups由消费者组 ID 标识。如果两个消费者组具有相同的组 id,则 Kafka 将假定这两个消费者组为一个。如果您对两个应用程序使用相同的代码,请尝试为第二个应用程序更改 kafkaParams.put("group.id", "test-consumer-group1")。
    • 单分区和从两个消费者组读取是否会影响 Kafka 的性能或吞吐量?目前我有 4 个主题都具有单个分区,从两个不同的消费者组中使用它们。当传入数据速率增加时,我不确定这是否会扩大而不影响性能。
    • 不增加额外的消费者应该不会影响Kafka的性能[网络带宽可能是瓶颈,请确保其足以支持数据传输的增加。 Kafka 本身不会有任何性能下降]。
    猜你喜欢
    • 2018-01-19
    • 2017-06-20
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2019-09-19
    • 2016-10-15
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多