【Question Title】: Using Pyspark to Copy Data from a Spark Dataframe to a Teradata Table via JDBC
【Posted】: 2021-02-03 10:28:06
【Question Description】:

> I have used the Pyspark code below to copy the contents of an HDFS dataset, which I had already converted to a dataframe, into a Teradata table via JDBC.

```python
# -*- coding: utf-8 -*-
import os
import sys
import datetime
import logging
import logging.handlers
import smtplib
import pprint as pp
from collections import OrderedDict
from functools import reduce

import numpy as np
import pandas as pd
import teradata
import teradatasql

import dataiku
from dataiku import spark as dkuspark
from pyspark import SparkContext
from pyspark.sql import SQLContext, HiveContext, SparkSession, DataFrame, Window
from pyspark.sql import functions as F
from pyspark.sql.types import *

os.environ['PYSPARK_PYTHON'] = '/usr/bin/python36'
sc = SparkContext.getOrCreate()
sqlContext = SQLContext(sc)

# Teradata connection details
user = dataiku.get_custom_variables()['DB_TERADATA_BA_USER']
password = dataiku.get_custom_variables()['DB_TERADATA_BA_USER_PWD']
teradata_server = dataiku.get_custom_variables()['Teradata_server']

# Connect to Teradata
tera_con = teradatasql.connect(host=teradata_server, user=user, password=password)
tera_cur = tera_con.cursor()
print("connection to teradata successful")

driver = dataiku.get_custom_variables()['DB_TERADATA_DRIVER']
auditdburl = "jdbc:teradata://" + teradata_server + "/Database=DBName"
# LOGMECH=TD2

# Read recipe inputs
PVS_OP_10052020_1 = dataiku.Dataset("310_PVS_OP_10052020_1")
PVS_OP_10052020_1_df = dkuspark.get_dataframe(sqlContext, PVS_OP_10052020_1)

# Compute recipe outputs from inputs
# TODO: Replace this part by your actual code that computes the output, as a SparkSQL dataframe
bac_NCCT_310_PVS_OP_Test_POC_test1_df = PVS_OP_10052020_1_df  # For this sample code, simply copy input to output

# Write the dataframe to the Teradata table over JDBC
bac_NCCT_310_PVS_OP_Test_POC_test1_df.write.format("jdbc") \
    .option("driver", driver) \
    .option("url", auditdburl) \
    .option("dbtable", 'BAC_NCCT_310_PVS_OP_Test_POC_test3') \
    .option("user", user) \
    .option("password", password) \
    .option('TYPE', 'FASTEXPORT') \
    .mode('append') \
    .save()
```

When I run the code, I get the following error: Job failed: Pyspark code failed: At line 74: : An error occurred while calling o96.save. As I am new to Pyspark, could anyone help me figure out where I am going wrong?

【Question Discussion】:

  • Could you add more context about the problem you are facing? Also, your code is not formatted properly.
  • I get the following error when running the above code: "Job failed: Pyspark code failed: At line 74: : An error occurred while calling o96.save"
  • From the logs: [2020/10/20-15:45:37.793] [null-err-119] [INFO] [dku.utils] - log4j:ERROR setFile(null, true) call failed. [2020/10/20-15:45:37.793] [null-err-119] [INFO] [dku.utils] - java.io.FileNotFoundException: /var/log/spark/user/u111437/stderr (No such file or directory) [2020/10/20-15:45:37.794] [null-err-119] [INFO] [dku.utils] - at java.io.FileOutputStream.open0(Native Method)

Tags: python jdbc pyspark hdfs teradata


【Solution 1】:

Have you tried adding the property "mapreduce.job.queuename": "long_running" to the Spark configuration? That worked well for this.
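For reference, a minimal sketch of how that property could be supplied when building the Spark session in PySpark. The queue name "long_running" comes from the suggestion above; the session-builder pattern, the app name, and the additional `spark.yarn.queue` key are assumptions, not something from the original post:

```python
from pyspark.sql import SparkSession

# Minimal sketch: route the job to a queue with a longer runtime allowance.
# "long_running" is the queue name suggested in the answer above;
# the app name is hypothetical.
spark = (
    SparkSession.builder
    .appName("teradata-jdbc-write")
    .config("mapreduce.job.queuename", "long_running")
    .config("spark.yarn.queue", "long_running")  # assumed Spark-native equivalent
    .getOrCreate()
)

sc = spark.sparkContext  # reuse as the SparkContext in the question's code
```

If the job runs as a Dataiku Spark recipe, the same key can typically be set in the recipe's Spark configuration instead of in code.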

【Discussion】:
