
[Big Data with Hadoop] 16. Join in MapReduce

1 Reduce Join

Map side:
Tag each key/value pair with the table or file it comes from, so that records from different sources can be told apart. Use the join field as the key and the remaining fields plus the newly added source tag as the value, then emit the pair.

Reduce side:
Within each group (all records sharing the same join key), separate the records that came from different files (they were tagged in the Map phase) and then merge them.

Requirement:
Join the order table and the product table: use the join condition (pid) as the Map output key, so that rows from both tables that satisfy the join condition, carrying the name of the file they came from, are sent to the same ReduceTask, where the records are stitched together in the Reduce phase.
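For illustration, suppose the two input files look like this (hypothetical sample data; only the column layout matters — the order file holds id, pid and amount, the pd file holds pid and pname, separated by the same delimiter the Mapper splits on):

order.txt
1001 01 1
1002 02 2
1003 03 3

pd.txt
01 小米
02 华为
03 格力

With this input, the Reduce Join output (formatted by TableBean.toString()) would contain one line per order with the order id, the amount, and the product name, e.g. 1001 1 小米.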

Create the TableBean class that holds the merged product and order fields

package com.study.mapreduce.reducejoin;

import org.apache.hadoop.io.Writable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class TableBean implements Writable {

    private String id;      // order id
    private String pid;     // product id
    private int amount;     // order amount
    private String name;    // product name
    private String flag;    // marks which table the record comes from ("order" or "pd")

    public TableBean() {
    }

    public String getId() { return id; }
    public void setId(String id) { this.id = id; }

    public String getPid() { return pid; }
    public void setPid(String pid) { this.pid = pid; }

    public int getAmount() { return amount; }
    public void setAmount(int amount) { this.amount = amount; }

    public String getName() { return name; }
    public void setName(String name) { this.name = name; }

    public String getFlag() { return flag; }
    public void setFlag(String flag) { this.flag = flag; }

    @Override
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeUTF(id);
        dataOutput.writeUTF(pid);
        dataOutput.writeInt(amount);
        dataOutput.writeUTF(name);
        dataOutput.writeUTF(flag);
    }

    @Override
    public void readFields(DataInput dataInput) throws IOException {
        this.id = dataInput.readUTF();
        this.pid = dataInput.readUTF();
        this.amount = dataInput.readInt();
        this.name = dataInput.readUTF();
        this.flag = dataInput.readUTF();
    }

    @Override
    public String toString() {
        return id + "\t" + amount + "\t" + name;
    }
}

TableMapper class

package com.study.mapreduce.reducejoin;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

import java.io.IOException;

public class TableMapper extends Mapper<LongWritable, Text, Text, TableBean> {

    private String filename;
    private Text outK = new Text();
    private TableBean outV = new TableBean();

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        // Get the name of the file backing this split; setup() runs only once per split
        InputSplit split = context.getInputSplit();
        FileSplit fileSplit = (FileSplit) split;
        filename = fileSplit.getPath().getName();
    }

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // 1 Read one line as a string
        String line = value.toString();

        // 2 Decide which file the line comes from
        if (filename.contains("order")) {
            // 3 Split the line
            String[] split = line.split(" ");
            // 4 Wrap it: key is pid, value is the tagged order record
            outK.set(split[1]);
            outV.setId(split[0]);
            outV.setPid(split[1]);
            outV.setAmount(Integer.parseInt(split[2]));
            outV.setName("");
            outV.setFlag("order");
        } else {
            // 3 Split the line
            String[] split = line.split(" ");
            // 4 Wrap it: key is pid, value is the tagged product record
            outK.set(split[0]);
            outV.setId("");
            outV.setPid(split[0]);
            outV.setAmount(0);
            outV.setName(split[1]);
            outV.setFlag("pd");
        }

        // 5 Emit outK / outV
        context.write(outK, outV);
    }
}

TableReducer class

package com.study.mapreduce.reducejoin;

import org.apache.commons.beanutils.BeanUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;

public class TableReducer extends Reducer<Text, TableBean, TableBean, NullWritable> {

    @Override
    protected void reduce(Text key, Iterable<TableBean> values, Context context) throws IOException, InterruptedException {
        // reduce() is called once per key; the key is pid. From the order records we take id
        // and amount, from the pd record we take the product name.
        ArrayList<TableBean> orderBeans = new ArrayList<>();   // an order pid can appear on multiple lines
        TableBean pdBean = new TableBean();                    // only one pd line per pid

        for (TableBean value : values) {
            if ("order".equals(value.getFlag())) {
                // Hadoop reuses the same value object while iterating, so copy the current
                // value into a fresh TableBean before adding it to the list; otherwise every
                // list entry would point at the same (last) record.
                TableBean tmpOrderBean = new TableBean();
                try {
                    BeanUtils.copyProperties(tmpOrderBean, value);
                } catch (IllegalAccessException | InvocationTargetException e) {
                    e.printStackTrace();
                }
                orderBeans.add(tmpOrderBean);
            } else {
                try {
                    BeanUtils.copyProperties(pdBean, value);
                } catch (IllegalAccessException | InvocationTargetException e) {
                    e.printStackTrace();
                }
            }
        }

        // Join: copy the product name into every order record, then write it out
        for (TableBean orderBean : orderBeans) {
            orderBean.setName(pdBean.getName());
            context.write(orderBean, NullWritable.get());
        }
    }
}

TableDriver class

package com.study.mapreduce.reducejoin;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class TableDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // 1 Get the Job object
        Configuration conf = new Configuration(true);
        Job job = Job.getInstance(conf);

        // 2 Set the driver class
        job.setJarByClass(TableDriver.class);

        // 3 Set the Mapper and Reducer
        job.setMapperClass(TableMapper.class);
        job.setReducerClass(TableReducer.class);

        // 4 Set the map-side output key/value types
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(TableBean.class);

        // 5 Set the final output key/value types
        job.setOutputKeyClass(TableBean.class);
        job.setOutputValueClass(NullWritable.class);

        // 6 Set the input and output paths
        FileInputFormat.setInputPaths(job, new Path("D:\\tableinput"));
        FileOutputFormat.setOutputPath(job, new Path("D:\\tableoutput"));

        // 7 Submit the job
        boolean b = job.waitForCompletion(true);
        System.exit(b ? 0 : 1);
    }
}

Reduce Join is very prone to data skew in the Reduce phase: all of the merge work for the joined tables is concentrated on the Reduce side, while the Map phase carries little load, so resources are used unevenly and inefficiently.

Solution: do the merge in the Map phase instead. Cache the additional table(s) on the Map side and apply the join logic there, which moves work to the Map side, relieves the pressure on the Reduce side, and reduces data skew as much as possible.

2 Map Join

Map Join suits scenarios where one table is very small (the small table is cached in memory) and the other is very large (the big table is scanned line by line).

Modifying the example above:
First cache the small table in memory as a collection; then scan the big table line by line, use the big table's join key to look up the corresponding entry in the collection, concatenate the cached fields with the big-table fields, wrap the result, and write it out.
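With the hypothetical sample files shown in the Reduce Join section, the map-side join sketched below would emit one joined line per order in the form id pname amount, for example:

1001 小米 1
1002 华为 2
1003 格力 3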

Steps:
1. In the Driver, load the cached data; a map-side join does not need a Reduce phase, so set the number of ReduceTasks to 0.

2. In the Mapper class, override the setup() and map() methods.

In setup() (handles the small table):
(1) Get the cached file.
(2) Read the file line by line in a loop.
(3) Split each line.
(4) Cache the data into the collection.
(5) Close the stream.

In map() (handles the big table and the concatenation):
(1) Read one line.
(2) Split it.
(3) Get the pid.
(4) Look up the order id and the product name.
(5) Concatenate and wrap them.
(6) Write the result out.

MapJoinDriver class

package com.study.mapreduce.mapjoin;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;

public class MapJoinDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException, URISyntaxException {
        // 1 Get the Job object
        Configuration conf = new Configuration(true);
        Job job = Job.getInstance(conf);

        // 2 Set the driver class
        job.setJarByClass(MapJoinDriver.class);

        // 3 Set the Mapper
        job.setMapperClass(MapJoinMapper.class);

        // 4 Set the map-side output key/value types
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(NullWritable.class);

        // 5 Set the final output key/value types
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);

        // 6 Load the small table into the distributed cache
        job.addCacheFile(new URI("file:///D:/tablecache/pd.txt"));

        // 7 A map-side join needs no Reduce phase, so set the number of ReduceTasks to 0
        job.setNumReduceTasks(0);

        // 8 Set the input and output paths
        FileInputFormat.setInputPaths(job, new Path("D:\\tableinput"));
        FileOutputFormat.setOutputPath(job, new Path("D:\\tableoutput"));

        // 9 Submit the job
        boolean b = job.waitForCompletion(true);
        System.exit(b ? 0 : 1);
    }
}

MapJoinMapper class

package com.study.mapreduce.mapjoin;

import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;

public class MapJoinMapper extends Mapper<LongWritable, Text, Text, NullWritable> {

    // In-memory copy of the small table: pid -> product name
    private Map<String, String> pdMap = new HashMap<>();
    private Text outv = new Text();

    // Cache the pd data into pdMap before any map() call runs
    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        // 1 Locate the cached small-table file pd.txt
        URI[] cacheFiles = context.getCacheFiles();
        Path path = new Path(cacheFiles[0]);

        // Get a FileSystem object and open the stream
        FileSystem fs = FileSystem.get(context.getConfiguration());
        FSDataInputStream fis = fs.open(path);

        // Wrap the stream in a reader so it can be read line by line
        BufferedReader reader = new BufferedReader(new InputStreamReader(fis, "UTF-8"));

        // 2 Read and process the file line by line
        String line;
        while (StringUtils.isNotEmpty(line = reader.readLine())) {
            // 3 Split the line, e.g. "01 小米"
            String[] split = line.split(" ");
            // 4 Cache it into the map as (pid, name)
            pdMap.put(split[0], split[1]);
        }

        // 5 Close the stream
        IOUtils.closeStream(reader);
    }

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // 1 Read one line of the big table as a string
        String line = value.toString();

        // 2 Split it
        String[] split = line.split(" ");

        // 3 Look up the product name by pid and build the joined record
        String name = pdMap.get(split[1]);
        outv.set(split[0] + " " + name + " " + split[2]);

        // 4 Emit the joined record
        context.write(outv, NullWritable.get());
    }
}