【问题标题】:foreach() method with Spark Streaming errors带有 Spark Streaming 错误的 foreach() 方法
【发布时间】:2021-12-29 05:52:42
【问题描述】:

我正在尝试每隔 120 秒将从 Kafka 提取的数据写入 Bigquery 表。 我想做一些额外的操作,通过文档应该可以在 .foreach()foreachBatch() 方法中进行。

作为测试,我想在每次从 kafka 提取数据并写入 BigQuery 时打印一条简单的消息。

batch_job=df_alarmsFromKafka.writeStream\
.trigger(processingTime='120 seconds') \
.foreachBatch(print("do i get printed every batch?"))
.format("bigquery").outputMode("append") \
.option("temporaryGcsBucket",path1) \
.option("checkpointLocation",path2) \
.option("table", table_kafka) \
.start()
batch_job.awaitTermination()

我希望在 jupyter Lab 输出单元格上每 120 秒打印一次此消息,而不是仅打印一次并继续写入 BigQuery。

如果我尝试使用.foreach() 而不是foreachBatch()

batch_job=df_alarmsFromKafka.writeStream\
.trigger(processingTime='120 seconds') \
.foreach(print("do i get printed every batch?"))
.format("bigquery").outputMode("append") \
.option("temporaryGcsBucket",path1) \
.option("checkpointLocation",path2) \
.option("table", table_kafka) \
.start()
batch_job.awaitTermination()

它在给出以下错误后立即打印消息,我无法调试/理解:

/usr/lib/spark/python/pyspark/sql/streaming.py in foreach(self, f)
   1335 
   1336             if not hasattr(f, 'process'):
-> 1337                 raise Exception("Provided object does not have a 'process' method")
   1338 
   1339             if not callable(getattr(f, 'process')):

Exception: Provided object does not have a 'process' method

我做错了吗?除了直接在评估 df_alarmsFromKafka 的数据帧上执行的操作之外,我怎么能简单地每 120 秒执行一些操作?

【问题讨论】:

    标签: pyspark google-bigquery spark-streaming google-cloud-dataproc spark-streaming-kafka


    【解决方案1】:

    允许其他操作,但仅限于流式查询的输出数据。但是在这里您尝试打印一些与输出数据本身无关的字符串。只能打印一次。

    例如,如果您编写如下 foreachbatch 函数:

    def write_to_cassandra(target_df, batch_id):
    target_df.write \
        .format("org.apache.spark.sql.cassandra") \
        .option("keyspace", "tweet_db") \
        .option("table", "tweet2") \
        .mode("append") \
        .save()
    target_df.show()
    

    由于 .show() 函数与输出数据本身相关,因此它将在每个批次上触发 target_df。

    第二个问题:

    Foreach 函数期望您通过实现您没有的打开、处理和关闭方法来扩展 ForeachWriter 类。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2017-03-28
      • 2016-09-10
      • 2016-01-20
      • 1970-01-01
      • 2015-01-20
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多