【发布时间】:2019-07-17 14:35:29
【问题描述】:
我正在尝试使用 spark 制作结构化流应用程序,主要思想是从 kafka 源读取,处理输入,写回另一个主题。我已经成功地使火花读写卡夫卡,但我的问题是处理部分。我已经尝试使用 foreach 函数来捕获每一行并在写回 kafka 之前对其进行处理,但是它总是只执行 foreach 部分并且从不写回 kafka。但是,如果我从写入流中删除 foreach 部分,它将继续写入,但现在我失去了处理。
如果有人能给我一个例子来说明如何做到这一点,我将非常感激。
这是我的代码
spark = SparkSession \
.builder \
.appName("StructuredStreamingTrial") \
.getOrCreate()
df = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "localhost:9092") \
.option("subscribe", "KafkaStreamingSource") \
.load()
ds = df \
.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")\
.writeStream \
.outputMode("update") \
.format("kafka") \
.option("kafka.bootstrap.servers", "localhost:9092") \
.option("topic", "StreamSink") \
.option("checkpointLocation", "./testdir")\
.foreach(foreach_function)
.start().awaitTermination()
而foreach_function 就是
def foreach_function(df):
try:
print(df)
except:
print('fail')
pass
【问题讨论】:
标签: apache-spark pyspark spark-structured-streaming