【发布时间】:2019-01-09 21:46:40
【问题描述】:
我正在编写一个与 Spark 的 JDBC 数据源实现有相似之处的数据源,我想问一下 Spark 如何处理某些故障场景。据我了解,如果执行器在运行任务时死亡,Spark 将恢复执行器并尝试重新运行该任务。然而,这在数据完整性和 Spark 的 JDBC 数据源 API(例如df.write.format("jdbc").option(...).save())的上下文中如何发挥作用?
在JdbcUtils.scala的savePartition函数中,我们看到Spark调用了用户提供的数据库url/凭据生成的Java连接对象的提交和回滚函数(见下文)。但是,如果执行器在 commit() 完成后或调用 rollback() 之前立即死亡,Spark 是否会尝试重新运行任务并再次写入相同的数据分区,实质上是在数据库中创建重复的已提交行?如果 executor 在调用 commit() 或 rollback() 的过程中死亡会发生什么?
try {
...
if (supportsTransactions) {
conn.commit()
}
committed = true
Iterator.empty
} catch {
case e: SQLException =>
...
throw e
} finally {
if (!committed) {
// The stage must fail. We got here through an exception path, so
// let the exception through unless rollback() or close() want to
// tell the user about another problem.
if (supportsTransactions) {
conn.rollback()
}
conn.close()
} else {
...
}
}
【问题讨论】:
标签: scala apache-spark jdbc apache-spark-sql