首先在数据块上安装 ADLS。为此,请遵循以下代码 sn-p
configs = {"fs.azure.account.auth.type": "OAuth",
"fs.azure.account.oauth.provider.type": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
"fs.azure.account.oauth2.client.id": "<application-id>",
"fs.azure.account.oauth2.client.secret": dbutils.secrets.get(scope="<scope-name>",key="<service-credential-key-name>"),
"fs.azure.account.oauth2.client.endpoint": "https://login.microsoftonline.com/<directory-id>/oauth2/token"}
# Optionally, you can add <directory-name> to the source URI of your mount point.
dbutils.fs.mount(
source = "abfss://<container-name>@<storage-account-name>.dfs.core.windows.net/",
mount_point = "/mnt/<mount-name>",
extra_configs = configs)
你可以关注这个link
现在将存储在 ADLS 中的表转换为数据框。在这里您可以使用数据进行转换。
最后一步是将dataframe中的数据存储到SQL DW。
使用以下代码建立连接。
jdbcHostname = "xxxxxxxxxxxx.database.windows.net"
jdbcPort = "1433"
jdbcDatabase = "xxxxxxxxdb"
properties = {
"user" : "xxxxxxxx",
"password" : "******" }
下面我展示了如何将数据加载到 sql。
from pyspark.sql import *
import pandas as pd
df = DataFrameWriter(mydf)
df.jdbc(url=url, table= "Table_Name", mode ="overwrite", properties = properties)
有关更多信息,您可以参考 Gauri Mahajan 的 article