【Title】: Problem writing from Spark Structured Streaming to an Oracle table
【Posted】: 2021-10-14 14:46:52
【Description】:

I use readStream to read files from a directory and process them; at the end I have a DataFrame that I want to write to an Oracle table. I am using the JDBC driver and the foreachBatch() API. Here is my code:

def SaveToOracle(df, epoch_id):
    try:
        df.write.format('jdbc').options(
              url='jdbc:oracle:thin:@192.168.49.8:1521:ORCL',
              driver='oracle.jdbc.driver.OracleDriver',
              dbtable='spark.result_table',
              user='spark',
              password='spark').mode('append').save()
    except Exception as e:
        print(str(e))

streamingQuery = (summaryDF4.writeStream
  .outputMode("append")
  .foreachBatch(SaveToOracle)
  .start()
                 )
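As an aside on the SaveToOracle callback above: because the except block only prints the error and returns normally, Spark treats the micro-batch as committed even when the JDBC write failed, so the query keeps running with no visible failure. A plain-Python sketch of the effect (the run_batch driver loop below is a stand-in for Spark's micro-batch execution, not Spark internals):

```python
def save_swallowing(batch, epoch_id):
    try:
        raise RuntimeError("simulated JDBC failure")  # stands in for a failed save()
    except Exception as e:
        print(f"batch {epoch_id} failed: {e}")  # logged, but not re-raised

def save_reraising(batch, epoch_id):
    try:
        raise RuntimeError("simulated JDBC failure")
    except Exception as e:
        print(f"batch {epoch_id} failed: {e}")
        raise  # re-raise so the failure propagates to the streaming query

def run_batch(callback, batch, epoch_id):
    """Stand-in for the micro-batch driver loop: a callback that returns
    normally is treated as a successfully committed batch."""
    try:
        callback(batch, epoch_id)
        return "committed"
    except Exception:
        return "failed"

print(run_batch(save_swallowing, [], 0))  # committed - the failure is invisible
print(run_batch(save_reraising, [], 0))   # failed - the error surfaces
```

Re-raising (or at least logging to a durable sink) inside foreachBatch makes write failures visible in the query status instead of silently marking batches done.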

The job fails without any error and stops after starting the streaming query. The console log looks like this:

2021-08-11 10:45:11,003 INFO cluster.YarnClientSchedulerBackend: Shutting down all executors
2021-08-11 10:45:11,003 INFO cluster.YarnSchedulerBackend$YarnDriverEndpoint: Asking each executor to shut down
2021-08-11 10:45:11,007 INFO streaming.MicroBatchExecution: Starting new streaming query.
2021-08-11 10:45:11,009 INFO cluster.YarnClientSchedulerBackend: YARN client scheduler backend Stopped
2021-08-11 10:45:11,011 INFO streaming.MicroBatchExecution: Stream started from {}
2021-08-11 10:45:11,021 INFO spark.MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
2021-08-11 10:45:11,034 INFO memory.MemoryStore: MemoryStore cleared
2021-08-11 10:45:11,034 INFO storage.BlockManager: BlockManager stopped
2021-08-11 10:45:11,042 INFO storage.BlockManagerMaster: BlockManagerMaster stopped
2021-08-11 10:45:11,046 INFO scheduler.OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
2021-08-11 10:45:11,053 INFO spark.SparkContext: Successfully stopped SparkContext
2021-08-11 10:45:11,056 INFO util.ShutdownHookManager: Shutdown hook called
2021-08-11 10:45:11,056 INFO util.ShutdownHookManager: Deleting directory /tmp/spark-bf5c7539-9d1f-4c9d-af46-0c0874a81a40/pyspark-7416fc8a-18bd-4e79-aa0f-ea673e7c5cd8
2021-08-11 10:45:11,060 INFO util.ShutdownHookManager: Deleting directory /tmp/spark-47c28d1d-236c-4b64-bc66-d07a918abe01
2021-08-11 10:45:11,063 INFO util.ShutdownHookManager: Deleting directory /tmp/spark-bf5c7539-9d1f-4c9d-af46-0c0874a81a40
2021-08-11 10:45:11,065 INFO util.ShutdownHookManager: Deleting directory /tmp/temporary-f9420356-164a-4806-abb2-f132b8026b20

What is the problem, and how can I get proper logs? Here is my SparkSession configuration:

conf = SparkConf()
conf.set("spark.jars", "/home/hadoop/ojdbc6.jar")

spark=(SparkSession
       .builder
       .config(conf=conf)
       .master("yarn")
       .appName("Test010")
       .getOrCreate()
      )

Update:

I am getting an error on the jdbc save(); here it is:

An error occurred while calling o379.save.
: java.lang.ClassNotFoundException: oracle.jdbc.driver.OracleDriver
        at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
        at org.apache.spark.sql.execution.datasources.jdbc.DriverRegistry$.register(DriverRegistry.scala:46)
        at org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions.$anonfun$driverClass$1(JDBCOptions.scala:102)
        at org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions.$anonfun$driverClass$1$adapted(JDBCOptions.scala:102)
        at scala.Option.foreach(Option.scala:407)
        at org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions.<init>(JDBCOptions.scala:102)
        at org.apache.spark.sql.execution.datasources.jdbc.JdbcOptionsInWrite.<init>(JDBCOptions.scala:217)
        at org.apache.spark.sql.execution.datasources.jdbc.JdbcOptionsInWrite.<init>(JDBCOptions.scala:221)
        at org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider.createRelation(JdbcRelationProvider.scala:45)
        at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:46)
        at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
        at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
        at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:90)
        at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:180)
        at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:218)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:215)
        at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:176)
        at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:132)
        at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:131)
        at org.apache.spark.sql.DataFrameWriter.$anonfun$runCommand$1(DataFrameWriter.scala:989)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
        at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
        at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
        at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:989)
        at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:438)
        at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:415)
        at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:301)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
        at py4j.Gateway.invoke(Gateway.java:282)

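The ClassNotFoundException above means the JVM that runs the write cannot see `oracle.jdbc.driver.OracleDriver` on its classpath; setting `spark.jars` from inside the application is not always early enough for the driver JVM on YARN. One common alternative is to pass the jar at submit time so it reaches both the driver and the executors. A sketch (the jar path is the one from the question; the script name `my_streaming_app.py` is a placeholder):

```shell
# Ship the Oracle JDBC driver with the application so both the driver
# and the executors can load oracle.jdbc.driver.OracleDriver.
spark-submit \
  --master yarn \
  --jars /home/hadoop/ojdbc6.jar \
  --driver-class-path /home/hadoop/ojdbc6.jar \
  my_streaming_app.py
```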
【Comments】:

Tags: python-3.x apache-spark jdbc pyspark spark-structured-streaming


【Solution 1】:

You need to call the awaitTermination() method on streamingQuery after the start() call, like this:

    streamingQuery = (summaryDF4.writeStream
      .outputMode("append")
      .foreachBatch(SaveToOracle)
      .start()
      .awaitTermination())
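One caveat with the chained form (a side note, not part of the original answer): awaitTermination() returns None, so the streamingQuery variable ends up holding None rather than the StreamingQuery handle. A plain-Python stand-in showing the effect (FakeQuery is an illustration, not Spark's API):

```python
class FakeQuery:
    """Stand-in for a Spark StreamingQuery (not the real API)."""
    def awaitTermination(self):
        # The real method blocks until the query stops, then returns None.
        return None

query = FakeQuery()
handle = query.awaitTermination()
print(handle is None)  # True - chaining .start().awaitTermination() assigns None
```

If you need the handle later (for example to call stop() or inspect progress), assign the result of start() first and call awaitTermination() on that variable afterwards.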
    

【Discussion】:

【Solution 2】:

Thank you for your answer; it keeps the streaming job alive, but it still does not work. Here is the streaming progress output:

{
        "id" : "3dc2a37f-4a7a-49b9-aa83-bff526aa14c5",
        "runId" : "e8864901-2729-41c5-b7e4-a19ac2478f1c",
        "name" : null,
        "timestamp" : "2021-08-11T06:51:00.000Z",
        "batchId" : 2,
        "numInputRows" : 0,
        "inputRowsPerSecond" : 0.0,
        "processedRowsPerSecond" : 0.0,
        "durationMs" : {
          "addBatch" : 240,
          "getBatch" : 66,
          "latestOffset" : 21,
          "queryPlanning" : 143,
          "triggerExecution" : 514,
          "walCommit" : 23
        },
        "eventTime" : {
          "watermark" : "1970-01-01T00:00:00.000Z"
        },
        "stateOperators" : [ {
          "numRowsTotal" : 0,
          "numRowsUpdated" : 0,
          "memoryUsedBytes" : -1,
          "numRowsDroppedByWatermark" : 0,
          "customMetrics" : {
            "loadedMapCacheHitCount" : 0,
            "loadedMapCacheMissCount" : 0,
            "stateOnCurrentVersionSizeBytes" : -1
          }
        } ],
        "sources" : [ {
          "description" : "FileStreamSource[hdfs://192.168.49.13:9000/input/lz]",
          "startOffset" : {
            "logOffset" : 1
          },
          "endOffset" : {
            "logOffset" : 2
          },
          "numInputRows" : 0,
          "inputRowsPerSecond" : 0.0,
          "processedRowsPerSecond" : 0.0
        } ],
        "sink" : {
          "description" : "ForeachBatchSink",
          "numOutputRows" : -1
        }
      }
      

If I use the console sink, my process works without any problem and the output is correct, but writing to Oracle fails.

【Discussion】:

• You should add this as an update to your original post, not in the answers section or as a comment on my answer.