Data Partitioning
If the number of reduce tasks is not set, MapReduce defaults to a single reducer, and all records are handed to it.
If no partitioner is set, the default is hash partitioning: hash(key) % R, i.e. each record is assigned to a reducer by taking the key's hash value modulo the number of reducers (R).
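For reference, Hadoop's built-in default, org.apache.hadoop.mapreduce.lib.partition.HashPartitioner, boils down to the following (the bitmask with Integer.MAX_VALUE clears the sign bit so the index is never negative):
public class HashPartitioner<K, V> extends Partitioner<K, V> {
    public int getPartition(K key, V value, int numReduceTasks) {
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }
}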
Setting the number of partitions (reduce tasks): job.setNumReduceTasks(2);
Setting the partitioner: job.setPartitionerClass(StuPartitioner.class);
Defining the partition function, for example:
public class StuPartitioner extends Partitioner<NullWritable, StudentWritable> {
    // The partitioner decides which reducer each record goes to, so its type
    // parameters are the reducer's input types: NullWritable, StudentWritable.
    @Override
    public int getPartition(NullWritable key, StudentWritable value, int numPartitions) {
        // Partition by age: 18 and over vs. under 18
        if (value.getAge() >= 18) {
            return 1;
        } else {
            return 0;
        }
    }
}
This example splits student records into two partitions, age 18 and over and under 18, and processes each separately.
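For a concrete picture, suppose the input file contains one "name age" pair per line (the format the mapper below assumes), with hypothetical sample data such as:

Tom 20
Jerry 16
Lily 18

With two reduce tasks, Jerry's record would land in output file part-r-00000 (partition 0, under 18) and Tom's and Lily's in part-r-00001 (partition 1, 18 and over), each line formatted by StudentWritable.toString().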
The complete project code follows.
(1) The student class
package com.simple;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Writable;

// MapReduce moves data around as Writable types (IntWritable, Text, ...),
// so a custom value class must implement the Writable interface.
public class StudentWritable implements Writable {
    private String name;
    private int age;

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public int getAge() {
        return age;
    }

    public void setAge(int age) {
        this.age = age;
    }

    public StudentWritable() {
    }

    public StudentWritable(String name, int age) {
        this.name = name;
        this.age = age;
    }

    @Override
    public String toString() {
        return "StudentWritable [name=" + name + ", age=" + age + "]";
    }

    // Serialization: the field order here must match readFields() below.
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(name);
        out.writeInt(age);
    }

    // Deserialization: read the fields in the same order they were written.
    @Override
    public void readFields(DataInput in) throws IOException {
        this.name = in.readUTF();
        this.age = in.readInt();
    }
}
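write(DataOutput) and readFields(DataInput) must mirror each other exactly (same fields, same order, same types), or records are corrupted in the shuffle. Below is a minimal local round-trip sketch to verify this outside any Hadoop job (the class name StudentWritableRoundTrip and the sample values are purely illustrative):

package com.simple;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class StudentWritableRoundTrip {
    public static void main(String[] args) throws IOException {
        StudentWritable original = new StudentWritable("Tom", 20);

        // Serialize the object the same way the shuffle would.
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        original.write(new DataOutputStream(bytes));

        // Deserialize into a fresh instance and print it for comparison.
        StudentWritable copy = new StudentWritable();
        copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
        System.out.println(copy); // StudentWritable [name=Tom, age=20]
    }
}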
(2) The mapper class, implementing the map method
Purpose: convert each line of student text into a StudentWritable object
package com.simple;

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// The first two type parameters are the mapper's input types,
// the last two are its output types.
// Input:  line offset as key, line text as value
// Output: NullWritable as key, StudentWritable object as value
public class StudentMapper extends Mapper<LongWritable, Text, NullWritable, StudentWritable> {
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Once the generic parameters are declared on the class, the nested
        // Context type can be used directly instead of Mapper.Context.
        // Split the line on spaces: "name age"
        String[] stuArr = value.toString().split(" ");
        context.write(NullWritable.get(), new StudentWritable(stuArr[0], Integer.parseInt(stuArr[1])));
    }
}
(3) The reducer class, implementing the reduce method
Purpose: convert each StudentWritable object back into a student info string
package com.simple;

import java.io.IOException;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class StudentReducer extends Reducer<NullWritable, StudentWritable, NullWritable, Text> {
    @Override
    protected void reduce(NullWritable key, Iterable<StudentWritable> values, Context context)
            throws IOException, InterruptedException {
        // All records share the same NullWritable key, so each reducer sees a
        // single group; iterate it and emit one text line per student.
        for (StudentWritable stu : values) {
            context.write(NullWritable.get(), new Text(stu.toString()));
        }
    }
}
(4) The custom partitioner (split at age 18)
package com.simple;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Partitioner;

public class StuPartitioner extends Partitioner<NullWritable, StudentWritable> {
    @Override
    public int getPartition(NullWritable key, StudentWritable value, int numPartitions) {
        // numPartitions is the configured number of reduce tasks.
        // Partition by age: 18 and over goes to reducer 1, under 18 to reducer 0.
        if (value.getAge() >= 18) {
            return 1;
        } else {
            return 0;
        }
    }
}
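Note that getPartition must return an index in [0, numPartitions); returning 1 here is only safe because the driver below configures exactly two reduce tasks (and with a single reducer, the framework does not consult the custom partitioner at all). A hypothetical defensive variant that tolerates other reducer counts:

@Override
public int getPartition(NullWritable key, StudentWritable value, int numPartitions) {
    // Clamp the bucket so the returned index is always valid,
    // even if setNumReduceTasks() is later changed.
    int bucket = (value.getAge() >= 18) ? 1 : 0;
    return bucket % numPartitions;
}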
(5) The main driver class
package com.simple;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class TestStuMapReducer {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://localhost:9000");
        // Get a Job instance
        Job job = Job.getInstance(conf);
        // Set the main class
        job.setJarByClass(TestStuMapReducer.class);
        // Set the Mapper, Reducer and Partitioner classes
        job.setMapperClass(StudentMapper.class);
        job.setReducerClass(StudentReducer.class);
        job.setPartitionerClass(StuPartitioner.class);
        // Two reduce tasks, matching the two partitions returned by StuPartitioner
        job.setNumReduceTasks(2);
        // Map output types
        job.setMapOutputKeyClass(NullWritable.class);
        job.setMapOutputValueClass(StudentWritable.class);
        // Reduce output types
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(Text.class);
        // Input and output paths on HDFS
        FileInputFormat.setInputPaths(job, new Path("/StuAgeCata.txt"));
        FileOutputFormat.setOutputPath(job, new Path("/simple/output"));
        // Submit the job and exit with its status
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
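Assuming the classes are packaged into a jar (the name stu-partition.jar is just a placeholder), the job can be run and its two output partitions inspected as follows:

hadoop jar stu-partition.jar com.simple.TestStuMapReducer
hdfs dfs -cat /simple/output/part-r-00000   # students under 18
hdfs dfs -cat /simple/output/part-r-00001   # students 18 and over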