初识hadoop及map-reduce

搭建hadoop环境

hadoop环境搭建具体可以参考官方文档

搭建配置maven

map-reduce任务支持多种语言,但对java支持是最好的,所以这里说一下怎么搭建java的编译环境。

首先编译安装maven,并将MAVEN_HOME/bin加入PATH环境变量,这样就可以直接使用mvn命令了。这里说一下怎么利用maven编译生成我们后续示例中的jar包。

1. 使用maven新建一个工程

下面的命令创建一个包含java类org.myorg.WordCount的工程WordCount.

1
mvn archetype:create -DgroupId=org.myorg -DartifactId=WordCount

工程结构如图:

1
2
3
4
5
6
7
8
9
10
11
12
13
WordCount
├── pom.xml
└── src
    ├── main
    │   └── java
    │       └── org
    │           └── myorg
    │               └── App.java
    └── test
        └── java
            └── org
                └── myorg
                    └── AppTest.java

WordCount/src/main/java/org/myorg/App.java重命名为WordCount/src/main/java/org/myorg/WordCount.java,并将示例代码复制进去,代码的细节稍后再看。

由于java类中依赖于hadoop的java包,所以在maven的配置文件pom.xml标签对<dependencies/>内添加java类文件里引用的依赖:

WordCount/pom.xml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
<dependency>
  <groupId>org.apache.hadoop</groupId>
  <artifactId>hadoop-mapreduce-client-core</artifactId>
  <version>2.4.1</version>
</dependency>
<dependency>
  <groupId>org.apache.hadoop</groupId>
  <artifactId>hadoop-common</artifactId>
  <version>2.4.1</version>
</dependency>
<dependency>
  <groupId>org.apache.hadoop</groupId>
  <artifactId>hadoop-mapreduce-client-common</artifactId>
  <version>2.4.1</version>
</dependency>
<dependency>
  <groupId>org.apache.hadoop</groupId>
  <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
  <version>2.4.1</version>
</dependency>

2.编译生成jar包

在WordCount根目录下执行:

1
mvn package

就生成了我们需要的WordCount/target/WordCount-1.0-SNAPSHOT.jar文件。

执行示例程序WordCount

示例程序是一个单词计数程序,输入文件有两个:

1
2
3
4
5
6
7
file01
=======================
Hello World Bye World

file02
=======================
Hello Hadoop Goodbye Hadoop

### 1.上传数据文件

1
2
3
4
5
6
7
8
#创建目录
hdfs dfs -mkdir /user
hdfs dfs -mkdir /user/hadoop
#上传文件
hdfs dfs -put file01 /user/hadoop/input
hdfs dfs -put file02 /user/hadoop/input
#查看文件是否上传上去了
hdfs dfs -ls /user/hadoop/input

2.提交并执行map-reduce任务

1
hadoop jar WordCount-1.0-SNAPSHOT.jar org.myorg.WordCount /user/hadoop/input /user/hadoop/output

3.获取结果

当任务执行完毕在输出目录会生成_SUCCESS文件:

1
2
3
4
hdfs dfs -ls /user/hadoop/output
#输出是:
-rw-r--r--   1 hadoop supergroup          0 2014-09-03 20:20 /user/hadoop/output/_SUCCESS
-rw-r--r--   1 hadoop supergroup         41 2014-09-03 20:20 /user/hadoop/output/part-00000

查看结果:

1
2
3
4
5
6
7
hdfs dfs -cat /user/hadoop/output/part-00000
#输出:
Bye	1
Goodbye	1
Hadoop	2
Hello	2
World	2

Map-Reduce

回过头来再看执行map-reduce的这个java类WordCount.java,该类包含了两个静态内部类MapReduce,都继承了MapReduceBase基类,并各自实现了MapperReducer接口。

WordCount/src/main/java/org/myorg/WordCount.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
package org.myorg;

import java.io.IOException;
import java.util.*;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.util.*;

public class WordCount {
//执行map操作的静态类
    public static class Map extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> {
	private final static IntWritable one = new IntWritable(1);
	private Text word = new Text();

	public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
	    String line = value.toString();
	    StringTokenizer tokenizer = new StringTokenizer(line);
	    while (tokenizer.hasMoreTokens()) {
		word.set(tokenizer.nextToken());
		//OutputCollector以单词本身为键,出现次数为键值进行计数,这里每出现一次计数1
		output.collect(word, one);
	    }
	}
    }
//执行reduce操作的静态类
    public static class Reduce extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> {
	public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
	    //map后的结果是同一个key对应一个value的列表,所以这里遍历values迭代器,累加所有值,即得到每个单词计数值
	    int sum = 0;
	    while (values.hasNext()) {
		sum += values.next().get();
	    }
	    output.collect(key, new IntWritable(sum));
	}
    }
    public static void main(String[] args) throws Exception {
	JobConf conf = new JobConf(WordCount.class);
	conf.setJobName("wordcount");

	conf.setOutputKeyClass(Text.class);
	conf.setOutputValueClass(IntWritable.class);

	conf.setMapperClass(Map.class);
	conf.setCombinerClass(Reduce.class);
	conf.setReducerClass(Reduce.class);

	conf.setInputFormat(TextInputFormat.class);
	conf.setOutputFormat(TextOutputFormat.class);

	FileInputFormat.setInputPaths(conf, new Path(args[0]));
	FileOutputFormat.setOutputPath(conf, new Path(args[1]));

	JobClient.runJob(conf);
    }
}

Mapper接口是一个泛型接口,该接口4个参数分别指定了map方法的输入键值,输入值,输出键值,输出值类型。 类似的Reducer接口也是个泛型接口,它的前两个参数和map的后两个参数类型对应,从而也间接决定了后两个参数的类型。

简而言之,map的过程是把一行行的输入变成:

key1 => val1

key2 => val2

key3 => val1

而reduce的输入是排序过后map的输出:

key1 => [val1,val…..]

key2 => [val2,val…..]

reduce的操作就是把这个输入合并成我们想要的东西。

最后,WordCount类的main方法里设置输入输出,然后执行任务。

以streaming方式执行map-reduce任务

通常来说,简单的map-reduce任务还是用脚本来写比较快,比如ruby,python或者linux shell,这里使用bash来重写一次这个单词计数。

1. map程序

hadoop的streaming是流式处理,即上一操作的输入作为下一操作的输出,基本可以等价用管道来看:

1
cat data-file | mapper.sh | sort | reducer.sh

输入输出都是走的标准输入输出,所以改写的map程序非常简单:

map.sh map操作
1
2
#!/bin/bash
awk '{for(i=1;i<=NF;i++) print $i" 1"}'

2. reduce程序

类似的重写reduce:

reduce.sh reduce操作
1
2
#!/bin/bash
awk '{arr[$1]+=1}END{for(k in arr) print k" "arr[k]}'

3. 提交streaming任务

提交streaming类型的任务需要指定一个额外的jar包$HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-2.4.1.jar,还要在命令里指出map和recude的脚本

1
hadoop jar $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-2.4.1.jar  -input '/user/hadoop/input/*' -output '/user/hadoop/output1' -mapper map.sh -reducer reduce.sh

任务执行的结果和之前是一致的:

1
2
3
4
5
6
7
hdfs dfs -cat /user/hadoop/output1/part-00000
#输出:
Hadoop 2
Goodbye 1
Bye 1
Hello 2
World 2

Comments