【发布时间】:2021-05-05 10:16:52
【问题描述】:
我试图在使用下面的 sn-p 处理后将火花流插入 kafka
query = ds1 \
.selectExpr("CAST(value AS STRING)")\
.writeStream\
.foreachBatch(do_something) \
.format("kafka") \
.option("topic","topic-name") \
.option("kafka.bootstrap.servers", "borkers-IPs") \
.option("checkpointLocation", "/home/location") \
.start()
但它似乎插入的是原始流而不是处理过的流。
【问题讨论】:
-
经纪人...经纪人?
-
不知道 foreachbatch 以这种方式工作 - 即用于任意非标准接收器
标签: pyspark apache-kafka spark-structured-streaming