【问题标题】:pyspark streaming commit offset to kafkapyspark 流式提交到 kafka 的偏移量
【发布时间】:2018-11-28 13:35:15
【问题描述】:

根据文档,可以从(scala)火花流应用程序中commit offset into kafka。 我想从 pyspark 实现相同的功能。
或者至少将 kafka 分区、偏移量存储到外部数据存储(RDBMS 等)中。

然而,用于 kafka 集成的 pyspark api 仅提供 RDD(offset, value)] 而不是 RDD[ConsumerRecord](如在 scala 中)。 有没有办法从 python RDD 中获取(topic, partition, offset)?并坚持到其他地方?

【问题讨论】:

    标签: python apache-spark apache-kafka spark-streaming


    【解决方案1】:

    我们可以通过多种方式处理偏移量。我们可以将 Offset 值存储在每个成功处理数据的 Zookeeper 路径中的一种方法,并在我们再次创建流时读取该值。代码 sn-p 如下。

    from kazoo.client import KazooClient
    zk = KazooClient(hosts='127.0.0.1:2181')
    zk.start()
    ZOOKEEPER_SERVERS = "127.0.0.1:2181"
    
    def get_zookeeper_instance():
        from kazoo.client import KazooClient
        if 'KazooSingletonInstance' not in globals():
            globals()['KazooSingletonInstance'] = KazooClient(ZOOKEEPER_SERVERS)
            globals()['KazooSingletonInstance'].start()
        return globals()['KazooSingletonInstance']
    
    def save_offsets(rdd):
        zk = get_zookeeper_instance()
        for offset in rdd.offsetRanges():
            path = f"/consumers/{var_topic_src_name}"
            print(path)
            zk.ensure_path(path)
            zk.set(path, str(offset.untilOffset).encode())
    
        var_offset_path = f'/consumers/{var_topic_src_name}'
    
        try:
            var_offset = int(zk.get(var_offset_path)[0])
        except:
            print("The spark streaming started First Time and Offset value should be Zero")
            var_offset  = 0
        var_partition = 0
        enter code here
        topicpartion = TopicAndPartition(var_topic_src_name, var_partition)
        fromoffset = {topicpartion: var_offset}
        print(fromoffset)
        kvs = KafkaUtils.createDirectStream(ssc,\
                                            [var_topic_src_name],\
                                            var_kafka_parms_src,\
                                            valueDecoder=serializer.decode_message,\
                                            fromOffsets = fromoffset)
        kvs.foreachRDD(handler)
        kvs.foreachRDD(save_offsets)
    

    【讨论】:

    • 自 kafka 0.10 版以来,不建议将偏移量提交给 Zookeeper。我的问题涉及更新版本的 kafka,它在 kafka 主题中存储偏移量(使用紧凑日志)。感谢您的建议。
    猜你喜欢
    • 2019-03-11
    • 2020-10-10
    • 1970-01-01
    • 2021-11-14
    • 2018-03-27
    • 1970-01-01
    • 1970-01-01
    • 2020-02-15
    • 2017-06-24
    相关资源
    最近更新 更多