你好,游客 登录
背景:
阅读新闻

Hadoop和Spark分别实现二次排序

[日期:2017-10-31] 来源:csdn  作者:guohecang [字体: ]

将下列数据中每个分区中的第一列顺序排列,第二列倒序排列。

Text 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
 
2021
5051
5052
5053
5054
6051
6053
6052
6056
6057
7058
6061
7054
7055
7056
7057
7058
1055
8067
9043
3044
5067
5087
4077
2011
1055
2084
7045
9055
9144
7844
7632
8823
9134
5611
3323
2411


使用Hadoop

写法一,参考《Hadoop权威指南》改写:

IntPair类

 Java Code 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
 
package com.hadoop.mr.sort;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.WritableComparable;

public class IntPair implements WritableComparable<IntPair> {
   
private IntWritable first;
   
private IntWritable second;

   
public void set(IntWritable first, IntWritable second) {
       
this.first = first;
       
this.second = second;
    }
   
//注意:需要添加无参的构造方法,否则反射时会报错。
    public IntPair() {
        set(
new IntWritable(), new IntWritable());
    }
   
public IntPair(int first, int second) {
        set(
new IntWritable(first), new IntWritable(second));
    }

   
public IntPair(IntWritable first, IntWritable second) {
        set(first, second);
    }

   
public IntWritable getFirst() {
       
return first;
    }

   
public void setFirst(IntWritable first) {
       
this.first = first;
    }

   
public IntWritable getSecond() {
       
return second;
    }

   
public void setSecond(IntWritable second) {
       
this.second = second;
    }

    @Override
   
public void write(DataOutput out) throws IOException {
        first.write(out);
        second.write(out);
    }

    @Override
   
public void readFields(DataInput inthrows IOException {
        first.readFields(
in);
        second.readFields(
in);
    }

    @Override
   
public int hashCode() {
       
return first.hashCode() * 163 + second.hashCode();
    }

    @Override
   
public boolean equals(Object o) {
       
if (o instanceof IntPair) {
            IntPair tp = (IntPair) o;
           
return first.equals(tp.first) && second.equals(tp.second);
        }
       
return false;
    }

    @Override
   
public String toString() {
       
return first + "\t" + second;
    }

    @Override
   
public int compareTo(IntPair tp) {
       
int cmp = first.compareTo(tp.first);
       
if (cmp != 0) {
           
return cmp;
        }
       
return second.compareTo(tp.second);
    }
}

Secondary类

 Java Code 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
 
package com.hadoop.mr.sort;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class SecondarySort {
   
static class TheMapper extends Mapper<LongWritable, Text, IntPair, NullWritable> {
        @Override
       
protected void map(LongWritable key, Text value, Context context)
               
throws IOException, InterruptedException {
           
String[] fields = value.toString().split("\t");
           
int field1 = Integer.parseInt(fields[0]);
           
int field2 = Integer.parseInt(fields[1]); 
            context.write(
new IntPair(field1,field2), NullWritable.get());
        }
    }
   
   
static class TheReducer extends Reducer<IntPair, NullWritable,IntPair, NullWritable> {
       
//private static final Text SEPARATOR = new Text("------------------------------------------------");
        @Override
       
protected void reduce(IntPair key, Iterable<NullWritable> values, Context context)
               
throws IOException, InterruptedException {
            context.write(key, NullWritable.get());
        }
    }

   
public static class FirstPartitioner extends Partitioner<IntPair, NullWritable> {

        @Override
       
public int getPartition(IntPair key, NullWritable value,
               
int numPartitions) {
           
return Math.abs(key.getFirst().get()) % numPartitions;
        }
       
    }
   
   
//如果不添加这个类,默认第一列和第二列都是升序排序的。这个类的作用是使第一列升序排序,第二列降序排序
    public static class KeyComparator extends WritableComparator {
       
//无参构造器必须加上,否则报错。
        protected KeyComparator() {
           
super(IntPair.class, true);
        }
        @Override
       
public int compare(WritableComparable a, WritableComparable b) {
            IntPair ip1 = (IntPair) a;
            IntPair ip2 = (IntPair) b;
           
//第一列按升序排序
            int cmp = ip1.getFirst().compareTo(ip2.getFirst());
           
if (cmp != 0) {
               
return cmp;
            }
           
//在第一列相等的情况下,第二列按倒序排序
            return -ip1.getSecond().compareTo(ip2.getSecond());
        }
    }
   
/*  public static class GroupComparator extends WritableComparator {
        //无参构造器必须加上,否则报错。
        protected GroupComparator() {
            super(IntPair.class, true);
        }
        @Override
        public int compare(WritableComparable a, WritableComparable b) {
            IntPair ip1 = (IntPair) a;
            IntPair ip2 = (IntPair) b;
            return ip1.getFirst().compareTo(ip2.getFirst());
        }
    }*/

   
   
//入口程序
    public static void main(String[] args) throws Exception {
        Configuration conf = 
new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(SecondarySort.
class);
       
//设置Mapper的相关属性
        job.setMapperClass(TheMapper.class);
       
//当Mapper中的输出的key和value的类型和Reduce输出的key和value的类型相同时,以下两句可以省略。
        //job.setMapOutputKeyClass(IntPair.class);
        //job.setMapOutputValueClass(NullWritable.class);
   
        FileInputFormat.setInputPaths(job, 
new Path(args[0]));
       
       
//设置分区的相关属性
        job.setPartitionerClass(FirstPartitioner.class);
       
//在map中对key进行排序
        job.setSortComparatorClass(KeyComparator.class);
       
//job.setGroupingComparatorClass(GroupComparator.class);
        //设置Reducer的相关属性
        job.setReducerClass(TheReducer.class);
        job.setOutputKeyClass(IntPair.
class);
        job.setOutputValueClass(NullWritable.
class);
        FileOutputFormat.setOutputPath(job, 
new Path(args[1]));
       
//设置Reducer数量
        int reduceNum = 1;
       
if(args.length >= 3 && args[2] != null){
            reduceNum = Integer.parseInt(args[
2]);
        }
        job.setNumReduceTasks(reduceNum);
        job.waitForCompletion(true);
    }
   
}

打成secsort.jar包,从hdfs上的/test/secsortdata获取数据文件,mapreduce输出目录是/test/secsortresult8,启动1个reduce:

hadoop jar secsort.jar /test/secsortdata /test/secsortresult8 1

测试结果:

可以发现第一列(key)是顺序排列的,对于相同key的values,是倒序排列的。

如果使用两个reduce会怎样?

hadoop jar secsort.jar /test/secsortdata /test/secsortresult9 2

测试结果:

那如果将代码中的GroupComparator的注释以及第100行的注释去掉,结果会怎么样呢?


如上图,它只会输出每个key中的第一个value值。

 

写法二,参考自http://www.superwu.cn/2013/08/18/492/:

代码:

IntPair类可以改写成:

 Java Code 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
 
package com.hadoop.mr.sort;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.WritableComparable;

public class IntPair implements WritableComparable<IntPair> {
   
private int first = 0;
   
private int second = 0;

   
public void set(int first, int second) {
       
this.first = first;
       
this.second = second;
    }

   
// 注意:需要添加无参的构造方法,否则反射时会报错。
    public IntPair() {

    }

   
public IntPair(int first, int second) {
        set(first, second);
    }

   
public int getFirst() {
       
return first;
    }

   
public void setFirst(int first) {
       
this.first = first;
    }

   
public int getSecond() {
       
return second;
    }

   
public void setSecond(int second) {
       
this.second = second;
    }

    @Override
   
public void write(DataOutput out) throws IOException {
        out.write(first);
        out.write(second);
    }

    @Override
   
public void readFields(DataInput inthrows IOException {
        first = 
in.readInt();
        second = 
in.readInt();
    }

    @Override
   
public int hashCode() {
       
return first + "".hashCode() + second + "".hashCode();
    }

    @Override
   
public boolean equals(Object right) {
       
if (right instanceof IntPair) {
            IntPair r = (IntPair) right;
           
return r.getFirst() == first && r.getSecond() == second;
        } 
else {
           
return false;
        }
    }

   
// 这里的代码是关键,因为对key排序时,调用的就是这个compareTo方法
    @Override
   
public int compareTo(IntPair o) {
       
if (first != o.getFirst()) {
           
return first - o.getFirst();
        } 
else if (second != o.getSecond()) {
           
return o.getSecond() - second;
        } 
else {
           
return 0;
        }
    }
}

Secondary类可以改写成:

 Java Code 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
 
package com.hadoop.mr.sort;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class SecondarySort {
   
static class TheMapper extends
            Mapper<LongWritable, Text, IntPair, IntWritable> {
       
private final IntPair outKey = new IntPair();
       
private final IntWritable outValue = new IntWritable();

        @Override
       
protected void map(LongWritable key, Text value, Context context)
               
throws IOException, InterruptedException {
           
// 默认以” \t\n\r\f”(前有一个空格,引号不是)为分割符
            StringTokenizer itr = new StringTokenizer(value.toString(), "\t");
           
int first = 0;
           
int second = 0;
           
if (itr.hasMoreTokens()) {
                first = Integer.parseInt(itr.nextToken());
               
if (itr.hasMoreTokens()) {
                    second = Integer.parseInt(itr.nextToken());
                }
                outKey.set(first, second);
                outValue.set(second);
                context.write(outKey, outValue);
            }
        }
    }

   
static class TheReducer extends
            Reducer<IntPair, IntWritable, Text, IntWritable> {
       
private static final Text SEPARATOR = new Text("------------------------------------------------");
       
private final Text first = new Text();
        @Override
       
protected void reduce(IntPair inKey, Iterable<IntWritable> inValues, Context context)
               
throws IOException, InterruptedException {
            first.set(Integer.toString(inKey.getFirst()));
           
for(IntWritable value: inValues) {
              context.write(first, value);
            }
            context.write(SEPARATOR, null);
        }

    }

   
public static class FirstPartitioner extends
            Partitioner<IntPair, IntWritable> {
        @Override
       
public int getPartition(IntPair key, IntWritable value,int numPartitions) {
           
return Math.abs(key.getFirst()* 127) % numPartitions;
        }
    }

   
/**
     * 在分组比较的时候,只比较原来的key,而不是组合key。
     */

   
public static class GroupComparator implements RawComparator<IntPair> {
      @Override
     
public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
       
return WritableComparator.compareBytes(b1, s1, Integer.SIZE/8, b2, s2, Integer.SIZE/8);
      }
      @Override
     
public int compare(IntPair o1, IntPair o2) {
       
int first1 = o1.getFirst();
       
int first2 = o2.getFirst();
       
return first1 - first2;
      }
    }
   
   
// 入口程序
    public static void main(String[] args) throws Exception {
        Configuration conf = 
new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(SecondarySort.
class);
       
// 设置Mapper的相关属性
        job.setMapperClass(TheMapper.class);
       
// 当Mapper中的输出的key和value的类型和Reduce输出的key和value的类型相同时,以下两句可以省略。
        job.setMapOutputKeyClass(IntPair.class);
        job.setMapOutputValueClass(IntWritable.
class);

        FileInputFormat.setInputPaths(job, 
new Path(args[0]));
       
// 设置分区的相关属性
        job.setPartitionerClass(FirstPartitioner.class);
        job.setGroupingComparatorClass(GroupComparator.
class);
       
// 设置Reducer的相关属性
        job.setReducerClass(TheReducer.class);
        job.setOutputKeyClass(Text.
class);
        job.setOutputValueClass(IntWritable.
class);
        FileOutputFormat.setOutputPath(job, 
new Path(args[1]));
       
// 设置Reducer数量
        int reduceNum = 1;
       
if (args.length >= 3 && args[2] != null) {
            reduceNum = Integer.parseInt(args[
2]);
        }
        job.setNumReduceTasks(reduceNum);
        job.waitForCompletion(true);
    }
}

测试结果:

 hdfs dfs -cat /test/secsortresult18/part-r-*

 Text  
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
 
10      55
10      55
------------------------------------------------
20      84
20      21
20      11
------------------------------------------------
24      11
------------------------------------------------
30      44
------------------------------------------------
40      77
------------------------------------------------
50      87
50      67
50      54
50      53
50      52
50      51
------------------------------------------------
56      11
------------------------------------------------
60      61
60      57
60      56
60      53
60      52
60      51
------------------------------------------------
70      58
70      58
70      57
70      56
70      55
70      54
70      45
------------------------------------------------
76      32
------------------------------------------------
78      44
------------------------------------------------
80      67
------------------------------------------------
88      23
------------------------------------------------
90      55
90      43
------------------------------------------------
33      23
------------------------------------------------
91      44
91      34
------------------------------------------------

 

使用Spark来实现二次排序

 Scala Code 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
 
package com.spark.secondApp
import org.apache.spark.{SparkContext, SparkConf}

object SecondarySort {
 
def main(args: Array[String]) {
    val conf = 
new SparkConf().setAppName(" Secondary Sort ").setMaster("local")
    val sc = 
new SparkContext(conf)
    val file = sc.textFile(
"hdfs://worker02:9000/test/secsortdata")
    val rdd = file.map(line => line.split(
"\t")).
      map(x => (x(
0),x(1))).groupByKey().
      sortByKey(true).map(x => (x._1,x._2.toList.sortWith(_>_)))
    val rdd2 = rdd.flatMap{
      x =>
      val len = x._2.length
      val array = 
new Array[(String,String)](len)
     
for(i <- 0 until len) {
        array(i) = (x._1,x._2(i))
      }
      array 
    }
    sc.stop()
  }
}

将8~12行复制到spark-shell中执行后,再使用rdd2.collect,结果如下:

上图中第一列升序排列,第二列降序排列。

Hadoop实现二次排序需要近200行代码,而Spark只需要20多行代码。

收藏 推荐 打印 | 阅读: