【发布时间】:2025-12-14 15:45:01
【问题描述】:
如何在 Pyspark 中使用 KafkaUtils.createDirectStream 和特定 Topic 的偏移量?
【问题讨论】:
标签: apache-spark apache-kafka pyspark
如何在 Pyspark 中使用 KafkaUtils.createDirectStream 和特定 Topic 的偏移量?
【问题讨论】:
标签: apache-spark apache-kafka pyspark
如果您想从 Kafka 主题中的记录创建 RDD,请使用一组静态元组。
使所有导入可用
from pyspark.streaming.kafka import KafkaUtils, OffsetRange
然后你创建一个 Kafka Brokers 的字典
kafkaParams = {"metadata.broker.list": "host1:9092,host2:9092,host3:9092"}
然后你创建你的偏移对象
start = 0
until = 10
partition = 0
topic = 'topic'
offset = OffsetRange(topic,partition,start,until)
offsets = [offset]
最后你创建了 RDD:
kafkaRDD = KafkaUtils.createRDD(sc, kafkaParams,offsets)
要创建带有偏移量的 Stream,您需要执行以下操作:
from pyspark.streaming.kafka import KafkaUtils, TopicAndPartition
from pyspark.streaming import StreamingContext
然后你使用你的 sparkcontext 创建你的 sparkstreaming 上下文
ssc = StreamingContext(sc, 1)
接下来我们设置所有参数
kafkaParams = {"metadata.broker.list": "host1:9092,host2:9092,host3:9092"}
start = 0
partition = 0
topic = 'topic'
然后我们创建我们的 fromOffset 字典
topicPartion = TopicAndPartition(topic,partition)
fromOffset = {topicPartion: long(start)}
//notice that we must cast the int to long
最后我们创建了 Stream
directKafkaStream = KafkaUtils.createDirectStream(ssc, [topic],kafkaParams,
fromOffsets=fromOffset)
【讨论】:
你可以这样做:
from pyspark.streaming.kafka import TopicAndPartition
topic = "test"
brokers = "localhost:9092"
partition = 0
start = 0
topicpartion = TopicAndPartition(topic, partition)
fromoffset = {topicpartion: int(start)}
kafkaDStream = KafkaUtils.createDirectStream(spark_streaming,[topic], \
{"metadata.broker.list": brokers}, fromOffsets = fromoffset)
注意:Spark 2.2.0、python 3.6
【讨论】: