【问题标题】:PySpark code to query MySQL using Airflow Connection使用 Airflow Connection 查询 MySQL 的 PySpark 代码
【发布时间】:2023-02-12 08:06:19
【问题描述】:

我正在搜索 PySpark 代码以使用 Airflow Connection 查询 MySQL。我看过许多解释使用 JDBC 连接器进行 MySQL 连接的文章。有什么方法可以使用 Airflow 连接并在 PySpark Dataframe 中加载 MySQL 数据。

【问题讨论】:

    标签: mysql pyspark airflow


    【解决方案1】:

    Airflow 提供 SparkSubmitOperator 以将 spark 作业提交到 spark 集群,并提供 SparkJDBCOperator 扩展第一个运算符并仅添加来自 JDBC 连接的凭据作为 spark 作业的参数 (source code)。您可以使用第一个通用的,并自行添加参数以匹配您的 spark 作业中的参数名称(如果它已经存在)。

    如果您的问题是关于处理来自SparkJDBCOperator 或通用参数的参数,这里有一个例子:

    if __name__ == '__main__':
        parser = argparse.ArgumentParser()
        parser.add_argument("--user", "-u", type=str, required=True)
        parser.add_argument("--password", "-p", type=str, required=True)
        parser.add_argument("--driver", "-d", type=str, required=True)
        parser.add_argument("--url", "-l", type=str, required=True)
        parser.add_argument("--table", "-t", type=str, required=True)
        args = vars(parser.parse_args())
    
        spark = (
            SparkSession.builder
                .appName("spark jdbc exemple")
                .config("other conf", "conf value...")
                .getOrCreate()
        )
    
       df = (
            spark.read
                .format("jdbc")
                .option("driver", args["driver"])
                .option("url", args["url"])
                .option("dbtable", args["table"])
                .option("user", args["user"])
                .option("password", args["password"])
                .load()
        )
    

    【讨论】:

      猜你喜欢
      • 2019-11-07
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2016-11-22
      • 2020-10-11
      • 1970-01-01
      • 1970-01-01
      • 2012-12-24
      相关资源
      最近更新 更多