[Posted]: 2022-01-01 22:54:16
[Problem description]:
On EMR we use Salesforce Bulk API calls to fetch records from Salesforce objects. For one of the objects (TASK), the job fails while saving the DataFrame to Parquet with the error below:
java.lang.ArrayIndexOutOfBoundsException: 1
at org.apache.spark.sql.catalyst.expressions.GenericRow.get(rows.scala:174)
at org.apache.spark.sql.Row$class.apply(Row.scala:163)
at org.apache.spark.sql.catalyst.expressions.GenericRow.apply(rows.scala:166)
at org.apache.spark.sql.execution.RDDConversions$$anonfun$rowToRowRdd$1$$anonfun$apply$3.apply(ExistingRDD.scala:60)
at org.apache.spark.sql.execution.RDDConversions$$anonfun$rowToRowRdd$1$$anonfun$apply$3.apply(ExistingRDD.scala:57)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:232)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:170)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:169)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:123)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
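
As far as we understand the trace, GenericRow.get throws ArrayIndexOutOfBoundsException: 1 when Spark asks a row for its second column but the materialized row holds fewer values than the schema declares. A minimal, standalone sketch (our assumption about the failure mode, not our actual pipeline) that should reproduce the same exception:

import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{StringType, StructField, StructType}

val spark = SparkSession.builder().master("local[*]").appName("aioobe-repro").getOrCreate()

// Two-column schema, but the second row carries only one value, mimicking a
// record that was not split into all of the expected columns.
val schema = StructType(Seq(
  StructField("a", StringType, nullable = true),
  StructField("b", StringType, nullable = true)
))
val rows = spark.sparkContext.parallelize(Seq(Row("x", "y"), Row("only-one-value")))

// Writing forces every row to be read up to the schema width, so this should
// fail with java.lang.ArrayIndexOutOfBoundsException: 1, matching the trace.
spark.createDataFrame(rows, schema).write.mode("overwrite").parquet("/tmp/aioobe_repro")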
The schema and the load/write code in our job look like this (imports shown for completeness):

import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.types.{StringType, StructField, StructType}

// Build a flat, all-nullable, all-string schema from the non-compound field metadata.
val sfdcObjectSchema = StructType(
  nonCompoundMetas.map(_.Name).map(
    fieldName => StructField(fieldName, StringType, nullable = true)
  )
)
// Read the object through the springml Spark-Salesforce connector using the Bulk API.
val sfdcObjectDF = spark.read.format("com.springml.spark.salesforce")
  .option("username", userName).option("password", s"$sfdcPassword$sfdcToken")
  .option("soql", retrievingSOQL)
  .option("version", JavaUtils.getConfigProps(runtimeEnvironment).getProperty("sfdc.api.version"))
  .option("sfObject", sfdcObject).option("bulk", "true")
  .option("pkChunking", pkChunking).option("chunkSize", checkingSize)
  .option("timeout", bulkTimeoutMillis.toString).option("maxCharsPerColumn", "-1")
  .option("maxColumns", nonCompoundMetas.size.toString)
  .schema(sfdcObjectSchema).load()

// Drop rows where every column is null, then overwrite the Parquet dataset.
sfdcObjectDF.na.drop("all").write.mode(SaveMode.Overwrite)
  .parquet(s"${JavaUtils.getConfigProps(runtimeEnvironment).getProperty("etl.dataset.root")}/$accountName/$sfdcObject")
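
Our working theory is that some TASK records (for example, a long Description containing an unescaped newline or delimiter) are parsed into fewer columns than the schema declares, so the conversion then fails at column index 1. A debugging sketch we are considering, where loadWithFields is a hypothetical helper that reruns the same read as above with a reduced field list (non-essential options trimmed):

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.types.{StringType, StructField, StructType}

// Re-read the object with only the given fields and a matching schema.
def loadWithFields(fields: Seq[String]): DataFrame = {
  val soql = s"SELECT ${fields.mkString(", ")} FROM $sfdcObject"
  val schema = StructType(fields.map(StructField(_, StringType, nullable = true)))
  spark.read.format("com.springml.spark.salesforce")
    .option("username", userName)
    .option("password", s"$sfdcPassword$sfdcToken")
    .option("soql", soql)
    .option("sfObject", sfdcObject)
    .option("bulk", "true")
    .option("maxColumns", fields.size.toString)
    .schema(schema)
    .load()
}

// Grow the field list until the write starts failing; the last field added is
// the prime suspect for carrying malformed values.
val allFields = nonCompoundMetas.map(_.Name)
(1 to allFields.size).foreach { n =>
  loadWithFields(allFields.take(n)).write.mode("overwrite").parquet(s"/tmp/probe_$sfdcObject")
  println(s"OK with first $n fields")
}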
Please advise how we can debug this further.
[Discussion]:
Can anyone help us resolve this issue?
Tags: scala apache-spark hadoop-yarn amazon-emr