【发布时间】:2021-12-17 19:06:11
【问题描述】:
我正在尝试从 kafka 主题中读取,执行一些操作,然后将 df 写入磁盘,例如:
df_alarmsFromKafka=spark.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", ip) \
.option("subscribe", topic) \
.option("request.timeout.ms",80000) \
.option("includeHeaders", "true") \
.load()
df_alarmsFromKafka=df_alarmsFromKafka.drop("test")
print("ran only once, not in the stream")
batch_job=df_alarmsFromKafka.writeStream \
.format("parquet").outputMode("append") \
.option("path",path) \
.option("checkpointLocation",cp) \
.start()
batch_job.awaitTermination()
我遇到的问题是每批只运行 df_alarmsFromKafka 上的操作。
例如,如果我希望每批都评估一个简单的打印,那么它似乎是不可能的,因为它显然只是第一次打印和评估。
是否有不同的方式让我能够在批次之间进行其他操作,而不仅仅是那些与评估的Dataframe.writeStream 严格相关的操作?
【问题讨论】:
标签: apache-spark pyspark apache-kafka spark-structured-streaming