【发布时间】:2021-02-03 10:28:06
【问题描述】:
>我已使用 Pyspark 代码将我已使用 JDBC 转换为数据帧的 HDFS 数据集中的内容复制到 teradata 表
`# -*- coding: utf-8 -*-
import dataiku
from dataiku import spark as dkuspark
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.conf import SparkConf
from pyspark.sql import SQLContext,HiveContext,SparkSession
from pyspark.sql.functions import *
from pyspark.sql import Window
from pyspark.sql.types import *
import datetime
from pyspark.sql.functions import col,when
from pyspark.sql import DataFrame
from functools import reduce
from datetime import date,datetime
import pandas as pd
#import psycopg2
import sys
from datetime import timedelta
import array
import numpy as np
from pyspark.sql import functions as F
import teradata
import teradatasql
import sys
from collections import OrderedDict
import pprint as pp
import logging
import logging.handlers
import smtplib
import datetime
import os
# Ensure executors use the same Python build as the driver.
os.environ['PYSPARK_PYTHON'] = '/usr/bin/python36'

sc = SparkContext.getOrCreate()
sqlContext = SQLContext(sc)

# Teradata connection details (read from Dataiku project variables).
# NOTE: the markdown bold markers (**...**) around this header in the original
# post were not valid Python and have been turned into a real comment.
user = dataiku.get_custom_variables()['DB_TERADATA_BA_USER']
password = dataiku.get_custom_variables()['DB_TERADATA_BA_USER_PWD']
teradata_server = dataiku.get_custom_variables()['Teradata_server']

# Connect to Teradata (native connection, used for cursor-based SQL).
tera_con = teradatasql.connect(host=teradata_server, user=user, password=password)
tera_cur = tera_con.cursor()
print("connection to teradata successful")

driver = dataiku.get_custom_variables()['DB_TERADATA_DRIVER']
# JDBC URL for the Spark write; append ",LOGMECH=TD2" if a specific
# logon mechanism is required (that fragment was commented out below).
auditdburl = "jdbc:teradata://" + teradata_server + "/Database=DBName"
#LOGMECH=TD2"
# Read recipe inputs
PVS_OP_10052020_1 = dataiku.Dataset("310_PVS_OP_10052020_1")
PVS_OP_10052020_1_df = dkuspark.get_dataframe(sqlContext, PVS_OP_10052020_1)

# Compute recipe outputs from inputs
# TODO: Replace this part by your actual code that computes the output,
# as a SparkSQL dataframe. For this sample code, simply copy input to output.
bac_NCCT_310_PVS_OP_Test_POC_test1_df = PVS_OP_10052020_1_df

# Write the dataframe to Teradata via JDBC (append mode).
# NOTE(review): the Teradata JDBC driver normally expects TYPE=FASTLOAD /
# TYPE=FASTEXPORT as part of the connection URL
# (e.g. "jdbc:teradata://host/Database=DBName,TYPE=FASTLOAD"), not as a
# separate DataFrameWriter .option(); an unrecognized option here is a
# likely cause of the "error while calling o96.save" failure —
# TODO confirm against the Teradata JDBC driver documentation.
bac_NCCT_310_PVS_OP_Test_POC_test1_df.write.format("jdbc") \
    .option("driver", driver) \
    .option("url", auditdburl) \
    .option("dbtable", 'BAC_NCCT_310_PVS_OP_Test_POC_test3') \
    .option("user", user) \
    .option("password", password) \
    .option('TYPE', 'FASTEXPORT') \
    .mode('append') \
    .save()
运行代码时出现以下错误:Job failed: Pyspark code failed: At line 74:
: An error occurred while calling o96.save。我刚接触 Pyspark,希望有人能帮我找出哪里做错了。
【问题讨论】:
-
您能否针对您遇到的问题添加更多背景信息?此外,您的代码格式不正确。
-
运行上述代码时出现以下错误:"作业失败:Pyspark 代码失败:第 74 行:
:调用 o96.save 时出错" -
查看日志:[2020/10/20-15:45:37.793] [null-err-119] [INFO] [dku.utils] - log4j:ERROR setFile(null, true) 调用失败。 [2020/10/20-15:45:37.793] [null-err-119] [INFO] [dku.utils] - java.io.FileNotFoundException: /var/log/spark/user/u111437/stderr (没有这个文件或目录)[2020/10/20-15:45:37.794] [null-err-119] [INFO] [dku.utils] - at java.io.FileOutputStream.open0(Native Method)
标签: python jdbc pyspark hdfs teradata