首先让我澄清一下:clustered by 仅将您的密钥分配到不同的存储桶中,clustered by ... sorted by 将存储桶排序。
通过一个简单的实验(见下文),您可以看到默认情况下您不会获得全局订单。原因是默认分区器使用哈希码拆分键,而不考虑实际的键顺序。
但是,您可以让您的数据完全有序。
动机是 Tom White 的“Hadoop:权威指南”(第 3 版,第 8 章,第 274 页,Total Sort),他在其中讨论了 TotalOrderPartitioner。
我将首先回答您的 TotalOrdering 问题,然后描述我所做的几个与排序相关的 Hive 实验。
请记住:我在这里描述的是“概念证明”,我能够使用 Claudera 的 CDH3 发行版处理一个示例。
最初我希望 org.apache.hadoop.mapred.lib.TotalOrderPartitioner 可以解决问题。不幸的是,它没有,因为它看起来像 Hive 的值分区,而不是键。所以我修补它(应该有子类,但我没有时间):
替换
public int getPartition(K key, V value, int numPartitions) {
return partitions.findPartition(key);
}
与
public int getPartition(K key, V value, int numPartitions) {
return partitions.findPartition(value);
}
现在您可以将 TotalOrderPartitioner 设置为(已修补)作为您的 Hive 分区器:
hive> set hive.mapred.partitioner=org.apache.hadoop.mapred.lib.TotalOrderPartitioner;
hive> set total.order.partitioner.natural.order=false
hive> set total.order.partitioner.path=/user/yevgen/out_data2
我也用过
hive> set hive.enforce.bucketing = true;
hive> set mapred.reduce.tasks=4;
在我的测试中。
文件 out_data2 告诉 TotalOrderPartitioner 如何存储值。
您通过对数据进行采样来生成 out_data2。在我的测试中,我使用了 4 个桶和从 0 到 10 的密钥。我使用 ad-hoc 方法生成了 out_data2:
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.hive.ql.io.HiveKey;
import org.apache.hadoop.fs.FileSystem;
public class TotalPartitioner extends Configured implements Tool{
public static void main(String[] args) throws Exception{
ToolRunner.run(new TotalPartitioner(), args);
}
@Override
public int run(String[] args) throws Exception {
Path partFile = new Path("/home/yevgen/out_data2");
FileSystem fs = FileSystem.getLocal(getConf());
HiveKey key = new HiveKey();
NullWritable value = NullWritable.get();
SequenceFile.Writer writer = SequenceFile.createWriter(fs, getConf(), partFile, HiveKey.class, NullWritable.class);
key.set( new byte[]{1,3}, 0, 2);//partition at 3; 1 came from Hive -- do not know why
writer.append(key, value);
key.set( new byte[]{1, 6}, 0, 2);//partition at 6
writer.append(key, value);
key.set( new byte[]{1, 9}, 0, 2);//partition at 9
writer.append(key, value);
writer.close();
return 0;
}
}
然后我将生成的 out_data2 复制到 HDFS(到 /user/yevgen/out_data2)
通过这些设置,我对数据进行了分桶/排序(请参阅我的实验列表中的最后一项)。
这是我的实验。
这个表基本上包含从 0 到 9 的值,没有顺序。
-
演示表复制是如何工作的(真正的 mapred.reduce.tasks 参数设置了要使用的最大减少任务数)
hive> 创建表 test2(x int);
hive> 设置 mapred.reduce.tasks=4;
hive> 插入覆盖表 test2
从测试 a 中选择 a.x
加入测试b
在 a.x=b.x 上; -- stupied join 强制非平凡的 map-reduce
bash> hadoop fs -cat /user/hive/warehouse/test2/000001_0
1
5
9
-
演示分桶。您可以看到键是随机分配的,没有任何排序顺序:
hive> 创建表 test3(x int)
由 (x) 聚类成 4 个桶;
hive> 设置 hive.enforce.bucketing = true;
hive> 插入覆盖表 test3
从测试中选择 *;
bash> hadoop fs -cat /user/hive/warehouse/test3/000000_0
4
8
0
-
带有排序的分桶。结果是部分排序的,不是完全排序的
hive> 创建表 test4(x int)
按 (x) 聚类 按 (x desc) 排序
分成 4 个桶;
hive> 插入覆盖表 test4
从测试中选择 *;
bash> hadoop fs -cat /user/hive/warehouse/test4/000001_0
1
5
9
您可以看到值是按升序排序的。看起来像 CDH3 中的 Hive 错误?
-
在没有集群的情况下通过语句进行部分排序:
hive> 创建表 test5 为
选择 x
从测试
按 x 分配
按 x desc 排序;
bash> hadoop fs -cat /user/hive/warehouse/test5/000001_0
9
5
1
-
使用我打过补丁的 TotalOrderParitioner:
hive> 设置 hive.mapred.partitioner=org.apache.hadoop.mapred.lib.TotalOrderPartitioner;
hive> 设置 total.order.partitioner.natural.order=false
hive> 设置 total.order.partitioner.path=/user/training/out_data2
hive> 创建表 test6(x int)
按 (x) 聚类,按 (x) 排序,分成 4 个桶;
hive> 插入覆盖表 test6
从测试中选择 *;
bash> hadoop fs -cat /user/hive/warehouse/test6/000000_0
1
2
0
bash> hadoop fs -cat /user/hive/warehouse/test6/000001_0
3
4
5
bash> hadoop fs -cat /user/hive/warehouse/test6/000002_0
7
6
8
bash> hadoop fs -cat /user/hive/warehouse/test6/000003_0
9