【问题标题】:Hive cluster by vs order by vs sort byHive cluster by vs order by vs sort by
【发布时间】:2012-11-22 19:05:32
【问题描述】:

据我所知;

  • sort by 仅在 reducer 中排序

  • order by 在全球范围内订购东西,但将所有东西都塞到一个减速器中

  • cluster by 智能地通过 key hash 将内容分配到 reducer 中并进行排序

所以我的问题是集群是否保证全局顺序?分发方式将相同的键放入相同的减速器中,但是相邻的键呢?

我能找到的唯一文档是here,从这个例子看来,它似乎是在全球范围内订购它们。但从定义上看,我觉得它并不总是那样。

【问题讨论】:

    标签: hadoop hql hive


    【解决方案1】:

    更简短的回答:是的,CLUSTER BY 保证全局排序,前提是您愿意自己加入多个输出文件。

    更长的版本:

    • ORDER BY x:保证全局排序,但通过仅通过一个 reducer 推送所有数据来做到这一点。对于大型数据集,这基本上是不可接受的。您最终会得到一个已排序的文件作为输出。
    • SORT BY x:在 N 个 reducer 中的每一个上排序数据,但每个 reducer 可以接收重叠范围的数据。您最终会得到 N 个或更多具有重叠范围的排序文件。
    • DISTRIBUTE BY x:确保 N 个减速器中的每一个都获得 x 的非重叠范围,但不对每个减速器的输出进行排序。您最终会得到 N 个或更多范围不重叠的未排序文件。
    • CLUSTER BY x:确保 N 个减速器中的每一个都获得不重叠的范围,然后在减速器处按这些范围排序。这为您提供了全局排序,并且与(DISTRIBUTE BY xSORT BY x)相同。您最终会得到 N 个或更多具有非重叠范围的排序文件。

    有意义吗?所以CLUSTER BY 基本上是ORDER BY 的可扩展版本。

    【讨论】:

    • 正如其他答案所述,根据cwiki.apache.org/confluence/display/Hive/LanguageManual+SortByCLUSTER BYDISTRIBUTE BY 不能给你不重叠的范围。 CLUSTER BY 无法保证全局排序。
    • 我想知道...什么被认为是“大型数据集”?你能量化一下吗?
    • 我的查询为 SORT BYCLUSTER BY 返回了相同的不想要的东西:减速器中的本地排序。我不得不求助于ORDER BY 并等待整个周末直到工作完成。
    • CLUSTER BY 使用集群列的哈希 mod 减速器数量来确保具有相同列值的行进入同一个减速器 - 就是这样,没有比这更强大的保证了!请参阅我的答案,其中包含示例和订单保留哈希等链接。
    • 我的想法也和@yhuai一样。 lars-yencken,你能提供任何参考吗?
    【解决方案2】:

    首先让我澄清一下:clustered by 仅将您的密钥分配到不同的存储桶中,clustered by ... sorted by 将存储桶排序。

    通过一个简单的实验(见下文),您可以看到默认情况下您不会获得全局订单。原因是默认分区器使用哈希码拆分键,而不考虑实际的键顺序。

    但是,您可以让您的数据完全有序。

    动机是 Tom White 的“Hadoop:权威指南”(第 3 版,第 8 章,第 274 页,Total Sort),他在其中讨论了 TotalOrderPartitioner。

    我将首先回答您的 TotalOrdering 问题,然后描述我所做的几个与排序相关的 Hive 实验。

    请记住:我在这里描述的是“概念证明”,我能够使用 Claudera 的 CDH3 发行版处理一个示例。

    最初我希望 org.apache.hadoop.mapred.lib.TotalOrderPartitioner 可以解决问题。不幸的是,它没有,因为它看起来像 Hive 的值分区,而不是键。所以我修补它(应该有子类,但我没有时间):

    替换

    public int getPartition(K key, V value, int numPartitions) {
        return partitions.findPartition(key);
    }
    

    public int getPartition(K key, V value, int numPartitions) {
        return partitions.findPartition(value);
    }
    

    现在您可以将 TotalOrderPartitioner 设置为(已修补)作为您的 Hive 分区器:

    hive> set hive.mapred.partitioner=org.apache.hadoop.mapred.lib.TotalOrderPartitioner;
    
    hive> set total.order.partitioner.natural.order=false
    
    hive> set total.order.partitioner.path=/user/yevgen/out_data2
    

    我也用过

    hive> set hive.enforce.bucketing = true; 
    
    hive> set mapred.reduce.tasks=4;
    

    在我的测试中。

    文件 out_data2 告诉 TotalOrderPartitioner 如何存储值。 您通过对数据进行采样来生成 out_data2。在我的测试中,我使用了 4 个桶和从 0 到 10 的密钥。我使用 ad-hoc 方法生成了 out_data2:

    import org.apache.hadoop.util.ToolRunner;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.SequenceFile;
    import org.apache.hadoop.hive.ql.io.HiveKey;
    import org.apache.hadoop.fs.FileSystem;
    
    
    public class TotalPartitioner extends Configured implements Tool{
        public static void main(String[] args) throws Exception{
                ToolRunner.run(new TotalPartitioner(), args);
        }
    
        @Override
        public int run(String[] args) throws Exception {
            Path partFile = new Path("/home/yevgen/out_data2");
            FileSystem fs = FileSystem.getLocal(getConf());
    
            HiveKey key = new HiveKey();
            NullWritable value = NullWritable.get();
    
            SequenceFile.Writer writer = SequenceFile.createWriter(fs, getConf(), partFile, HiveKey.class, NullWritable.class);
            key.set( new byte[]{1,3}, 0, 2);//partition at 3; 1 came from Hive -- do not know why
            writer.append(key, value);
            key.set( new byte[]{1, 6}, 0, 2);//partition at 6
            writer.append(key, value);
            key.set( new byte[]{1, 9}, 0, 2);//partition at 9
            writer.append(key, value);
            writer.close();
            return 0;
        }
    
    }
    

    然后我将生成的 out_data2 复制到 HDFS(到 /user/yevgen/out_data2)

    通过这些设置,我对数据进行了分桶/排序(请参阅我的实验列表中的最后一项)。

    这是我的实验。

    • 创建样本数据

      bash> echo -e "1\n3\n2\n4\n5\n7\n6\n8\n9\n0" > data.txt

    • 创建基本测试表:

      hive> 创建表 test(x int); hive> 将数据本地 inpath 'data.txt' 加载到表测试中;

    这个表基本上包含从 0 到 9 的值,没有顺序。

    • 演示表复制是如何工作的(真正的 mapred.reduce.tasks 参数设置了要使用的最大减少任务数)

      hive> 创建表 test2(x int);

      hive> 设置 mapred.reduce.tasks=4;

      hive> 插入覆盖表 test2 从测试 a 中选择 a.x 加入测试b 在 a.x=b.x 上; -- stupied join 强制非平凡的 map-reduce

      bash> hadoop fs -cat /user/hive/warehouse/test2/000001_0

      1

      5

      9

    • 演示分桶。您可以看到键是随机分配的,没有任何排序顺序:

      hive> 创建表 test3(x int) 由 (x) 聚类成 4 个桶;

      hive> 设置 hive.enforce.bucketing = true;

      hive> 插入覆盖表 test3 从测试中选择 *;

      bash> hadoop fs -cat /user/hive/warehouse/test3/000000_0

      4

      8

      0

    • 带有排序的分桶。结果是部分排序的,不是完全排序的

      hive> 创建表 test4(x int) 按 (x) 聚类 按 (x desc) 排序 分成 4 个桶;

      hive> 插入覆盖表 test4 从测试中选择 *;

      bash> hadoop fs -cat /user/hive/warehouse/test4/000001_0

      1

      5

      9

    您可以看到值是按升序排序的。看起来像 CDH3 中的 Hive 错误?

    • 在没有集群的情况下通过语句进行部分排序:

      hive> 创建表 test5 为 选择 x 从测试 按 x 分配 按 x desc 排序;

      bash> hadoop fs -cat /user/hive/warehouse/test5/000001_0

      9

      5

      1

    • 使用我打过补丁的 TotalOrderParitioner:

      hive> 设置 hive.mapred.partitioner=org.apache.hadoop.mapred.lib.TotalOrderPartitioner;

      hive> 设置 total.order.partitioner.natural.order=false

      hive> 设置 total.order.partitioner.path=/user/training/out_data2

      hive> 创建表 test6(x int) 按 (x) 聚类,按 (x) 排序,分成 4 个桶;

      hive> 插入覆盖表 test6 从测试中选择 *;

      bash> hadoop fs -cat /user/hive/warehouse/test6/000000_0

      1

      2

      0

      bash> hadoop fs -cat /user/hive/warehouse/test6/000001_0

      3

      4

      5

      bash> hadoop fs -cat /user/hive/warehouse/test6/000002_0

      7

      6

      8

      bash> hadoop fs -cat /user/hive/warehouse/test6/000003_0

      9

    【讨论】:

      【解决方案3】:

      CLUSTER BY 不会产生全局排序。

      接受的答案(由 Lars Yencken 撰写)通过声明减速器将接收非重叠范围来误导。正如 Anton Zaviriukhin 正确指向 BucketedTables 文档一样,CLUSTER BY 基本上是 DISTRIBUTE BY(与 bucketing 相同)加上每个 bucket/reducer 中的 SORT BY。并且 DISTRIBUTE BY 只是简单地将哈希和 mods 放入存储桶中,而哈希函数 may 保留顺序(i 的哈希 > j 的哈希,如果 i > j),哈希值的 mod 不会。

      这是显示重叠范围的更好示例

      http://myitlearnings.com/bucketing-in-hive/

      【讨论】:

      • 我同意你的观点,即使是 hive 文档也没有提到'distribute by'进行全局排序。
      【解决方案4】:

      据我了解,简短的回答是否定的。 你会得到重叠的范围。

      来自SortBy documentation: “Cluster By 是 Distribute By 和 Sort By 的捷径。” “具有相同 Distribute By 列的所有行都将进入同一个 reducer。” 但是没有保证不重叠范围分布的信息。

      此外,来自DDL BucketedTables documentation: “Hive 如何在存储桶中分配行?通常,存储桶编号由表达式 hash_function(bucketing_column) mod num_buckets 确定。” 我认为 Select 语句中的 Cluster by 使用相同的原则在 reducer 之间分配行,因为它的主要用途是用数据填充分桶表。

      我创建了一个包含 1 个整数列“a”的表,并在其中插入了从 0 到 9 的数字。

      然后我将减速器的数量设置为 2 set mapred.reduce.tasks = 2;.

      select 来自该表的数据与Cluster by 子句 select * from my_tab cluster by a;

      并收到了我预期的结果:

      0
      2
      4
      6
      8
      1
      3
      5
      7
      9
      

      所以,第一个 reducer(数字 0)得到了偶数(因为它们的模式 2 给出了 0)

      第二个减速器(编号 1)得到奇数(因为它们的模式 2 给出 1)

      这就是“分发者”的工作原理。

      然后“排序依据”对每个reducer内部的结果进行排序。

      【讨论】:

        【解决方案5】:

        用例:当有一个大数据集时,应该像 sort by 那样进行 sort by ,所有 set reducer 在合并之前在内部对数据进行排序,从而提高性能。在按顺序排列时,较大数据集的性能会降低,因为所有数据都通过单个减速器传递,这会增加负载并因此需要更长的时间来执行查询。 请参见下面关于 11 节点集群的示例。

        这个是Order By example output

        这是Sort By示例输出

        这个是Cluster By example

        我观察到,sort by、cluster by和distribute by的数字是SAME,但内部机制不同。在 DISTRIBUTE BY 中:相同的列行将转到一个 reducer,例如。 DISTRIBUTE BY(City) - 一列中的班加罗尔数据,一个减速器中的德里数据:

        【讨论】:

          【解决方案6】:

          Cluster by 是每个 reducer 排序的,不是全局的。在许多书中,它也被错误地或混淆地提及。它有特殊用途,例如您将每个部门分配给特定的减速器,然后按每个部门中的员工姓名排序,并且不关心要使用的集群的部门顺序,并且由于工作负载分布在减速器之间,它的性能更高.

          【讨论】:

          • 如果你在distribute by后使用collect_set或collect_list,会保持顺序吗?
          【解决方案7】:

          SortBy:N个或更多具有重叠范围的排序文件。

          OrderBy:单输出,即完全排序。

          Distribute By:Distribute By 保护 N 个 reducer 中的每一个,获得列的非重叠范围,但不对每个 reducer 的输出进行排序。

          更多信息http://commandstech.com/hive-sortby-vs-orderby-vs-distributeby-vs-clusterby/

          ClusterBy:参考上面同一个例子,如果我们使用Cluster By x,两个reducer会进一步对x上的行进行排序:

          【讨论】:

            【解决方案8】:

            如果我理解正确的话

            1.sort by - 只对reducer内的数据进行排序

            2.order by - 通过将整个数据集推送到单个 reducer 来全局排序。如果我们确实有很多数据(倾斜),这个过程会花费很多时间。

            1. cluster by - 通过 key hash 智能地将内容分配到 reducer 并进行排序,但不授予全局排序。一个 key(k1) 可以放入两个 reducer。第一个 reducer 获得 10K K1 数据,第二个可能获得 1K k1 数据。

            【讨论】:

            • 您的所有观点都已详细包含在已接受的答案中
            猜你喜欢
            • 2019-01-27
            • 2015-02-26
            • 2019-05-03
            • 2015-10-07
            • 2013-05-11
            • 1970-01-01
            • 1970-01-01
            • 1970-01-01
            • 1970-01-01
            相关资源
            最近更新 更多