Posted: 2021-10-14 14:46:52
Problem description:
I use readStream to read files from a directory and process them; at the end I have a DataFrame that I want to write to an Oracle table. I am using the JDBC driver and the foreachBatch() API. Here is my code:
def SaveToOracle(df, epoch_id):
    try:
        df.write.format('jdbc').options(
            url='jdbc:oracle:thin:@192.168.49.8:1521:ORCL',
            driver='oracle.jdbc.driver.OracleDriver',
            dbtable='spark.result_table',
            user='spark',
            password='spark').mode('append').save()
    except Exception as e:
        response = e.__str__()
        print(response)
streamingQuery = (summaryDF4.writeStream
    .outputMode("append")
    .foreachBatch(SaveToOracle)
    .start()
)
The job fails without any error and stops right after the streaming query starts. The console log looks like this:
2021-08-11 10:45:11,003 INFO cluster.YarnClientSchedulerBackend: Shutting down all executors
2021-08-11 10:45:11,003 INFO cluster.YarnSchedulerBackend$YarnDriverEndpoint: Asking each executor to shut down
2021-08-11 10:45:11,007 INFO streaming.MicroBatchExecution: Starting new streaming query.
2021-08-11 10:45:11,009 INFO cluster.YarnClientSchedulerBackend: YARN client scheduler backend Stopped
2021-08-11 10:45:11,011 INFO streaming.MicroBatchExecution: Stream started from {}
2021-08-11 10:45:11,021 INFO spark.MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
2021-08-11 10:45:11,034 INFO memory.MemoryStore: MemoryStore cleared
2021-08-11 10:45:11,034 INFO storage.BlockManager: BlockManager stopped
2021-08-11 10:45:11,042 INFO storage.BlockManagerMaster: BlockManagerMaster stopped
2021-08-11 10:45:11,046 INFO scheduler.OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
2021-08-11 10:45:11,053 INFO spark.SparkContext: Successfully stopped SparkContext
2021-08-11 10:45:11,056 INFO util.ShutdownHookManager: Shutdown hook called
2021-08-11 10:45:11,056 INFO util.ShutdownHookManager: Deleting directory /tmp/spark-bf5c7539-9d1f-4c9d-af46-0c0874a81a40/pyspark-7416fc8a-18bd-4e79-aa0f-ea673e7c5cd8
2021-08-11 10:45:11,060 INFO util.ShutdownHookManager: Deleting directory /tmp/spark-47c28d1d-236c-4b64-bc66-d07a918abe01
2021-08-11 10:45:11,063 INFO util.ShutdownHookManager: Deleting directory /tmp/spark-bf5c7539-9d1f-4c9d-af46-0c0874a81a40
2021-08-11 10:45:11,065 INFO util.ShutdownHookManager: Deleting directory /tmp/temporary-f9420356-164a-4806-abb2-f132b8026b20
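Side note: this shutdown sequence is exactly what appears when the driver process exits immediately after start(), since start() returns without blocking. A minimal sketch of keeping the driver alive, assuming the code above runs as a standalone script:

# start() returns immediately; block the driver until the query
# terminates so the SparkContext is not torn down before any batch runs
streamingQuery.awaitTermination()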
What is the problem, and how can I get a proper error log? Here is my SparkSession configuration:
from pyspark import SparkConf
from pyspark.sql import SparkSession

conf = SparkConf()
conf.set("spark.jars", "/home/hadoop/ojdbc6.jar")
spark = (SparkSession
    .builder
    .config(conf=conf)
    .master("yarn")
    .appName("Test010")
    .getOrCreate()
)
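For what it's worth, when the script is launched through spark-submit the driver JVM is already running before this SparkConf is applied, so spark.jars set here may never reach the driver classpath. A hedged alternative is to ship the jar at submit time (Test010.py below is a placeholder for the actual script name, which isn't shown):

spark-submit \
    --master yarn \
    --jars /home/hadoop/ojdbc6.jar \
    --driver-class-path /home/hadoop/ojdbc6.jar \
    Test010.py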
Update:
I get an error on the JDBC save(); here it is:
An error occurred while calling o379.save.
: java.lang.ClassNotFoundException: oracle.jdbc.driver.OracleDriver
at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at org.apache.spark.sql.execution.datasources.jdbc.DriverRegistry$.register(DriverRegistry.scala:46)
at org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions.$anonfun$driverClass$1(JDBCOptions.scala:102)
at org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions.$anonfun$driverClass$1$adapted(JDBCOptions.scala:102)
at scala.Option.foreach(Option.scala:407)
at org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions.<init>(JDBCOptions.scala:102)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcOptionsInWrite.<init>(JDBCOptions.scala:217)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcOptionsInWrite.<init>(JDBCOptions.scala:221)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider.createRelation(JdbcRelationProvider.scala:45)
at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:46)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:90)
at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:180)
at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:218)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:215)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:176)
at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:132)
at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:131)
at org.apache.spark.sql.DataFrameWriter.$anonfun$runCommand$1(DataFrameWriter.scala:989)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:989)
at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:438)
at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:415)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:301)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
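One way to check whether the driver JVM can see the class at all is a quick probe through the py4j gateway; a hedged debugging sketch, assuming the session variable is spark:

# Raises a Py4JJavaError wrapping ClassNotFoundException if the jar
# is not on the driver classpath; returns the class object otherwise.
spark._jvm.java.lang.Class.forName("oracle.jdbc.driver.OracleDriver")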
Tags: python-3.x apache-spark jdbc pyspark spark-structured-streaming