【问题标题】:IN Apache Beam how to handle exceptions/errors at Pipeline-IO levelIN Apache Beam 如何在 Pipeline-IO 级别处理异常/错误
【发布时间】:2019-04-05 12:31:07
【问题描述】:

我在 Apache Beam 中使用正在运行的 spark runner 作为管道 runner,发现一个错误。 通过得到错误,我提出了问题。我知道错误是由于 sql 查询中的 Column_name 不正确,但我的问题是如何处理 IO 级别的错误/异常

org.apache.beam.sdk.util.UserCodeException: java.sql.SQLSyntaxErrorException: Unknown column 'FIRST_NAME' in 'field list'
at org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:36)
at org.apache.beam.sdk.io.jdbc.JdbcIO$ReadFn$DoFnInvoker.invokeProcessElement(Unknown Source)
at org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:185)
at org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:149)
at org.apache.beam.runners.spark.translation.DoFnRunnerWithMetrics.processElement(DoFnRunnerWithMetrics.java:70)
at org.apache.beam.runners.spark.translation.SparkProcessContext$ProcCtxtIterator.computeNext(SparkProcessContext.java:145)
at org.apache.beam.repackaged.beam_runners_spark.com.google.common.collect.AbstractIterator.tryToComputeNext(AbstractIterator.java:145)
at org.apache.beam.repackaged.beam_runners_spark.com.google.common.collect.AbstractIterator.hasNext(AbstractIterator.java:140)
at scala.collection.convert.Wrappers$JIteratorWrapper.hasNext(Wrappers.scala:42)
at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:461)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:216)
at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1092)
at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1083)
at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1018)
18/11/01 13:13:16 WARN org.apache.spark.scheduler.TaskSetManager: Lost task 3.0 in stage 0.0 (TID 3, localhost, executor driver): org.apache.beam.sdk.util.UserCodeException: java.sql.SQLSyntaxErrorException: Unknown column 'FIRST_NAME' in 'field list'
    at org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:36)
    at org.apache.beam.sdk.io.jdbc.JdbcIO$ReadFn$DoFnInvoker.invokeProcessElement(Unknown Source)
    at org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:185)
    at org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:149)
    at org.apache.beam.runners.spark.translation.DoFnRunnerWithMetrics.processElement(DoFnRunnerWithMetrics.java:70)
    at org.apache.beam.runners.spark.translation.SparkProcessContext$ProcCtxtIterator.computeNext(SparkProcessContext.java:145)
    at org.apache.beam.repackaged.beam_runners_spark.com.google.common.collect.AbstractIterator.tryToComputeNext(AbstractIterator.java:145)
    at org.apache.beam.repackaged.beam_runners_spark.com.google.common.collect.AbstractIterator.hasNext(AbstractIterator.java:140)
    at scala.collection.convert.Wrappers$JIteratorWrapper.hasNext(Wrappers.scala:42)
    at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:461)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    ..............
    at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:335)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:286)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:109)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.sql.SQLSyntaxErrorException: Unknown column 'FIRST_NAME' in 'field list'
    at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:536)
    at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:513)
    at com.mysql.cj.jdbc.exceptions.SQLExceptionsMapping.translateException(SQLExceptionsMapping.java:115)
    at com.mysql.cj.jdbc.ConnectionImpl.execSQL(ConnectionImpl.java:1983)
    at com.mysql.cj.jdbc.PreparedStatement.executeInternal(PreparedStatement.java:1826)
    at com.mysql.cj.jdbc.PreparedStatement.executeQuery(PreparedStatement.java:1923)
    at org.apache.commons.dbcp2.DelegatingPreparedStatement.executeQuery(DelegatingPreparedStatement.java:83)
    at org.apache.commons.dbcp2.DelegatingPreparedStatement.executeQuery(DelegatingPreparedStatement.java:83)
    at org.apache.commons.dbcp2.DelegatingPreparedStatement.executeQuery(DelegatingPreparedStatement.java:83)
    at org.apache.commons.dbcp2.DelegatingPreparedStatement.executeQuery(DelegatingPreparedStatement.java:83)
    at org.apache.beam.sdk.io.jdbc.JdbcIO$ReadFn.processElement(JdbcIO.java:601)

【问题讨论】:

  • 我提到的上述链接,它仅在 pardo 转换中说明,即仅在流程元素级别,如果在连接器级别捕获异常,即 管道 IO 级别
  • 等待回复,请帮助我

标签: apache-spark jdbc apache-beam apache-beam-io


【解决方案1】:

您必须创建一个自定义异常处理程序类来捕获该异常,例如;

需要实现这样的自定义方法

public Mycust_Exception(String string) {
    super("Error Obtained by "+string);
}

这里我刚刚返回了字符串,但也可以使用 super() 抛出,现在您需要声明您希望出现异常的 try-catch 块并遵循 PTranformation_level_exceptionHandler_implementation

在 catch 块中像这样调用 throw 语句

throw new Ezflow_Exception("Invalid statement");

这个实现肯定可以满足您的大部分查询。 对于 Java 编程,它是最常见的实现方式之一

【讨论】:

    猜你喜欢
    • 2021-12-04
    • 2021-09-13
    • 2022-08-16
    • 2020-06-15
    • 1970-01-01
    • 2019-06-22
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多