【发布时间】:2022-11-14 02:58:31
【问题描述】:
我在 Databricks 集群上使用 Spark 结构化流从 Azure 事件中心提取数据,对其进行处理,然后使用 ForEachBatch 将其写入雪花,并将 Epoch_Id/Batch_Id 传递给 foreach 批处理函数。
我的代码如下所示:
ehConf = {}
ehConf['eventhubs.connectionString'] = sc._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt(EVENT_HUB_CONNECTION_STRING)
ehConf['eventhubs.consumerGroup'] = consumergroup
# Read stream data from event hub
spark_df = spark \
.readStream \
.format("eventhubs") \
.options(**ehConf) \
.load()
一些转变...
写给雪花
def foreach_batch_function(df, epoch_id):
df.write\
.format(SNOWFLAKE_SOURCE_NAME)\
.options(**sfOptions)\
.option("dbtable", snowflake_table)\
.mode('append')\
.save()
processed_df.writeStream.outputMode('append').\
trigger(processingTime='10 seconds').\
option("checkpointLocation",f"checkpoint/P1").\
foreachBatch(foreach_batch_function).start()
目前我面临两个问题:
-
发生节点故障时。虽然在 spark 官方网站上,有人提到当在恢复表单节点故障期间使用 ForeachBatch 和 epoch_id/batch_id 时,不应该有任何重复,但我确实发现在我的雪花表中填充了重复项。参考链接:[Spark Structured Streaming ForEachBatch With Epoch Id][1]。
-
我遇到错误a。)TransportClient:无法将 RPC RPC 5782383376229127321 发送到 /30.62.166.7:31116:java.nio.channels.ClosedChannelException和b。)TaskSchedulerImpl:30.62.166.7 上丢失的执行程序 1560:工人退役:工人退役在我的数据块集群上非常频繁。无论我分配了多少执行程序或增加了多少执行程序内存,集群都会达到最大工作人员限制,并且我会收到两个错误之一,在恢复后我的雪花表中会填充重复项。
对上述任何一点的任何解决方案/建议都会有所帮助。
提前致谢。
【问题讨论】:
标签: apache-spark snowflake-cloud-data-platform databricks spark-structured-streaming azure-eventhub