> 文章列表 > Hadoop学习笔记(持续更新中)

Hadoop学习笔记(持续更新中)

Hadoop学习笔记(持续更新中)

文章目录

  • Hadoop
    • Hadoop 的组成
    • HDFS
    • MapReduce
      • job、task、input split
      • combiner
      • 其他
      • 示例

Hadoop

Hadoop 的组成

  • Hadoop1.x
    • MapReduce(计算+资源调度)
    • HDFS(数据存储)
    • Common(辅助工具)
  • Hadoop2.x、3.x
    • Yarn(资源调度): A framework for job scheduling and cluster resource management.
    • MapReduce(计算): A YARN-based system for parallel processing of large data sets.
    • HDFS(数据存储): A distributed file system that provides high-throughput access to application data.
    • Common(辅助工具): The common utilities that support the other Hadoop modules.

Hadoop1.x 时 代 ,Hadoop中 的MapReduce同时处理业务逻辑运算和资源的调度,耦合性较大。

在Hadoop2.x时 代,增加 了Yarn。Yarn只负责资 源 的 调 度 ,MapReduce 只负责运算。

Hadoop3.x在组成上没有变化。

HDFS

Hadoop Distributed File System

特点

高吞吐流式数据访问 模式来存储 单写入多读取仅末尾追加超大文件,运行于 商用硬件 集群上

流式数据访问:一次写入、多次读取

超大文件:HDFS中的典型文件大小为GB到TB

商用硬件:即轻易能买到的普通硬件,对庞大集群来说节点故障率不低,于是HDFS核心目标是能检测故障并快速自动地从故障中恢复,且在故障时继续运行,不让用户察觉到明显的中断

高吞吐:HDFS重点在于数据访问的高吞吐量,而不是低延迟,低延迟场景不适合用HDFS(几十毫秒),更推荐用HBASE

简单一致性:单用户写入(不可多用户同时写入),写操作仅支持“文件末尾追加”,不支持任意位置修改

可移植性:HDFS是使用Java语言构建的,任何支持Java的机器都可以运行NameNode或DataNode软件。使用高度可移植的Java语言意味着HDFS可以部署在各种各样的机器上。

NameNode和DataNode

HDFS集群具有主/从架构,由一个NameNode和多个DataNode组成。

NameNode是一个主服务器,它管理文件系统命名空间,维护文件系统树及整棵树内所有的文件和目录,这些信息以命名空间镜像文件和编辑日志文件存储在磁盘上。
NameNode执行文件系统命名空间操作,如打开、关闭和重命名文件和目录。它还决定了块到datanode的映射。——HDFS元数据

Datanode是文件系统的工作节点,它们根据需要存储并检索数据块,并定期的向namenode发送它们所存储的块的列表。
DataNode通常集群中每个节点一个,用于管理与它们所运行的节点相连的存储。文件将被分割成一个或多个块,存储在一组datanode中。datanode负责为来自文件系统客户端的读写请求提供服务。datanode还根据NameNode的指令执行块的创建、删除和复制。——用户数据

如果namenode损坏,文件系统上的所有文件将都会丢失,因为将不知道如何根据datanode的块重建文件。
因此,hadoop对namenode的提供两种容错方式,一种是备份那些组成文件系统元数据持久状态的文件,另一种是运行一个辅助namenode,用于定期合并编辑日志与命名空间镜像。

文件系统命名空间

HDFS支持传统的分层文件组织,其层次结构与大多数其他现有文件系统相似。

用户或应用程序可以创建目录并将文件存储在这些目录中,可以进行创建、删除、移动、重命名等操作。

支持用户配额和访问权限。

NameNode维护文件系统名称空间。对文件系统名称空间或其属性的任何更改均由NameNode记录。

数据块

HDFS上的文件被分为多个块,除最后一个块外的所有块都具有相同的大小。

可以为文件指定复制因子,将数据块复制到其他结点,确保发生故障时数据不丢失(故障时读取副本的过程对用户透明)。

复制因子可以在文件创建时指定,以后可以更改。

副本的最大数量是当时DataNode的总数,因为一个DataNode不能具有同一块的多个副本。

小于一个块大小的文件不会占据整个块的空间。

块缓存

通常datanode从磁盘中读取块,而频繁访问的文件可能被显式地缓存在datanode的内存中,以堆外块缓存的形式存在。
一个块默认仅缓存在一个datanode的内存中。

MapReduce

Hadoop MapReduce是一个软件框架,可以编写用于大规模数据处理的分布式应用程序(TB级别),并发、可靠可容错地运行在hadoop集群上。

job、task、input split

MapReduce作业(Job)被Hadoop分成若干个任务(task),包括map任务和reduce任务,由YARN进行调度。多个map并发执行。

输入数据会被分成等长的小数据块,称为输入分片(input split)。每个分片对应一个map任务,该任务会调用用户自定义的map()函数以处理该分片的数据。一般建议分片大小与HDFS块大小相同,以减少网络传输。

通常Hadoop在存有输入数据的HDFS节点上运行map任务,这样可以减少网络传输。

combiner

combiner函数属于可选的优化方案,可对一个map任务指定一个combiner,在进入reduce之前先处理map的输出,以减少reduce的输入。

一般可以和reduce函数一样的实现。

其他

map和reduce的输入输出都是键值对,其类必须序列化, 此外,关键类必须实现WritableComparable接口,以利于框架进行排序。

Hadoop提供了一套可优化网络序列化传输的基本类型,在org.apache.hadoop.io中,例如LongWritable相当于Java的Long、Text相当于String

示例

import java.io.IOException;
import java.util.StringTokenizer;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;public class WordCount {public static class TokenizerMapperextends Mapper<Object, Text, Text, IntWritable>{private final static IntWritable one = new IntWritable(1);private Text word = new Text();public void map(Object key, Text value, Context context) throws IOException, InterruptedException {StringTokenizer itr = new StringTokenizer(value.toString());while (itr.hasMoreTokens()) {word.set(itr.nextToken());context.write(word, one);}}}public static class IntSumReducerextends Reducer<Text,IntWritable,Text,IntWritable> {private IntWritable result = new IntWritable();public void reduce(Text key, Iterable<IntWritable> values,Context context) throws IOException, InterruptedException {int sum = 0;for (IntWritable val : values) {sum += val.get();}result.set(sum);context.write(key, result);}}public static void main(String[] args) throws Exception {Configuration conf = new Configuration();Job job = Job.getInstance(conf, "word count");job.setJarByClass(WordCount.class);job.setMapperClass(TokenizerMapper.class);job.setCombinerClass(IntSumReducer.class);job.setReducerClass(IntSumReducer.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(IntWritable.class);FileInputFormat.addInputPath(job, new Path(args[0]));FileOutputFormat.setOutputPath(job, new Path(args[1]));System.exit(job.waitForCompletion(true) ? 0 : 1);}
}