【发布时间】:2021-12-29 05:52:42
【问题描述】:
我正在尝试每隔 120 秒将从 Kafka 提取的数据写入 Bigquery 表。
我想做一些额外的操作,通过文档应该可以在 .foreach() 或 foreachBatch() 方法中进行。
作为测试,我想在每次从 kafka 提取数据并写入 BigQuery 时打印一条简单的消息。
batch_job=df_alarmsFromKafka.writeStream\
.trigger(processingTime='120 seconds') \
.foreachBatch(print("do i get printed every batch?"))
.format("bigquery").outputMode("append") \
.option("temporaryGcsBucket",path1) \
.option("checkpointLocation",path2) \
.option("table", table_kafka) \
.start()
batch_job.awaitTermination()
我希望在 jupyter Lab 输出单元格上每 120 秒打印一次此消息,而不是仅打印一次并继续写入 BigQuery。
如果我尝试使用.foreach() 而不是foreachBatch()
batch_job=df_alarmsFromKafka.writeStream\
.trigger(processingTime='120 seconds') \
.foreach(print("do i get printed every batch?"))
.format("bigquery").outputMode("append") \
.option("temporaryGcsBucket",path1) \
.option("checkpointLocation",path2) \
.option("table", table_kafka) \
.start()
batch_job.awaitTermination()
它在给出以下错误后立即打印消息,我无法调试/理解:
/usr/lib/spark/python/pyspark/sql/streaming.py in foreach(self, f)
1335
1336 if not hasattr(f, 'process'):
-> 1337 raise Exception("Provided object does not have a 'process' method")
1338
1339 if not callable(getattr(f, 'process')):
Exception: Provided object does not have a 'process' method
我做错了吗?除了直接在评估 df_alarmsFromKafka 的数据帧上执行的操作之外,我怎么能简单地每 120 秒执行一些操作?
【问题讨论】:
标签: pyspark google-bigquery spark-streaming google-cloud-dataproc spark-streaming-kafka