【问题标题】:Pyspark read existing records using kafkaPyspark 使用 kafka 读取现有记录
【发布时间】:2021-11-27 06:01:30
【问题描述】:

我已将我的 Postgres DB 连接到 Kafka,以读取表中的新记录并将它们推送到 elasticsearch。 数据库已经有一些记录,当我将 kafka 连接到数据库时,这些记录在 kafka 主题中可见(使用./kafka-console-consumer.sh --topic postgres.public.table --bootstrap-server kafka:9092 --from-beginning)。

但是使用下面的代码 sn-p,我只能读取表中的新记录。

if __name__ == "__main__":
    
    if es.indices.exists('test-index'):
           es.indices.delete('test-index')
    
    es.indices.create('test-index')

    ssc = StreamingContext(sc, 30)
    brokers, topic = sys.argv[1:]
    print(brokers)
    print(topic)
    kStream = KafkaUtils.createDirectStream(ssc, [topic],{"metadata.broker.list": brokers,
                       'group.id':'ozy-group', 
                       'fetch.message.max.bytes':'15728640',
                       'auto.offset.reset':'largest'})
    lines = kStream.map(lambda x: x[1])
    lines.count().map(lambda x:'profiles in this batch: %d' % x).pprint()

    lines.foreachRDD(RDDfromKafkaStream)
    ssc.start()
    ssc.awaitTermination() 

如何同时读取表中的现有记录?

【问题讨论】:

    标签: apache-spark pyspark apache-kafka spark-streaming


    【解决方案1】:

    'auto.offset.reset':'smallest'会让consumer读取已经存在的数据,而不是你启动consumer后进来的数据

    旁注:您应该改用结构化流媒体

    【讨论】:

    • 你的意思是它只会读取已经存在的数据,而忽略新的传入记录吗?对于我的用例,我希望包含两种类型的记录。我也会检查结构化流。
    • 不,不。偏移量只会从主题的最早位置开始并继续向前,然后等待新的数据。卡夫卡主题没有“结束”会阻止消费者。您在问题中发布的代码仅从有关该主题的最新消息“之后”开始阅读。
    猜你喜欢
    • 1970-01-01
    • 2022-11-24
    • 2018-01-17
    • 2019-07-08
    • 2023-03-12
    • 2023-03-23
    • 2018-06-21
    • 1970-01-01
    • 2017-01-11
    相关资源
    最近更新 更多