
Data Partitioning

吞脚 2025-6-1 18:40:16

If you do not set the number of reducers, MapReduce defaults to a single reducer, and every record is sent to it.
If you do not set a partitioner, the default is hash partitioning: hash(key) % R, i.e. the key's hash value modulo the number of reducers R determines which reducer handles each record.
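For reference, Hadoop's built-in default, org.apache.hadoop.mapreduce.lib.partition.HashPartitioner, implements exactly this rule; the & Integer.MAX_VALUE masks off the sign bit so the partition index is never negative:

import org.apache.hadoop.mapreduce.Partitioner;

public class HashPartitioner<K, V> extends Partitioner<K, V> {
    @Override
    public int getPartition(K key, V value, int numReduceTasks) {
        // hash(key) % R, with the sign bit cleared so the result lies in [0, R)
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }
}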
To set the number of reducers (and thus partitions): job.setNumReduceTasks(2);
To set the partitioner: job.setPartitionerClass(StuPartitioner.class);
A custom partitioner looks like this:
public class StuPartitioner extends Partitioner<NullWritable, StudentWritable> { // a partitioner decides which reducer gets each record, so its type parameters are the reducer's input types: NullWritable, StudentWritable
    @Override
    public int getPartition(NullWritable key, StudentWritable value, int numPartitions) {
        // partition by age: 18 and over goes to partition 1, under 18 to partition 0
        if (value.getAge() >= 18) {
            return 1;
        } else {
            return 0;
        }
    }
}
This example partitions student records into two groups, age 18 and over and under 18, and processes each group in its own reducer.
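For example, given a hypothetical input file /StuAgeCata.txt containing:

Tom 20
Jerry 16
Lily 18

the job produces two output files: part-r-00000 (partition 0, under 18) holds Jerry, and part-r-00001 (partition 1, 18 and over) holds Tom and Lily.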

The complete project code follows.
(1) The student class
package com.simple;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Writable;
public class StudentWritable implements Writable { // data passed through MapReduce must be a Hadoop-serializable xxxWritable type (e.g. IntWritable), so this class implements the Writable interface
    private String name;
    private int age;
    public String getName() {
        return name;
    }
    public void setName(String name) {
        this.name = name;
    }
    public int getAge() {
        return age;
    }
    public void setAge(int age) {
        this.age = age;
    }
    public StudentWritable() {
    }
    public StudentWritable(String name, int age) {
        this.name = name;
        this.age = age;
    }
    @Override
    public String toString() {
        return "StudentWritable [name=" + name + ",  age=" + age + "]";
    }
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(name);
        out.writeInt(age);
    }
  
    @Override
    public void readFields(DataInput in) throws IOException {
        this.name = in.readUTF();
        this.age = in.readInt();
    }
}
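Since write and readFields must stay symmetric (same fields, same order), a quick local round trip is an easy sanity check. A minimal sketch, not part of the original project (the class name is made up):

package com.simple;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class WritableRoundTrip {
    public static void main(String[] args) throws IOException {
        StudentWritable in = new StudentWritable("Tom", 20);
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        in.write(new DataOutputStream(buf)); // serializes name (UTF) then age (int)

        StudentWritable out = new StudentWritable();
        out.readFields(new DataInputStream(new ByteArrayInputStream(buf.toByteArray())));
        System.out.println(out); // prints: StudentWritable [name=Tom,  age=20]
    }
}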
(2) The Mapper class and its map method
Goal: turn each line of student text into a StudentWritable object
package com.simple;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class StudentMapper extends Mapper<LongWritable, Text, NullWritable, StudentWritable> { // first two type parameters are the mapper's input types, last two its output types
    // input: key = the line's byte offset, value = the line of text
    // output: key = NullWritable (empty), value = a StudentWritable object
    // note: the original asked whether Mapper.Context can be shortened to Context: yes, once the class declares its generic parameters, Context resolves to Mapper<LongWritable, Text, NullWritable, StudentWritable>.Context
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // split the line on spaces: "name age"
        String[] stuArr = value.toString().split(" ");
        context.write(NullWritable.get(), new StudentWritable(stuArr[0], Integer.parseInt(stuArr[1])));
    }
}
(3) The Reducer class and its reduce method
Goal: turn each StudentWritable object back into a line of student text
package com.simple;
import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;
public class StudentReducer extends Reducer<NullWritable, StudentWritable, NullWritable, Text> {
    @Override
    protected void reduce(NullWritable key, Iterable<StudentWritable> iter, Context context)
            throws IOException, InterruptedException {
        // every record shares the single NullWritable key, so each reducer receives its whole partition in one reduce() call
        Iterator<StudentWritable> it = iter.iterator();
        while (it.hasNext()) {
            context.write(NullWritable.get(), new Text(it.next().toString())); // one student per output line
        }
    }
}
(4) The custom partitioner (splitting at age 18)
package com.simple;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Partitioner;
public class StuPartitioner extends Partitioner<NullWritable, StudentWritable> {
    @Override
    public int getPartition(NullWritable key, StudentWritable value, int numPartitions) { // numPartitions is the number of reducers configured on the job
        // partition by age: students 18 and over go to partition 1, under 18 to partition 0
        if (value.getAge() >= 18) {
            return 1;
        } else {
            return 0;
        }
    }
}
(5) The driver class
package com.simple;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 
public class TestStuMapReducer {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://localhost:9000");
 
        // get a Job instance
        Job job = Job.getInstance(conf);

        // set the class that identifies the job jar
        job.setJarByClass(TestStuMapReducer.class);

        // set the Mapper, Reducer, and Partitioner classes; two reduce tasks means two partitions
        job.setMapperClass(StudentMapper.class);
        job.setReducerClass(StudentReducer.class);
        job.setPartitionerClass(StuPartitioner.class);
        job.setNumReduceTasks(2);
 
        // set the map and reduce output types
        // map output types
        job.setMapOutputKeyClass(NullWritable.class);
        job.setMapOutputValueClass(StudentWritable.class);
        // reduce output types
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(Text.class);
 
        // set the input file and output directory
        FileInputFormat.setInputPaths(job, new Path("/StuAgeCata.txt"));
        FileOutputFormat.setOutputPath(job, new Path("/simple/output"));
        // submit the job and wait for it to finish
        job.waitForCompletion(true);
    }
}
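To run the job, package the classes into a jar, submit it with the hadoop command, and inspect each partition's output file. A hedged sketch; the jar name stu-partition.jar is made up:

hadoop jar stu-partition.jar com.simple.TestStuMapReducer
hdfs dfs -cat /simple/output/part-r-00000   # partition 0: students under 18
hdfs dfs -cat /simple/output/part-r-00001   # partition 1: students 18 and over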
