【问题标题】:How to create InputDStream with offsets in PySpark (using KafkaUtils.createDirectStream)?如何在 PySpark 中创建带偏移量的 InputDStream(使用 KafkaUtils.createDirectStream)?
【发布时间】:2025-12-14 15:45:01
【问题描述】:

如何在 Pyspark 中使用 KafkaUtils.createDirectStream 和特定 Topic 的偏移量?

【问题讨论】:

    标签: apache-spark apache-kafka pyspark


    【解决方案1】:

    如果您想从 Kafka 主题中的记录创建 RDD,请使用一组静态元组。

    使所有导入可用

    from pyspark.streaming.kafka import KafkaUtils, OffsetRange
    

    然后你创建一个 Kafka Brokers 的字典

    kafkaParams = {"metadata.broker.list": "host1:9092,host2:9092,host3:9092"}
    

    然后你创建你的偏移对象

    start = 0
    until = 10
    partition = 0
    topic = 'topic'    
    offset = OffsetRange(topic,partition,start,until)
    offsets = [offset]
    

    最后你创建了 RDD:

    kafkaRDD = KafkaUtils.createRDD(sc, kafkaParams,offsets)
    

    要创建带有偏移量的 Stream,您需要执行以下操作:

    from pyspark.streaming.kafka import KafkaUtils, TopicAndPartition
    from pyspark.streaming import StreamingContext
    

    然后你使用你的 sparkcontext 创建你的 sparkstreaming 上下文

    ssc = StreamingContext(sc, 1)
    

    接下来我们设置所有参数

     kafkaParams = {"metadata.broker.list": "host1:9092,host2:9092,host3:9092"}
     start = 0
     partition = 0
     topic = 'topic'    
    

    然后我们创建我们的 fromOffset 字典

    topicPartion = TopicAndPartition(topic,partition)
    fromOffset = {topicPartion: long(start)}
    //notice that we must cast the int to long 
    

    最后我们创建了 Stream

    directKafkaStream = KafkaUtils.createDirectStream(ssc, [topic],kafkaParams, 
    fromOffsets=fromOffset)
    

    【讨论】:

    • 但我收到错误“TypeError: unhashable type: 'TopicAndPartition'”
    • 这对于 Kafka 0.8 和 Spark 2.0+ 已经过时了 :(
    【解决方案2】:

    你可以这样做:

    from pyspark.streaming.kafka import TopicAndPartition
    topic = "test"
    brokers = "localhost:9092"
    partition = 0
    start = 0
    topicpartion = TopicAndPartition(topic, partition)
    fromoffset = {topicpartion: int(start)}
    kafkaDStream = KafkaUtils.createDirectStream(spark_streaming,[topic], \
            {"metadata.broker.list": brokers}, fromOffsets = fromoffset)
    

    注意:Spark 2.2.0、python 3.6

    【讨论】: