【发布时间】:2023-02-12 08:06:19
【问题描述】:
我正在搜索 PySpark 代码以使用 Airflow Connection 查询 MySQL。我看过许多解释使用 JDBC 连接器进行 MySQL 连接的文章。有什么方法可以使用 Airflow 连接并在 PySpark Dataframe 中加载 MySQL 数据。
【问题讨论】:
我正在搜索 PySpark 代码以使用 Airflow Connection 查询 MySQL。我看过许多解释使用 JDBC 连接器进行 MySQL 连接的文章。有什么方法可以使用 Airflow 连接并在 PySpark Dataframe 中加载 MySQL 数据。
【问题讨论】:
Airflow 提供 SparkSubmitOperator 以将 spark 作业提交到 spark 集群,并提供 SparkJDBCOperator 扩展第一个运算符并仅添加来自 JDBC 连接的凭据作为 spark 作业的参数 (source code)。您可以使用第一个通用的,并自行添加参数以匹配您的 spark 作业中的参数名称(如果它已经存在)。
如果您的问题是关于处理来自SparkJDBCOperator 或通用参数的参数,这里有一个例子:
if __name__ == '__main__':
parser = argparse.ArgumentParser()
parser.add_argument("--user", "-u", type=str, required=True)
parser.add_argument("--password", "-p", type=str, required=True)
parser.add_argument("--driver", "-d", type=str, required=True)
parser.add_argument("--url", "-l", type=str, required=True)
parser.add_argument("--table", "-t", type=str, required=True)
args = vars(parser.parse_args())
spark = (
SparkSession.builder
.appName("spark jdbc exemple")
.config("other conf", "conf value...")
.getOrCreate()
)
df = (
spark.read
.format("jdbc")
.option("driver", args["driver"])
.option("url", args["url"])
.option("dbtable", args["table"])
.option("user", args["user"])
.option("password", args["password"])
.load()
)
【讨论】: