【Posted】: 2019-12-18 11:48:25
【Problem Description】:
(1) There is a BigQuery source table like ...
column_name | is_nullable | data_type
OrderId | YES | STRING
items | NO | ARRAY<STRUCT<articleId STRING, quantity FLOAT64>>
From a relational point of view, "OrderId" would be the key.
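For illustration only, the source table could look roughly like this in BigQuery DDL (a minimal sketch; the table name `project.dataset.orders` is made up, and quantity is assumed to be FLOAT64 to match the schemas further down):

CREATE TABLE `project.dataset.orders` (
  OrderId STRING,                                            -- key from a relational point of view
  items   ARRAY<STRUCT<articleId STRING, quantity FLOAT64>>  -- repeated nested records
);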
(2) Now I want to normalize the ARRAY/STRUCT records into a separate table. To achieve this I used the "Wrangler" transform.
Note: this is the "Wrangler" from the Transform section of the Data Fusion Studio! When trying to open Wrangler via the hamburger menu and selecting the BQ source table, it reports: BigQuery type STRUCT is not supported.
The output of the source table is linked to the input of the Wrangler.
In the Wrangler I defined ...
- Input field name: *
- Precondition: false
- Directives / Recipe: keep combiOrderId,items,articleId,quantity
- Output Schema (Name | Type | Null): (matching the source table; JSON attached below)
combiOrderId | string | yes
items | array | no
  record [ articleId | string | yes, quantity | float | yes ]
(3) The BQ sink table takes the Wrangler output as its input schema, and I defined the final schema as (Name | Type | Null):
combiOrderId | string | yes
articleId | string | yes
quantity | float | yes
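So the sink should end up with one row per element of 'items'. Purely to illustrate the intended result (not part of the pipeline), this is the flat shape that a BigQuery query along these lines would produce; the table name is assumed, and OrderId is renamed to combiOrderId to match the sink schema:

-- one output row per array element
SELECT
  o.OrderId AS combiOrderId,
  item.articleId,
  item.quantity
FROM `project.dataset.orders` AS o,
  UNNEST(o.items) AS item;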
Now, when running the pipeline (in preview mode), the following error message is logged:
Problem converting into output record. Reason: Unable to decode array 'items'
(full message below)
Any hints or alternative solutions are very welcome :-)
Thanks.
JSON of the Wrangler output schema:
[
{
"name": "etlSchemaBody",
"schema": {
"type": "record",
"name": "etlSchemaBody",
"fields": [
{
"name": "combiOrderId",
"type": [
"string",
"null"
]
},
{
"name": "items",
"type": {
"type": "array",
"items": {
"type": "record",
"name": "a6adafef5943d4757b2fad43a10732952",
"fields": [
{
"name": "articleId",
"type": [
"string",
"null"
]
},
{
"name": "quantity",
"type": [
"float",
"null"
]
}
]
}
}
}
]
}
}
]
Full (first) error log:
java.lang.Exception: Stage:Normalize-items - Reached error threshold 1, terminating processing due to error : Problem converting into output record. Reason : Unable to decode array 'items'
at io.cdap.wrangler.Wrangler.transform(Wrangler.java:412) ~[1576661389534-0/:na]
at io.cdap.wrangler.Wrangler.transform(Wrangler.java:94) ~[1576661389534-0/:na]
at io.cdap.cdap.etl.common.plugin.WrappedTransform.lambda$transform$5(WrappedTransform.java:90) ~[cdap-etl-core-6.1.0.jar:na]
at io.cdap.cdap.etl.common.plugin.Caller$1.call(Caller.java:30) ~[cdap-etl-core-6.1.0.jar:na]
at io.cdap.cdap.etl.common.plugin.WrappedTransform.transform(WrappedTransform.java:89) ~[cdap-etl-core-6.1.0.jar:na]
at io.cdap.cdap.etl.common.TrackedTransform.transform(TrackedTransform.java:74) ~[cdap-etl-core-6.1.0.jar:na]
at io.cdap.cdap.etl.spark.function.TransformFunction.call(TransformFunction.java:50) ~[hydrator-spark-core2_2.11-6.1.0.jar:na]
at io.cdap.cdap.etl.spark.Compat$FlatMapAdapter.call(Compat.java:126) ~[hydrator-spark-core2_2.11-6.1.0.jar:na]
at org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$1$1.apply(JavaRDDLike.scala:125) ~[spark-core_2.11-2.3.3.jar:2.3.3]
at org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$1$1.apply(JavaRDDLike.scala:125) ~[spark-core_2.11-2.3.3.jar:2.3.3]
at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434) ~[scala-library-2.11.8.jar:na]
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440) ~[scala-library-2.11.8.jar:na]
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:439) ~[scala-library-2.11.8.jar:na]
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:439) ~[scala-library-2.11.8.jar:na]
at org.apache.spark.internal.io.SparkHadoopWriter$$anonfun$4.apply(SparkHadoopWriter.scala:128) ~[spark-core_2.11-2.3.3.jar:2.3.3]
at org.apache.spark.internal.io.SparkHadoopWriter$$anonfun$4.apply(SparkHadoopWriter.scala:127) ~[spark-core_2.11-2.3.3.jar:2.3.3]
at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1415) ~[spark-core_2.11-2.3.3.jar:2.3.3]
at org.apache.spark.internal.io.SparkHadoopWriter$.org$apache$spark$internal$io$SparkHadoopWriter$$executeTask(SparkHadoopWriter.scala:139) [spark-core_2.11-2.3.3.jar:2.3.3]
at org.apache.spark.internal.io.SparkHadoopWriter$$anonfun$3.apply(SparkHadoopWriter.scala:83) [spark-core_2.11-2.3.3.jar:2.3.3]
at org.apache.spark.internal.io.SparkHadoopWriter$$anonfun$3.apply(SparkHadoopWriter.scala:78) [spark-core_2.11-2.3.3.jar:2.3.3]
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87) [spark-core_2.11-2.3.3.jar:2.3.3]
at org.apache.spark.scheduler.Task.run(Task.scala:109) [spark-core_2.11-2.3.3.jar:2.3.3]
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345) [spark-core_2.11-2.3.3.jar:2.3.3]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [na:1.8.0_232]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [na:1.8.0_232]
at java.lang.Thread.run(Thread.java:748) [na:1.8.0_232]
Caused by: io.cdap.wrangler.api.RecipeException: Problem converting into output record. Reason : Unable to decode array 'items'
at io.cdap.wrangler.executor.RecipePipelineExecutor.execute(RecipePipelineExecutor.java:102) ~[wrangler-core-4.1.3.jar:na]
at io.cdap.wrangler.Wrangler.transform(Wrangler.java:384) ~[1576661389534-0/:na]
... 25 common frames omitted
Caused by: io.cdap.wrangler.utils.RecordConvertorException: Unable to decode array 'items'
at io.cdap.wrangler.utils.RecordConvertor.decodeArray(RecordConvertor.java:382) ~[wrangler-core-4.1.3.jar:na]
at io.cdap.wrangler.utils.RecordConvertor.decode(RecordConvertor.java:142) ~[wrangler-core-4.1.3.jar:na]
at io.cdap.wrangler.utils.RecordConvertor.decodeUnion(RecordConvertor.java:368) ~[wrangler-core-4.1.3.jar:na]
at io.cdap.wrangler.utils.RecordConvertor.decode(RecordConvertor.java:152) ~[wrangler-core-4.1.3.jar:na]
at io.cdap.wrangler.utils.RecordConvertor.decodeRecord(RecordConvertor.java:85) ~[wrangler-core-4.1.3.jar:na]
at io.cdap.wrangler.utils.RecordConvertor.toStructureRecord(RecordConvertor.java:56) ~[wrangler-core-4.1.3.jar:na]
at io.cdap.wrangler.executor.RecipePipelineExecutor.execute(RecipePipelineExecutor.java:99) ~[wrangler-core-4.1.3.jar:na]
... 26 common frames omitted
【Comments】:
- Torsten: The easiest way to check what the error might be is to navigate from Wrangler. You can do that with these steps: 1. Go to the Wrangler connections page: /cdap/ns/default/connections 2. Click on the BQ source (or create a BigQuery connection) 3. Navigate to the BQ table and click on it 4. This takes you to the Wrangler workspace (tabbed view) 5. From there you can apply all your transformations and click "Create pipeline". After that you should see your source and the Wrangler transform already configured. You can then add the sink and run a preview to test whether things work.
- To address your other point: Wrangler only supports the array type from a BQ source. It does not support reading the STRUCT type from BigQuery. My guess is that this is why you are seeing this problem. issues.cask.co/browse/CDAP-15665
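If the STRUCT limitation is indeed the cause, one conceivable workaround (just a sketch with assumed names, not something stated in the comments above) would be to pre-flatten the data in BigQuery itself, e.g. via a view, so that Data Fusion only ever reads flat columns:

-- flattened view over the nested source table
CREATE OR REPLACE VIEW `project.dataset.orders_flat` AS
SELECT
  o.OrderId AS combiOrderId,
  item.articleId,
  item.quantity
FROM `project.dataset.orders` AS o,
  UNNEST(o.items) AS item;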
Tags: normalization google-cloud-data-fusion