要使用公钥/私钥认证,您需要先生成密钥对(证书),具体步骤请参见:
https://community.snowflake.com/s/article/How-to-connect-snowflake-with-Spark-connector-using-Public-Private-Key
然后您就可以使用下面的代码了:
from pyspark.sql import SQLContext
from pyspark import SparkConf, SparkContext
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import serialization
import re
import os
from pyspark.sql import SparkSession
# Comma-separated list of the Snowflake JDBC driver jar and the
# Spark-Snowflake connector jar; the same list is registered under both
# "spark.jars" and "spark.repl.local.jars".
connector_jars = "<path/to/>/snowflake-jdbc-<version>.jar,<path/to/>/spark-snowflake_2.11-2.4.13-spark_2.4.jar"

# Build (or reuse) the Spark session one setting at a time.
session_builder = SparkSession.builder
session_builder = session_builder.config("spark.jars", connector_jars)
session_builder = session_builder.config("spark.repl.local.jars", connector_jars)
session_builder = session_builder.config("spark.sql.catalogImplementation", "in-memory")
spark = session_builder.getOrCreate()

# Hand the JVM-side SparkSession to the connector's disablePushdownSession
# utility (presumably switches off query pushdown for this session —
# confirm against the connector documentation).
jvm_session = spark._jvm.org.apache.spark.sql.SparkSession.builder().getOrCreate()
spark._jvm.net.snowflake.spark.snowflake.SnowflakeConnectorUtils.disablePushdownSession(jvm_session)
# Load the PEM-encoded private key from disk. The passphrase protecting the
# key is read from the PRIVATE_KEY_PASSPHRASE environment variable (a
# KeyError is raised if that variable is not set).
with open("<path/to/>/rsa_key.p8", "rb") as key_file:
    p_key = serialization.load_pem_private_key(
        key_file.read(),
        password=os.environ['PRIVATE_KEY_PASSPHRASE'].encode(),
        backend=default_backend(),
    )

# Re-serialize the key as unencrypted PKCS#8 PEM so it can be passed to the
# connector as plain text.
pkb = p_key.private_bytes(
    encoding=serialization.Encoding.PEM,
    format=serialization.PrivateFormat.PKCS8,
    encryption_algorithm=serialization.NoEncryption(),
)

# Strip the "-----BEGIN/END PRIVATE KEY-----" header and footer lines and all
# remaining newlines, leaving only the base64 body on a single line.
pkb = pkb.decode("UTF-8")
pkb = re.sub(r"-*(BEGIN|END) PRIVATE KEY-*\n", "", pkb).replace("\n", "")
# Connection options handed to the Snowflake Spark connector. Replace every
# "<...>" placeholder with the value for your account.
sfOptions = {
    "sfURL": "<URL>",
    "sfAccount": "<ACCOUNTNAME>",
    "sfUser": "<USER_NAME>",
    # Key-pair authentication: the single-line base64 private key built
    # above is supplied here, so no "sfPassword" entry is set.
    "pem_private_key": pkb,
    "sfDatabase": "<DBNAME>",
    "sfSchema": "<SCHEMA_NAME>",
    "sfWarehouse": "<WH_NAME>",
    "sfRole": "<ROLENAME>",
}
# Data source name under which the Snowflake connector is addressed.
SNOWFLAKE_SOURCE_NAME = "net.snowflake.spark.snowflake"

# Read a whole table into a DataFrame. A table name is passed through the
# "dbtable" option; the "query" option is meant for a full SELECT statement,
# not a bare table name.
df = (
    spark.read.format(SNOWFLAKE_SOURCE_NAME)
    .options(**sfOptions)
    .option("dbtable", "<TABLENAME>")
    .load()
)

# Print the first rows to confirm the connection works.
df.show()