【Question Title】: COLLECT_SET() in Hive, keep duplicates?
【Posted】: 2011-09-20 16:20:37
【Question Description】:

Is there a way to keep duplicates in a collected set in Hive, or to simulate the kind of aggregate collection that Hive provides by some other method? I want to aggregate all of the items in a column that have the same key into an array, with duplicates.

I.e., given:

hash_id | num_of_cats
=====================
ad3jkfk            4
ad3jkfk            4
ad3jkfk            2
fkjh43f            1
fkjh43f            8
fkjh43f            8
rjkhd93            7
rjkhd93            4
rjkhd93            7

it should return:

hash_agg | cats_aggregate
===========================
ad3jkfk   Array<int>(4,4,2)
fkjh43f   Array<int>(1,8,8)
rjkhd93   Array<int>(7,4,7)

【Question Discussion】:

  • If this isn't clear, please let me know. I'm still trying to work this out :(

Tags: java hadoop user-defined-functions hive


【Solution 1】:

As of hive 0.13, there is a built-in UDAF called collect_list() that achieves this. See here.
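Applied to the question's data, a minimal sketch (my addition, assuming the source table is named `tablename` as elsewhere in this thread) would look like:

```sql
-- Sketch: collect_list() keeps duplicates, unlike collect_set().
-- Requires Hive 0.13.0+; table and column names follow the question.
SELECT hash_id, collect_list(num_of_cats) AS cats_aggregate
FROM tablename
GROUP BY hash_id;
```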

【Discussion】:

【Solution 2】:

Just wondering - if, in the statement -

SELECT
    hash_id, COLLECT_LIST(num_of_cats) AS aggr_set
FROM
    tablename
WHERE
    blablabla
GROUP BY
    hash_id
;

we want to sort and limit the elements of num_of_cats - how do we do that? Because in big data we deal with petabytes of data. In such cases we may not need all of it, only the top 10 or some limit.
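One possible approach (a sketch I'm adding here, not part of the original answer): rank the rows per key with a window function first, keep only the top N, then collect. Hive has supported windowing functions such as row_number() since 0.11.

```sql
-- Sketch: keep only the top 10 num_of_cats per hash_id, then collect.
-- Window functions require Hive 0.11+; the limit of 10 is illustrative.
SELECT hash_id, collect_list(num_of_cats) AS aggr_set
FROM (
    SELECT hash_id, num_of_cats,
           row_number() OVER (PARTITION BY hash_id
                              ORDER BY num_of_cats DESC) AS rn
    FROM tablename
) ranked
WHERE rn <= 10
GROUP BY hash_id;
```

Filtering before the GROUP BY also keeps the aggregation buffers small, which matters at that scale.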

【Discussion】:

  • OK sir - it's just that I don't have enough reputation points to add a comment - I'll try to be more systematic next time.
【Solution 3】:

Try COLLECT_LIST(col), available from Hive 0.13.0 onwards:

SELECT
    hash_id, COLLECT_LIST(num_of_cats) AS aggr_set
FROM
    tablename
WHERE
    blablabla
GROUP BY
    hash_id
;

【Discussion】:

【Solution 4】:

A workaround to collect structs.

Suppose you have a table:

tableWithStruct(
id string,
obj struct <a:string,b:string>)

Now create another table:

CREATE EXTERNAL TABLE tablename (
id string,
temp array<string>
)
ROW FORMAT DELIMITED 
FIELDS TERMINATED BY '\t' COLLECTION ITEMS TERMINATED BY ',' MAP KEYS TERMINATED BY '|'

The insert query:

insert into table tablename select id, collect(concat_ws('|', cast(obj.a as string), cast(obj.b as string))) from tableWithStruct group by id;

Now create another table at the same location as tablename:
CREATE EXTERNAL TABLE tablename_final (
id string,
array_list array<struct<a:string,b:string>>
)
ROW FORMAT DELIMITED 
FIELDS TERMINATED BY '\t' COLLECTION ITEMS TERMINATED BY ',' MAP KEYS TERMINATED BY '|'

When you select from tablename_final, you will get the desired output.
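For illustration (my addition), the final read would simply be a select against the new table; the serde declared on tablename_final parses the '|'-delimited strings back into structs:

```sql
-- Sketch: reading the reassembled structs back out.
-- array_list is array<struct<a:string,b:string>>, rebuilt by the
-- table's delimiters from the strings written into tablename.
SELECT id, array_list FROM tablename_final;
```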

【Discussion】:

【Solution 5】:

Check out the Brickhouse collect UDAF (http://github.com/klout/brickhouse/blob/master/src/main/java/brickhouse/udf/collect/CollectUDAF.java).

It also supports collecting into a map. Brickhouse also contains many useful UDFs that are not in the standard Hive distribution.
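A registration sketch (my addition): the jar path below is an assumption, and the class name is taken from the source file linked above.

```sql
-- Sketch: register and use Brickhouse's collect UDAF.
-- /path/to/brickhouse.jar is a placeholder for your build of the jar.
ADD JAR /path/to/brickhouse.jar;
CREATE TEMPORARY FUNCTION collect AS 'brickhouse.udf.collect.CollectUDAF';

SELECT hash_id, collect(num_of_cats) AS cats_aggregate
FROM tablename
GROUP BY hash_id;
```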

【Discussion】:

【Solution 6】:

For what it's worth (though I know this is an older post), Hive 0.13.0 has a new collect_list() function that does not deduplicate.

【Discussion】:

  • Can you explain this function? Content of this length is usually better placed as a comment on the answer (which, unfortunately, you can't do, since you don't have enough reputation to comment).
【Solution 7】:

Here is the exact hive query that does the job (works only in hive > 0.13):

SELECT hash_id, collect_list(num_of_cats) FROM tablename GROUP BY hash_id;

【Discussion】:

【Solution 8】:

Modified Jeff Mc's code to remove the restriction (probably inherited from collect_set) that the input must be a primitive type. This version can collect structs, maps and arrays as well as primitives.

        package com.example;
        
        import java.util.ArrayList;
        import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException;
        import org.apache.hadoop.hive.ql.metadata.HiveException;
        import org.apache.hadoop.hive.ql.parse.SemanticException;
        import org.apache.hadoop.hive.ql.udf.generic.AbstractGenericUDAFResolver;
        import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
        import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
        import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
        import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
        import org.apache.hadoop.hive.serde2.objectinspector.StandardListObjectInspector;
        import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
        
        public class CollectAll extends AbstractGenericUDAFResolver
        {
            @Override
            public GenericUDAFEvaluator getEvaluator(TypeInfo[] tis)
                    throws SemanticException
            {
                if (tis.length != 1)
                {
                    throw new UDFArgumentTypeException(tis.length - 1, "Exactly one argument is expected.");
                }
                return new CollectAllEvaluator();
            }
        
            public static class CollectAllEvaluator extends GenericUDAFEvaluator
            {
                private ObjectInspector inputOI;
                private StandardListObjectInspector loi;
                private StandardListObjectInspector internalMergeOI;
        
                @Override
                public ObjectInspector init(Mode m, ObjectInspector[] parameters)
                        throws HiveException
                {
                    super.init(m, parameters);
                    if (m == Mode.PARTIAL1)
                    {
                        inputOI = parameters[0];
                        return ObjectInspectorFactory
                                .getStandardListObjectInspector(ObjectInspectorUtils
                                .getStandardObjectInspector(inputOI));
                    }
                    else
                    {
                        if (!(parameters[0] instanceof StandardListObjectInspector))
                        {
                            inputOI = ObjectInspectorUtils
                                    .getStandardObjectInspector(parameters[0]);
                            return (StandardListObjectInspector) ObjectInspectorFactory
                                    .getStandardListObjectInspector(inputOI);
                        }
                        else
                        {
                            internalMergeOI = (StandardListObjectInspector) parameters[0];
                            inputOI = internalMergeOI.getListElementObjectInspector();
                            loi = (StandardListObjectInspector) ObjectInspectorUtils.getStandardObjectInspector(internalMergeOI);
                            return loi;
                        }
                    }
                }
        
                static class ArrayAggregationBuffer implements AggregationBuffer
                {
                    ArrayList<Object> container;
                }
        
                @Override
                public void reset(AggregationBuffer ab)
                        throws HiveException
                {
                    ((ArrayAggregationBuffer) ab).container = new ArrayList<Object>();
                }
        
                @Override
                public AggregationBuffer getNewAggregationBuffer()
                        throws HiveException
                {
                    ArrayAggregationBuffer ret = new ArrayAggregationBuffer();
                    reset(ret);
                    return ret;
                }
        
                @Override
                public void iterate(AggregationBuffer ab, Object[] parameters)
                        throws HiveException
                {
                    assert (parameters.length == 1);
                    Object p = parameters[0];
                    if (p != null)
                    {
                        ArrayAggregationBuffer agg = (ArrayAggregationBuffer) ab;
                        agg.container.add(ObjectInspectorUtils.copyToStandardObject(p, this.inputOI));
                    }
                }
        
                @Override
                public Object terminatePartial(AggregationBuffer ab)
                        throws HiveException
                {
                    ArrayAggregationBuffer agg = (ArrayAggregationBuffer) ab;
                    ArrayList<Object> ret = new ArrayList<Object>(agg.container.size());
                    ret.addAll(agg.container);
                    return ret;
                }
        
                @Override
                public void merge(AggregationBuffer ab, Object o)
                        throws HiveException
                {
                    ArrayAggregationBuffer agg = (ArrayAggregationBuffer) ab;
                    ArrayList<Object> partial = (ArrayList<Object>)internalMergeOI.getList(o);
                    for(Object i : partial)
                    {
                        agg.container.add(ObjectInspectorUtils.copyToStandardObject(i, this.inputOI));
                    }
                }
        
                @Override
                public Object terminate(AggregationBuffer ab)
                        throws HiveException
                {
                    ArrayAggregationBuffer agg = (ArrayAggregationBuffer) ab;
                    ArrayList<Object> ret = new ArrayList<Object>(agg.container.size());
                    ret.addAll(agg.container);
                    return ret;
                }
            }
        }
        

【Discussion】:

• This may be a versioning problem, but I just tried installing this into our repo and compiling it, and when it is invoked in hive it throws the following error: Diagnostic Messages for this Task: Caused by: java.lang.reflect.InvocationTargetException at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAc ...
【Solution 9】:

There is nothing built in, but creating user defined functions, including aggregates, isn't that bad. The only rough part is trying to make them type generic, but here is a collect example.

        package com.example;
        
        import java.util.ArrayList;
        import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException;
        import org.apache.hadoop.hive.ql.metadata.HiveException;
        import org.apache.hadoop.hive.ql.parse.SemanticException;
        import org.apache.hadoop.hive.ql.udf.generic.AbstractGenericUDAFResolver;
        import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
        import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
        import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
        import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
        import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
        import org.apache.hadoop.hive.serde2.objectinspector.StandardListObjectInspector;
        import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
        
        public class CollectAll extends AbstractGenericUDAFResolver
        {
            @Override
            public GenericUDAFEvaluator getEvaluator(TypeInfo[] tis)
                    throws SemanticException
            {
                if (tis.length != 1)
                {
                    throw new UDFArgumentTypeException(tis.length - 1, "Exactly one argument is expected.");
                }
                if (tis[0].getCategory() != ObjectInspector.Category.PRIMITIVE)
                {
                    throw new UDFArgumentTypeException(0, "Only primitive type arguments are accepted but " + tis[0].getTypeName() + " was passed as parameter 1.");
                }
                return new CollectAllEvaluator();
            }
        
            public static class CollectAllEvaluator extends GenericUDAFEvaluator
            {
                private PrimitiveObjectInspector inputOI;
                private StandardListObjectInspector loi;
                private StandardListObjectInspector internalMergeOI;
        
                @Override
                public ObjectInspector init(Mode m, ObjectInspector[] parameters)
                        throws HiveException
                {
                    super.init(m, parameters);
                    if (m == Mode.PARTIAL1)
                    {
                        inputOI = (PrimitiveObjectInspector) parameters[0];
                        return ObjectInspectorFactory
                                .getStandardListObjectInspector((PrimitiveObjectInspector) ObjectInspectorUtils
                                .getStandardObjectInspector(inputOI));
                    }
                    else
                    {
                        if (!(parameters[0] instanceof StandardListObjectInspector))
                        {
                            inputOI = (PrimitiveObjectInspector)  ObjectInspectorUtils
                                    .getStandardObjectInspector(parameters[0]);
                            return (StandardListObjectInspector) ObjectInspectorFactory
                                    .getStandardListObjectInspector(inputOI);
                        }
                        else
                        {
                            internalMergeOI = (StandardListObjectInspector) parameters[0];
                            inputOI = (PrimitiveObjectInspector) internalMergeOI.getListElementObjectInspector();
                            loi = (StandardListObjectInspector) ObjectInspectorUtils.getStandardObjectInspector(internalMergeOI);
                            return loi;
                        }
                    }
                }
        
                static class ArrayAggregationBuffer implements AggregationBuffer
                {
                    ArrayList<Object> container;
                }
        
                @Override
                public void reset(AggregationBuffer ab)
                        throws HiveException
                {
                    ((ArrayAggregationBuffer) ab).container = new ArrayList<Object>();
                }
        
                @Override
                public AggregationBuffer getNewAggregationBuffer()
                        throws HiveException
                {
                    ArrayAggregationBuffer ret = new ArrayAggregationBuffer();
                    reset(ret);
                    return ret;
                }
        
                @Override
                public void iterate(AggregationBuffer ab, Object[] parameters)
                        throws HiveException
                {
                    assert (parameters.length == 1);
                    Object p = parameters[0];
                    if (p != null)
                    {
                        ArrayAggregationBuffer agg = (ArrayAggregationBuffer) ab;
                        agg.container.add(ObjectInspectorUtils.copyToStandardObject(p, this.inputOI));
                    }
                }
        
                @Override
                public Object terminatePartial(AggregationBuffer ab)
                        throws HiveException
                {
                    ArrayAggregationBuffer agg = (ArrayAggregationBuffer) ab;
                    ArrayList<Object> ret = new ArrayList<Object>(agg.container.size());
                    ret.addAll(agg.container);
                    return ret;
                }
        
                @Override
                public void merge(AggregationBuffer ab, Object o)
                        throws HiveException
                {
                    ArrayAggregationBuffer agg = (ArrayAggregationBuffer) ab;
                    ArrayList<Object> partial = (ArrayList<Object>)internalMergeOI.getList(o);
                    for(Object i : partial)
                    {
                        agg.container.add(ObjectInspectorUtils.copyToStandardObject(i, this.inputOI));
                    }
                }
        
                @Override
                public Object terminate(AggregationBuffer ab)
                        throws HiveException
                {
                    ArrayAggregationBuffer agg = (ArrayAggregationBuffer) ab;
                    ArrayList<Object> ret = new ArrayList<Object>(agg.container.size());
                    ret.addAll(agg.container);
                    return ret;
                }
            }
        }
        

Then in hive, issue add jar Whatever.jar; followed by CREATE TEMPORARY FUNCTION collect_all AS 'com.example.CollectAll'; and you should be able to use it as expected.

        hive> SELECT hash_id, collect_all(num_of_cats) FROM test GROUP BY hash_id;
        OK
        ad3jkfk [4,4,2]
        fkjh43f [1,8,8]
        rjkhd93 [7,4,7]
        

It's worth noting that the order of the elements should be considered undefined, so if you intend to use this to feed information into n_grams you may need to extend it a bit to sort the data as required.
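If a deterministic order is enough (rather than the original row order), one option I'd add here is to sort the finished array with Hive's built-in sort_array(), available since Hive 0.9:

```sql
-- Sketch: impose an ascending order on the collected array after the fact.
-- collect_all is the UDAF registered above; `test` is the table from the example.
SELECT hash_id, sort_array(collect_all(num_of_cats)) AS cats_sorted
FROM test
GROUP BY hash_id;
```

Preserving the original row order, by contrast, would require carrying a sequence column through the aggregation and sorting inside the UDAF itself.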

【Discussion】:

• Nice answer :) I ended up trying this and ran into some problems. Looking at your code I spotted what I was doing wrong (type generic is hard), and I think this will fix it.