【问题标题】:How can I read every 5 seconds in pyspark with kafka readStream?如何使用 kafka readStream 在 pyspark 中每 5 秒读取一次?
【发布时间】:2022-01-16 21:22:37
【问题描述】:

我想每 5 秒阅读一个主题;对于旧版本的pyspark,我可以使用 kafka-utils 和 window 方法,但目前我无法使用。

现在我正在使用以下代码从kafkaspark 加载数据

spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", 'localhost:9092') \
    .option("subscribe", 'data') \
    .load()

但是,我正在读取所有数据。

所以我想知道如何在可能的情况下每 5 秒读取一次批量大小为 1 秒的数据。

谢谢

【问题讨论】:

  • 结构化流以微批量读取数据。如果您需要将数据分组到更大的批次中,您仍然可以添加窗口

标签: apache-spark pyspark apache-kafka spark-structured-streaming


【解决方案1】:

假设您希望每 5 秒按一次聚合和分组,请参阅 documentation on windowing

这应该定义一个翻转窗口

kafka_df \
    .withWatermark("timestamp", "5 seconds") \
    .groupBy(
        window(kafka_df.timestamp, "5 seconds", "1 second"),
        kafka_df.value) 

【讨论】:

  • Finnaly 有效,但是当我使用附加模式时出现错误,如何每 10 秒显示一次结果?
  • 错误是什么?
  • 是一个“错字”,我解决了我的问题,感谢您的帮助。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2019-12-03
  • 2019-04-21
  • 2021-11-25
  • 1970-01-01
相关资源
最近更新 更多