Hadoop's default sort orders records by key only. When keys are equal, the values also need to be ordered, which is the classic secondary-sort problem.
Put simply: the data file should be sorted by the first column in ascending order, and where the first column is equal, rows should be further sorted by the second column in ascending order.
Sample input / sample output (the original sample files are not reproduced here).
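As a purely hypothetical illustration (these numbers are made up, not the original sample), comma-separated input lines and the corresponding output (key and value separated by a tab, the TextOutputFormat default) would look like this:

Input (f.txt)      Output
3,3                1    2
1,2                1    5
5,7                3    3
1,5                5    7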
This example uses a custom class, MyGrouptest, which defines how the two numbers are read, written, and compared.
Because map output keys are sorted automatically, both columns are packed into a MyGrouptest composite key and the value is just a NullWritable placeholder. During the shuffle the framework sorts these keys with MyGrouptest's compareTo method, which compares the first column and, only on a tie, the second column, so records reach the reducer already in the desired order and the reducer simply unpacks the two columns for output.
**MyGrouptest.java**
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

public class MyGrouptest implements WritableComparable<MyGrouptest> {
    long firstNum;
    long secondNum;

    public MyGrouptest() {}

    public MyGrouptest(long first, long second) {
        firstNum = first;
        secondNum = second;
    }

    // Serialize both fields so the composite key can be shuffled between tasks.
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(firstNum);
        out.writeLong(secondNum);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        firstNum = in.readLong();
        secondNum = in.readLong();
    }

    /*
     * Called by the framework when map output keys are sorted: compare the
     * first column first and fall back to the second column only on a tie.
     * Long.compare avoids the overflow risk of casting the long difference to int.
     */
    @Override
    public int compareTo(MyGrouptest anotherKey) {
        int cmp = Long.compare(firstNum, anotherKey.firstNum);
        if (cmp != 0) {
            return cmp;
        }
        return Long.compare(secondNum, anotherKey.secondNum);
    }

    public long getFirstNum() {
        return firstNum;
    }

    public long getSecondNum() {
        return secondNum;
    }
}
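To make the comparison rule concrete, here is a minimal, hypothetical check (not part of the original post; the MyGrouptestDemo class name is invented) that calls compareTo directly:

// Hypothetical quick check, assuming MyGrouptest is on the classpath.
public class MyGrouptestDemo {
    public static void main(String[] args) {
        MyGrouptest a = new MyGrouptest(1, 5);
        MyGrouptest b = new MyGrouptest(1, 2);
        MyGrouptest c = new MyGrouptest(3, 3);
        System.out.println(a.compareTo(b)); // positive: same first column, 5 sorts after 2
        System.out.println(b.compareTo(c)); // negative: first column 1 sorts before 3
    }
}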
**GroupSort.java**
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class GroupSort {
    static String INPUT_PATH = "hdfs://master:9000/input/f.txt";
    static String OUTPUT_PATH = "hdfs://master:9000/output/groupsort";

    static class MyMapper extends Mapper<LongWritable, Text, MyGrouptest, NullWritable> {
        NullWritable output_value = NullWritable.get();

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Pack both columns into the composite key; the value carries no data.
            String[] tokens = value.toString().split(",", 2);
            MyGrouptest output_key =
                    new MyGrouptest(Long.parseLong(tokens[0]), Long.parseLong(tokens[1]));
            context.write(output_key, output_value);
        }
    }

    static class MyReduce extends Reducer<MyGrouptest, NullWritable, LongWritable, LongWritable> {
        LongWritable output_key = new LongWritable();
        LongWritable output_value = new LongWritable();

        @Override
        protected void reduce(MyGrouptest key, Iterable<NullWritable> values, Context context)
                throws IOException, InterruptedException {
            // Keys arrive already sorted by (firstNum, secondNum); just unpack them.
            output_key.set(key.getFirstNum());
            output_value.set(key.getSecondNum());
            context.write(output_key, output_value);
        }
    }

    public static void main(String[] args) throws Exception {
        Path outputpath = new Path(OUTPUT_PATH);
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(GroupSort.class);
        FileInputFormat.setInputPaths(job, INPUT_PATH);
        FileOutputFormat.setOutputPath(job, outputpath);
        job.setMapperClass(MyMapper.class);
        job.setReducerClass(MyReduce.class);
        // One reducer gives a single globally sorted output file, so no custom partitioner is needed.
        job.setNumReduceTasks(1);
        job.setMapOutputKeyClass(MyGrouptest.class);
        job.setMapOutputValueClass(NullWritable.class);
        job.setOutputKeyClass(LongWritable.class);
        job.setOutputValueClass(LongWritable.class);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
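One practical note: FileOutputFormat fails if the output directory already exists. A minimal sketch of how the output directory could be cleared before the job is submitted (this snippet is an assumption, not part of the original code; it would sit in main() right after outputpath and conf are created, and needs org.apache.hadoop.fs.FileSystem on the import list):

// Hypothetical addition: remove a stale output directory so re-runs
// do not fail with FileAlreadyExistsException.
FileSystem fs = FileSystem.get(new java.net.URI(OUTPUT_PATH), conf);
if (fs.exists(outputpath)) {
    fs.delete(outputpath, true); // true = delete recursively
}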