【问题标题】:Spark Structured Stream Scalability and Duplicates IssueSpark 结构化流的可扩展性和重复问题
【发布时间】:2022-11-14 02:58:31
【问题描述】:

我在 Databricks 集群上使用 Spark 结构化流从 Azure 事件中心提取数据,对其进行处理,然后使用 ForEachBatch 将其写入雪花,并将 Epoch_Id/Batch_Id 传递给 foreach 批处理函数。

我的代码如下所示:

ehConf = {}
ehConf['eventhubs.connectionString'] = sc._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt(EVENT_HUB_CONNECTION_STRING)
ehConf['eventhubs.consumerGroup'] = consumergroup

# Read stream data from event hub
spark_df = spark \
  .readStream \
  .format("eventhubs") \
  .options(**ehConf) \
  .load()

一些转变...

写给雪花

def foreach_batch_function(df, epoch_id):
       df.write\
            .format(SNOWFLAKE_SOURCE_NAME)\
            .options(**sfOptions)\
            .option("dbtable", snowflake_table)\
            .mode('append')\
            .save()

processed_df.writeStream.outputMode('append').\
    trigger(processingTime='10 seconds').\
    option("checkpointLocation",f"checkpoint/P1").\
    foreachBatch(foreach_batch_function).start()

目前我面临两个问题:

  1. 发生节点故障时。虽然在 spark 官方网站上,有人提到当在恢复表单节点故障期间使用 ForeachBatch 和 epoch_id/batch_id 时,不应该有任何重复,但我确实发现在我的雪花表中填充了重复项。参考链接:[Spark Structured Streaming ForEachBatch With Epoch Id][1]。

  2. 我遇到错误a。)TransportClient:无法将 RPC RPC 5782383376229127321 发送到 /30.62.166.7:31116:java.nio.channels.ClosedChannelException和b。)TaskSchedulerImpl:30.62.166.7 上丢失的执行程序 1560:工人退役:工人退役在我的数据块集群上非常频繁。无论我分配了多少执行程序或增加了多少执行程序内存,集群都会达到最大工作人员限制,并且我会收到两个错误之一,在恢复后我的雪花表中会填充重复项。

    对上述任何一点的任何解决方案/建议都会有所帮助。

    提前致谢。

【问题讨论】:

    标签: apache-spark snowflake-cloud-data-platform databricks spark-structured-streaming azure-eventhub


    【解决方案1】:

    foreachBatch 根据定义不是幂等的,因为当当前执行的批处理失败时,它会重试,并且可以观察到部分结果,这与您的观察结果相匹配。 foreachBatch 中的幂等写入是 applicable 仅适用于 Delta Lake 表,不适用于其他接收器类型。我对 Snowflake 不太熟悉,但也许你可以实现类似于其他数据库的东西 - 将数据写入临时表(每个批次都会进行覆盖),然后从该临时表合并到目标表中。

    关于第二个问题 - 看起来您正在使用自动缩放集群 - 在这种情况下,工作人员可能会被退役,因为集群管理器检测到集群未完全加载。为避免这种情况,您可以禁用自动缩放,并使用固定大小的集群。

    【讨论】: