【发布时间】:2017-02-10 19:41:59
【问题描述】:
如何使用Direct Stream approach?在多个Consumer中消费Kakfa topic messages
有可能吗?由于 Direct Stream 方法没有Consumer Group 概念。
如果我将 group.id 作为 DirectStream 方法的 kafkaparams 传递,会发生什么?下面的代码在with group.id 上工作,作为Kafka 参数也可以在without group.id 上工作。
示例代码:
val kafkaParams = Map(
"group.id" -> "group1",
CommonClientConfigs.SECURITY_PROTOCOL_CONFIG -> sasl,
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer",
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer",
"metadata.broker.list" -> brokerList,
"zookeeper.connect" -> zookeeperURL
)
val dStream =
KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
ssc, kafkaParams, topicSet
).map(_._2)
【问题讨论】:
标签: scala spark-streaming kafka-consumer-api