【发布时间】:2021-11-27 06:01:30
【问题描述】:
我已将我的 Postgres DB 连接到 Kafka,以读取表中的新记录并将它们推送到 elasticsearch。
数据库已经有一些记录,当我将 kafka 连接到数据库时,这些记录在 kafka 主题中可见(使用./kafka-console-consumer.sh --topic postgres.public.table --bootstrap-server kafka:9092 --from-beginning)。
但是使用下面的代码 sn-p,我只能读取表中的新记录。
if __name__ == "__main__":
if es.indices.exists('test-index'):
es.indices.delete('test-index')
es.indices.create('test-index')
ssc = StreamingContext(sc, 30)
brokers, topic = sys.argv[1:]
print(brokers)
print(topic)
kStream = KafkaUtils.createDirectStream(ssc, [topic],{"metadata.broker.list": brokers,
'group.id':'ozy-group',
'fetch.message.max.bytes':'15728640',
'auto.offset.reset':'largest'})
lines = kStream.map(lambda x: x[1])
lines.count().map(lambda x:'profiles in this batch: %d' % x).pprint()
lines.foreachRDD(RDDfromKafkaStream)
ssc.start()
ssc.awaitTermination()
如何同时读取表中的现有记录?
【问题讨论】:
标签: apache-spark pyspark apache-kafka spark-streaming