搭建hadoop环境
hadoop环境搭建具体可以参考官方文档。
搭建配置maven
map-reduce任务支持多种语言,但对java支持是最好的,所以这里说一下怎么搭建java的编译环境。
首先编译安装maven,并将MAVEN_HOME/bin
加入PATH环境变量,这样就可以直接使用mvn
命令了。这里说一下怎么利用maven编译生成我们后续示例中的jar包。
1. 使用maven新建一个工程
下面的命令创建一个包含java类org.myorg.WordCount
的工程WordCount
.
1
| mvn archetype:create -DgroupId=org.myorg -DartifactId=WordCount
|
工程结构如图:
1
2
3
4
5
6
7
8
9
10
11
12
13
| WordCount
├── pom.xml
└── src
├── main
│ └── java
│ └── org
│ └── myorg
│ └── App.java
└── test
└── java
└── org
└── myorg
└── AppTest.java
|
将WordCount/src/main/java/org/myorg/App.java
重命名为WordCount/src/main/java/org/myorg/WordCount.java
,并将示例代码复制进去,代码的细节稍后再看。
由于java类中依赖于hadoop的java包,所以在maven的配置文件pom.xml
标签对<dependencies/>
内添加java类文件里引用的依赖:
WordCount/pom.xml1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
| <dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-core</artifactId>
<version>2.4.1</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>2.4.1</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-common</artifactId>
<version>2.4.1</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-jobclient</artifactId>
<version>2.4.1</version>
</dependency>
|
2.编译生成jar包
在WordCount根目录下执行:
就生成了我们需要的WordCount/target/WordCount-1.0-SNAPSHOT.jar
文件。
执行示例程序WordCount
示例程序是一个单词计数程序,输入文件有两个:
1
2
3
4
5
6
7
| file01
=======================
Hello World Bye World
file02
=======================
Hello Hadoop Goodbye Hadoop
|
### 1.上传数据文件
1
2
3
4
5
6
7
8
| #创建目录
hdfs dfs -mkdir /user
hdfs dfs -mkdir /user/hadoop
#上传文件
hdfs dfs -put file01 /user/hadoop/input
hdfs dfs -put file02 /user/hadoop/input
#查看文件是否上传上去了
hdfs dfs -ls /user/hadoop/input
|
2.提交并执行map-reduce任务
1
| hadoop jar WordCount-1.0-SNAPSHOT.jar org.myorg.WordCount /user/hadoop/input /user/hadoop/output
|
3.获取结果
当任务执行完毕在输出目录会生成_SUCCESS文件:
1
2
3
4
| hdfs dfs -ls /user/hadoop/output
#输出是:
-rw-r--r-- 1 hadoop supergroup 0 2014-09-03 20:20 /user/hadoop/output/_SUCCESS
-rw-r--r-- 1 hadoop supergroup 41 2014-09-03 20:20 /user/hadoop/output/part-00000
|
查看结果:
1
2
3
4
5
6
7
| hdfs dfs -cat /user/hadoop/output/part-00000
#输出:
Bye 1
Goodbye 1
Hadoop 2
Hello 2
World 2
|
Map-Reduce
回过头来再看执行map-reduce的这个java类WordCount.java
,该类包含了两个静态内部类Map
和Reduce
,都继承了MapReduceBase
基类,并各自实现了Mapper
和Reducer
接口。
WordCount/src/main/java/org/myorg/WordCount.java1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
| package org.myorg;
import java.io.IOException;
import java.util.*;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.util.*;
public class WordCount {
//执行map操作的静态类
public static class Map extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> {
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
String line = value.toString();
StringTokenizer tokenizer = new StringTokenizer(line);
while (tokenizer.hasMoreTokens()) {
word.set(tokenizer.nextToken());
//OutputCollector以单词本身为键,出现次数为键值进行计数,这里每出现一次计数1
output.collect(word, one);
}
}
}
//执行reduce操作的静态类
public static class Reduce extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> {
public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
//map后的结果是同一个key对应一个value的列表,所以这里遍历values迭代器,累加所有值,即得到每个单词计数值
int sum = 0;
while (values.hasNext()) {
sum += values.next().get();
}
output.collect(key, new IntWritable(sum));
}
}
public static void main(String[] args) throws Exception {
JobConf conf = new JobConf(WordCount.class);
conf.setJobName("wordcount");
conf.setOutputKeyClass(Text.class);
conf.setOutputValueClass(IntWritable.class);
conf.setMapperClass(Map.class);
conf.setCombinerClass(Reduce.class);
conf.setReducerClass(Reduce.class);
conf.setInputFormat(TextInputFormat.class);
conf.setOutputFormat(TextOutputFormat.class);
FileInputFormat.setInputPaths(conf, new Path(args[0]));
FileOutputFormat.setOutputPath(conf, new Path(args[1]));
JobClient.runJob(conf);
}
}
|
Mapper
接口是一个泛型接口,该接口4个参数分别指定了map方法的输入键值,输入值,输出键值,输出值
类型。 类似的Reducer
接口也是个泛型接口,它的前两个参数和map的后两个参数类型对应,从而也间接决定了后两个参数的类型。
简而言之,map的过程是把一行行的输入变成:
key1 => val1
key2 => val2
key3 => val1
而reduce的输入是排序过后map的输出:
key1 => [val1,val…..]
key2 => [val2,val…..]
…
reduce的操作就是把这个输入合并成我们想要的东西。
最后,WordCount
类的main
方法里设置输入输出,然后执行任务。
以streaming方式执行map-reduce任务
通常来说,简单的map-reduce任务还是用脚本来写比较快,比如ruby,python或者linux shell,这里使用bash来重写一次这个单词计数。
1. map程序
hadoop的streaming是流式处理,即上一操作的输入作为下一操作的输出,基本可以等价用管道来看:
1
| cat data-file | mapper.sh | sort | reducer.sh
|
输入输出都是走的标准输入输出,所以改写的map程序非常简单:
map.sh map操作1
2
| #!/bin/bash
awk '{for(i=1;i<=NF;i++) print $i" 1"}'
|
2. reduce程序
类似的重写reduce:
reduce.sh reduce操作1
2
| #!/bin/bash
awk '{arr[$1]+=1}END{for(k in arr) print k" "arr[k]}'
|
3. 提交streaming任务
提交streaming类型的任务需要指定一个额外的jar包$HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-2.4.1.jar
,还要在命令里指出map和recude的脚本
1
| hadoop jar $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-2.4.1.jar -input '/user/hadoop/input/*' -output '/user/hadoop/output1' -mapper map.sh -reducer reduce.sh
|
任务执行的结果和之前是一致的:
1
2
3
4
5
6
7
| hdfs dfs -cat /user/hadoop/output1/part-00000
#输出:
Hadoop 2
Goodbye 1
Bye 1
Hello 2
World 2
|