【问题标题】:How to execute operations every batch?如何每批次执行操作?
【发布时间】:2021-12-17 19:06:11
【问题描述】:

我正在尝试从 kafka 主题中读取,执行一些操作,然后将 df 写入磁盘,例如:

df_alarmsFromKafka=spark.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", ip) \
.option("subscribe", topic) \
.option("request.timeout.ms",80000) \
.option("includeHeaders", "true") \
.load() 

df_alarmsFromKafka=df_alarmsFromKafka.drop("test")
print("ran only once, not in the stream")

batch_job=df_alarmsFromKafka.writeStream \
    .format("parquet").outputMode("append") \
    .option("path",path) \
    .option("checkpointLocation",cp) \
    .start()

batch_job.awaitTermination()

我遇到的问题是每批只运行 df_alarmsFromKafka 上的操作。
例如,如果我希望每批都评估一个简单的打印,那么它似乎是不可能的,因为它显然只是第一次打印和评估。

是否有不同的方式让我能够在批次之间进行其他操作,而不仅仅是那些与评估的Dataframe.writeStream 严格相关的操作?

【问题讨论】:

    标签: apache-spark pyspark apache-kafka spark-structured-streaming


    【解决方案1】:

    啊,我想我明白你所说的 “如果我想要每批都评估一个简单的印刷品”的意思。您似乎在要求foreach 运营商:

    设置要使用提供的 writer f 处理的流式查询的输出。这通常用于将流式查询的输出写入任意存储系统。

    伪代码如下所示:

    df_alarmsFromKafka = df_alarmsFromKafka.drop("test")
    df_alarmsFromKafka.foreach(print("runs every batch, in the stream"))
    
    batch_job=df_alarmsFromKafka.writeStream \
        .format("parquet").outputMode("append") \
        .option("path",path) \
        .option("checkpointLocation",cp) \
        .start()
    

    以上内容实际上会启动 2 个流式查询(针对一个输入)。

    【讨论】:

      猜你喜欢
      • 2010-12-02
      • 2013-01-28
      • 1970-01-01
      • 2017-02-22
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2014-08-10
      • 2012-03-03
      相关资源
      最近更新 更多