[Question title]: Apache Pig UDF and outputSchema customization
[Posted]: 2016-03-20 11:48:47
[Question]:

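I am trying to implement a UDF that handles various source/input files. The input files differ in their number of columns. My intent is to have one generic UDF. Each run of the Pig script processes a single type of input file (every record has the same number of fields, separated by '|').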

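The UDF should read every input record, whose fields are separated by the delimiter (|), and, depending on some condition, produce a bag containing two tuples. For example, for input (1,2,3,4,5,6) the output is either a) {(1,3), (2,4,5,6)} or b) {(2,3,4), (1,5,6)}.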
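The split described above can be sketched independently of Pig. This is a minimal illustration, not the asker's code: the class name `FieldSplitter`, the method `split`, and the idea of naming the first tuple's fields by their 1-based positions are all assumptions made for the example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical helper: not part of Pig, only illustrates the split itself.
public class FieldSplitter {

    /**
     * Splits one record into two groups. Fields whose 1-based position is in
     * `selected` go to the first group; all remaining fields go to the second.
     * The original field order is preserved inside each group.
     */
    public static List<List<String>> split(List<String> fields, Set<Integer> selected) {
        List<String> first = new ArrayList<String>();
        List<String> second = new ArrayList<String>();
        for (int i = 0; i < fields.size(); i++) {
            if (selected.contains(i + 1)) {
                first.add(fields.get(i));
            } else {
                second.add(fields.get(i));
            }
        }
        List<List<String>> result = new ArrayList<List<String>>();
        result.add(first);
        result.add(second);
        return result;
    }

    public static void main(String[] args) {
        // One '|'-delimited record, as in the example above.
        List<String> record = Arrays.asList("1|2|3|4|5|6".split("\\|"));

        // Case a): positions 1 and 3 form the first tuple.
        System.out.println(split(record, new TreeSet<Integer>(Arrays.asList(1, 3))));
        // prints [[1, 3], [2, 4, 5, 6]]

        // Case b): positions 2, 3 and 4 form the first tuple.
        System.out.println(split(record, new TreeSet<Integer>(Arrays.asList(2, 3, 4))));
        // prints [[2, 3, 4], [1, 5, 6]]
    }
}
```

Inside a real UDF the two lists would be copied into Pig tuples and added to a bag; that part is left out here because it depends on the Pig runtime.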

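I have not been able to extend the outputSchema method so that it can create tuples of different sizes. There is no way to pass an extra parameter to outputSchema, and I cannot use a temporary variable defined as a field of the EvalFunc class, because its value is null on every run.

Any hints? Thank you.

Update:

I ran the command below from Grunt, with the schema supplied after "AS":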

sourceData = foreach sourceData generate com.pig.Data('test.json', *) as (t:(s:(VIN: chararray,Birthdate: chararray), n:(name: chararray,customerId: chararray,Mileage: chararray,Fuel_Consumption: chararray)));

The UDF code is here...

public Schema outputSchema(Schema input) {

(line 233) System.out.println("------------------------" + input.getFields().size());

Error:

Pig Stack Trace
---------------
ERROR 1200: java.lang.NullPointerException

Failed to parse: java.lang.NullPointerException
        at org.apache.pig.parser.QueryParserDriver.parse(QueryParserDriver.java:201)
        at org.apache.pig.PigServer$Graph.validateQuery(PigServer.java:1707)
        at org.apache.pig.PigServer$Graph.registerQuery(PigServer.java:1680)
        at org.apache.pig.PigServer.registerQuery(PigServer.java:623)
        at org.apache.pig.tools.grunt.GruntParser.processPig(GruntParser.java:1082)
        at org.apache.pig.tools.pigscript.parser.PigScriptParser.parse(PigScriptParser.java:505)
        at org.apache.pig.tools.grunt.GruntParser.parseStopOnError(GruntParser.java:230)
        at org.apache.pig.tools.grunt.GruntParser.parseStopOnError(GruntParser.java:205)
        at org.apache.pig.tools.grunt.Grunt.run(Grunt.java:66)
        at org.apache.pig.Main.run(Main.java:565)
        at org.apache.pig.Main.main(Main.java:177)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at org.apache.hadoop.util.RunJar.run(RunJar.java:221)
        at org.apache.hadoop.util.RunJar.main(RunJar.java:136)
Caused by: java.lang.RuntimeException: java.lang.NullPointerException
        at com.mortardata.pig.DataSpliter.outputSchema(DataSpliter.java:306)
        at org.apache.pig.newplan.logical.expression.UserFuncExpression.getFieldSchema(UserFuncExpression.java:244)
        at org.apache.pig.newplan.logical.optimizer.FieldSchemaResetter.execute(SchemaResetter.java:264)
        at org.apache.pig.newplan.logical.expression.AllSameExpressionVisitor.visit(AllSameExpressionVisitor.java:143)
        at org.apache.pig.newplan.logical.expression.UserFuncExpression.accept(UserFuncExpression.java:113)
        at org.apache.pig.newplan.ReverseDependencyOrderWalker.walk(ReverseDependencyOrderWalker.java:70)
        at org.apache.pig.newplan.PlanVisitor.visit(PlanVisitor.java:52)
        at org.apache.pig.newplan.logical.optimizer.SchemaResetter.visitAll(SchemaResetter.java:67)
        at org.apache.pig.newplan.logical.optimizer.SchemaResetter.visit(SchemaResetter.java:122)
        at org.apache.pig.newplan.logical.relational.LOGenerate.accept(LOGenerate.java:245)
        at org.apache.pig.newplan.DependencyOrderWalker.walk(DependencyOrderWalker.java:75)
        at org.apache.pig.newplan.logical.optimizer.SchemaResetter.visit(SchemaResetter.java:114)
        at org.apache.pig.parser.LogicalPlanBuilder.buildForeachOp(LogicalPlanBuilder.java:1055)
        at org.apache.pig.parser.LogicalPlanGenerator.foreach_clause(LogicalPlanGenerator.java:15896)
        at org.apache.pig.parser.LogicalPlanGenerator.op_clause(LogicalPlanGenerator.java:1933)

        at org.apache.pig.parser.LogicalPlanGenerator.statement(LogicalPlanGenerator.java:560)
        at org.apache.pig.parser.LogicalPlanGenerator.query(LogicalPlanGenerator.java:421)
        at org.apache.pig.parser.QueryParserDriver.parse(QueryParserDriver.java:191)
        ... 16 more
Caused by: java.lang.NullPointerException
        at com.mortardata.pig.DataSpliter.outputSchema(DataSpliter.java:233)
        ... 34 more
================================================================================

Update 2:

OK, the input schema is propagated from the preceding Pig command...

sourceData = load 'test.csv' using PigStorage(',') as (VIN: chararray,Birthdate: chararray, name: chararray,customerId: chararray,Mileage: chararray,Fuel_Consumption: chararray);

sourceData = foreach sourceData generate com.pig.Data('test_data_desc.json', *) as (t:(s:(VIN: chararray,Birthdate: chararray), n:(name: chararray,customerId: chararray,Mileage: chararray,Fuel_Consumption: chararray)));

That does not help :-( because there is no way to pass in any additional attributes, and no way to build any more complex logic inside the outputSchema method ;-(

[Comments]:

    Tags: java apache-pig cloudera udf


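    [Answer 1]:

    Inside the outputSchema function you have access to the input schema, and you can use that information to generate the output schema dynamically (provided the input somehow reflects the expected output). Example: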

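      // Needs org.apache.pig.data.DataType and
      // org.apache.pig.impl.logicalLayer.schema.Schema.
      public Schema outputSchema(Schema input) {
        Schema mySchema = new Schema();
        // input may be null (for example when the incoming relation has no
        // declared schema), so guard before inspecting its fields.
        if (input != null && input.getFields().size() == 3) {
          // Three input columns: emit three double fields.
          mySchema.add(new Schema.FieldSchema("data1", DataType.DOUBLE));
          mySchema.add(new Schema.FieldSchema("data2", DataType.DOUBLE));
          mySchema.add(new Schema.FieldSchema("data3", DataType.DOUBLE));
        } else {
          // Any other width: fall back to a single chararray field.
          mySchema.add(new Schema.FieldSchema("data", DataType.CHARARRAY));
        }
        return mySchema;
      }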
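
If the split cannot be inferred from the input width alone, one option to consider is a constructor argument: Pig lets a UDF take string constructor arguments supplied in a DEFINE statement, so values stored by the constructor are already set on the instance when outputSchema runs. The sketch below is plain Java and only shows the schema-time bookkeeping. The class `SplitSpec`, its method `groupNames`, and the "1,2" spec format are hypothetical names invented for this example; in a real UDF the resulting names would be turned into Schema.FieldSchema entries.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical helper: models what a UDF could keep from its constructor argument.
public class SplitSpec {
    private final Set<Integer> firstGroup = new LinkedHashSet<Integer>();

    // Assumed spec format: comma-separated 1-based positions, e.g. "1,2".
    public SplitSpec(String spec) {
        for (String part : spec.split(",")) {
            firstGroup.add(Integer.parseInt(part.trim()));
        }
    }

    // Schema-time step: decide which input column names land in each output tuple.
    public List<List<String>> groupNames(List<String> inputNames) {
        for (int pos : firstGroup) {
            if (pos < 1 || pos > inputNames.size()) {
                throw new IllegalArgumentException(
                    "position " + pos + " is outside the input schema");
            }
        }
        List<String> first = new ArrayList<String>();
        List<String> rest = new ArrayList<String>();
        for (int i = 0; i < inputNames.size(); i++) {
            (firstGroup.contains(i + 1) ? first : rest).add(inputNames.get(i));
        }
        List<List<String>> groups = new ArrayList<List<String>>();
        groups.add(first);
        groups.add(rest);
        return groups;
    }

    public static void main(String[] args) {
        SplitSpec spec = new SplitSpec("1,2");
        System.out.println(spec.groupNames(Arrays.asList(
            "VIN", "Birthdate", "name", "customerId", "Mileage", "Fuel_Consumption")));
        // prints [[VIN, Birthdate], [name, customerId, Mileage, Fuel_Consumption]]
    }
}
```

Because the same instance state is available to both the schema step and the per-record step, one spec string can drive the shape of the declared schema and the shape of the emitted tuples.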
    

    I hope this helps.

    [Comments]:
