MapReduce高级-读写数据库
MapReduce 读取数据库
为什么要读写数据库
本质上讲数据库是存储数据的介质,MapReduce是处理数据的计算引擎。通常企业会使用关系型数据库(RDBMS)来存储业务的相关数据,随着业务数据的规模越来越大,不可避免的存在性能下降的问题,这里存在两个说法:
- 百度: MySQL单表数据量大于2000万行,性能会明显下降
- 案例:单表行数超过500w行或者单表容量大于2G,推荐使用分库分表
此时我们可以通过使用MapReduce从MySQL中定期迁移使用频率比较低的历史数据到HDFS中:
- 一方面可以降低MySQL的存储核计算负载
- 通过分布式计算引擎可以更加高效的处理过去的历史数据
如何实现读写数据库
对于MapReduce框架来说,使用InputFormart进行读取数据,读取的数据首先由Mapper 进行处理,然后交给Reduce处理,最终使用OutputFormat进行数据的输出操作,默认情况下,输入输出的组件实现都是针对文本数据处理的,分别是TextInputFormat、TextOutputFormat。
为了方便MapReduce直接访问关系型数据库(MySQL、Oracle),Hadoop提供了DBInputFormat、DBOutputFormat两个类,其中DBInputForm负责从数据库读取数据,而DBOutputFormat负责把数据写入数据库中
使用测试
需求
在MySQL中shop数据库下的produce中的数据导出存放在指定的文件系统目录下。
那么传统的读取方式肯定不行,那么采用什么方式来读取呢?
DBInputFormat
DBInputFormat类用于从SQL表中读取数据,底层一行一行的读取表中的数据,返回<K,V>键值对,
其中K是LongWritable类型,表示表中数据的记录行号,从0开始
V是DBWritable类型,表示该行数据对应的对象类型
DBInputFormat能够读取MySQL本质上还是在底层封装了JDBC,所以在后续项目中还要加上JDBC的驱动
读取MySQL数据
DBInputFormat在底层封装了MySQL,那么在使用的过程中,就需要加上JDBC的驱动,后续为了方便,这里也加上了lombok的依赖来简化开发
POM文件整体
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>org.wxk</groupId><artifactId>HDFS-HDFS2Test</artifactId><version>1.0-SNAPSHOT</version><properties><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target></properties><dependencies><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-common</artifactId><version>3.1.4</version></dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-client</artifactId><version>3.1.4</version></dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-hdfs</artifactId><version>3.1.4</version></dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-mapreduce-client-core</artifactId><version>3.1.4</version></dependency><dependency><groupId>junit</groupId><artifactId>junit</artifactId><version>4.13</version></dependency><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>8.0.25</version></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.18.26</version></dependency></dependencies><build><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-jar-plugin</artifactId><version>2.4</version><configuration><archive><manifest><addClasspath>true</addClasspath><classpathPrefix>lib/</classpathPrefix><mainClass>MapReduceTest.WordDriver</mainClass></manifest></archive></configuration></plugin><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><version>3.1</version><configuration><source>1.8</source><target>1.8</target></configuration></plugin></plugins></build>
</project>
编写Bean文件
在编写Bean文件的时候需要实现Writable和DBWritable这两个接口
package MapReduceTest.DB.Reader;import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.lib.db.DBWritable;import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;/* @author wxk* @date 2023/04/19/17:39*/
@Data
@NoArgsConstructor
@AllArgsConstructor
public class OrderBean implements Writable, DBWritable {private int id;private String order;private String time;@Overridepublic String toString() {return id + "\\t" + order + "\\t" +time;}// 序列化方法,将数据写出去@Overridepublic void write(DataOutput out) throws IOException {out.writeInt(id);out.writeUTF(order);out.writeUTF(time);}//序列化方法,将数据读取进来@Overridepublic void readFields(DataInput in) throws IOException {this.id = in.readInt();this.order=in.readUTF();this.time= in.readUTF();}//序列化 写入数据库@Overridepublic void write(PreparedStatement ps) throws SQLException {ps.setInt(1,id);ps.setString(2,order);ps.setString(3,time);}//将查询结果赋予给此对象@Overridepublic void readFields(ResultSet resultSet) throws SQLException {this.id=resultSet.getInt(1);this.order=resultSet.getString(2);this.time=resultSet.getString(3);}
}
编写Mapper文件
在配置Mapper文件中,我们需要了解一下信息:
Mapper中的类型表示的输入输出的KV的格式:输入的K是Long类型,V是GoodsBean类型,输出的K是Long类型,V是字符串类型。这里输入的KEY是字符串类型是因为K是一个偏移量,表示当前读取的是哪一行,后续可以根据自己的想法进行设置
package MapReduceTest.DB.Reader;import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;import java.io.IOException;/* @author wxk* @date 2023/04/19/9:53*/
public class ReaderMapper extends Mapper<LongWritable,OrderBean,LongWritable, Text> {Text out =new Text();@Overrideprotected void map(LongWritable key, OrderBean value, Context context) throws IOException, InterruptedException {out.set(value.toString());context.write(key,out);}
}
配置运行的Driver驱动
package MapReduceTest.DB.Reader;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
import org.apache.hadoop.mapreduce.lib.db.DBInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;/* @author wxk* @date 2023/04/19/9:59*/
public class ReaderDriver {public static void main(String[] args) throws Exception {//配置文件对象Configuration conf = new Configuration();//配置当前作业需要的JDBC密码DBConfiguration.configureDB(conf,"com.mysql.cj.jdbc.Driver","jdbc:mysql://localhost:3306/shop","root","20020219");//创建作业的jobJob job = Job.getInstance(conf, ReaderDriver.class.getSimpleName());//设置MapReduce的输出格式job.setJarByClass(ReaderDriver.class);job.setMapperClass(ReaderMapper.class);//key的格式job.setOutputKeyClass(LongWritable.class);//value的格式job.setOutputValueClass(Text.class);//不需要Reduce阶段,就把ReduceTask设置为 表明不在执行MapReducejob.setNumReduceTasks(0);//设置输入组件job.setInputFormatClass(DBInputFormat.class);FileOutputFormat.setOutputPath(job,new Path("E://mysql_out"));DBInputFormat.setInput(job,OrderBean.class,"select * from `order`","select count(*) from `order`");final boolean b = job.waitForCompletion(true);System.out.println(b ? 0:1);}
}
运行之后,查看文件夹:
查看文件:
对比数据库:
可见任务基本完成
这里有一个小细节,就是输出文件名和之前的不一样,在这里中间是m,而之前是r如图:
这里输出是m是因为Reduce环节根本就没有进行,所以是m而不是r,而之前的是因为走的是全流程,最后经过了Reduce的处理,结果是r
如果经过了Reduce操作,那么输出文件中是r,如果仅仅经过了Map的处理,那么就是m
写入MySQL数据
将数据库中的数据进行清空,然后进行一个配置
Map
package MapReduceTest.DB.Writer;import MapReduceTest.DB.Reader.OrderBean;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Mapper;import java.io.IOException;/* @author wxk* @date 2023/04/19/20:05*/
public class WriteDBMapper extends Mapper<LongWritable, Text, NullWritable, OrderBean> {OrderBean outValue = new OrderBean();NullWritable outKey = NullWritable.get();private final static int INCR_NUMBER = 1;@Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {//计数器的模拟Counter sc = context.getCounter("wxk", "sc_counter");Counter fe = context.getCounter("wxk", "fe_counter");String[] split = value.toString().split("\\t");if (split.length != 4) {//长度不为4表明数据不合法fe.increment(INCR_NUMBER);} else {outValue.setId(Integer.parseInt(split[1]));outValue.setOrder(split[2]);outValue.setTime(split[3]);context.write(outKey,outValue);//合法数据,就加一sc.increment(INCR_NUMBER);}}
}
Reduce
package MapReduceTest.DB.Writer;import MapReduceTest.DB.Reader.OrderBean;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;import java.io.IOException;/* @author wxk* @date 2023/04/19/20:27*/
//在这里输出的时候Key必须为DBWritable类,V随意,因为最终是将K写入到数据库中
public class WriteDBReduce extends Reducer<NullWritable, OrderBean, OrderBean, NullWritable> {NullWritable outValue = NullWritable.get();@Overrideprotected void reduce(NullWritable key, Iterable<OrderBean> values, Context context) throws IOException, InterruptedException {for (OrderBean item : values) {context.write(item, outValue);}}
}
Driver
package MapReduceTest.DB.Writer;import MapReduceTest.DB.Reader.OrderBean;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
import org.apache.hadoop.mapreduce.lib.db.DBOutputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import java.io.IOException;/* @author wxk* @date 2023/04/19/20:32*/
public class WriteDBDriver {public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {Configuration conf = new Configuration();DBConfiguration.configureDB(conf,"com.mysql.cj.jdbc.Driver","jdbc:mysql://localhost:3306/shop?useSSL=false&useUnicode=true&characterEncoding=utf8&serverTimezone=GMT%2B8&allowPublicKeyRetrieval=true","root","20020219");Job job = Job.getInstance(conf,WriteDBDriver.class.getSimpleName());//设置Mapper驱动job.setMapperClass(WriteDBMapper.class);//设置驱动job.setJarByClass(WriteDBDriver.class);//设置Mapper输出Key的类型job.setMapOutputKeyClass(NullWritable.class);//设置Mapper输出Value的类型job.setMapOutputValueClass(OrderBean.class);//设置Reducejob.setReducerClass(WriteDBReduce.class);//设置Reduce输出的Key的类型job.setOutputKeyClass(OrderBean.class);//设置Reduce输出Value的类型job.setOutputValueClass(NullWritable.class);//设置输入路径FileInputFormat.setInputPaths(job,new Path("E://mysql_out"));//设置输出格式job.setOutputFormatClass(DBOutputFormat.class);//配置作业协入数据库的表/字段DBOutputFormat.setOutput(job,"`order`","id","`order`","time");boolean b = job.waitForCompletion(true);System.out.println(b ? 0: 1);}
}
运行之后:
在这里可以看到成功插入了20条,失败0条
查看MySQL数据库:
插入成功