> 文章列表 > Hadoop入门案例

Hadoop入门案例

Hadoop入门案例

Hadoop的运行流程:

  1. 客户端向HDFS请求文件存储或使用MapReduce计算。
  2. NameNode负责管理整个HDFS系统中的所有数据块和元数据信息;DataNode则实际存储和管理数据块。客户端通过NameNode查找需要访问或处理的文件所在的DataNode,并将操作请求发送到相应的DataNode上。
  3. 当客户端上传一个新文件时(比如输入某些日志),它会被分成固定大小(默认64MB)并进行数据复制以保证可靠性。这些被称为“块”的小段才能与其他硬件组合得出代价最小化、且最佳速度/容错平衡所需表现出来呈现极高效率,并用checksum标记各实例在磁盘上是否一致。
  4. 数据已经保存在位于本地节点存储空间位置内,然后开始进入初始mapper/reducer任务管道处理阶段。(Mapper任务通常需要读取其中一部分输入记录并生成输出键值对给reducer;第二个reducer任务read前面评估器输出字节码映射子集与之相关联,注意写入不同单元)。
  5. Mapper 生成中间结果后,结果根据key排序并重新分配给下一级别stage管道里面去展开reducer工作,直至最终输出结果得到计算。
  6. 输出文件缺省有两个副本。

简单案例

需要注意的是此流程依赖于MapReduce执行模式。

  1. 准备工作

安装Hadoop集群,并将数据文件传输到HDFS中。

  1. 编写Mapper类

每个Mapper类负责解析输入并产生键/值对。在这个简单的案例中,我们已经将整个文件读入内存,并将其行进行拆分:

public class WordCountMapper extends Mapper<Object, Text, Text, IntWritable>{private final static IntWritable one = new IntWritable(1);private Text word = new Text();public void map(Object key, Text value, Context context) throws IOException, InterruptedException {String[] words = value.toString().split("\\\\s+");for(String w : words){word.set(w);context.write(word,one);}}
}
  1. 编写Reducer类

Reducer是这个案例中另一个关键部分。它接收自定义类型Text和Iterable作为输入,并生成相同类型的输出去避免发生huge memort hit以及I/O相关问题:

public class WordCountReducer extends Reducer<Text,IntWritable,Text,IntWritable> {private TreeMap<Integer,List<String>> sortedMap;protected void setup(Context context) throws IOException,InterruptedException{sortedMap = new TreeMap<Integer,List<String>>(Collections.reverseOrder());}public void reduce(Text key , Iterable<IntWritable> values , Contextcontext) throws IOException , InterruptedException{int sum = 0;for(IntWritable value : values){sum += value.get();}List<String> keys = sortedMap.get(sum);if(keys == null)keys = new ArrayList<String>();keys.add(key.toString());sortedMap.put(sum,keys);}protected void cleanup(Context context) throws IOException,InterruptedException{Set<Map.Entry<Integer,List<String>>> entrySet = sortedMap.entrySet();int counter = 0;for(Map.Entry<Integer,List<String>> entry : entrySet){Integer key = entry.getKey();List<String> values = entry.getValue();for(String val: values){if(counter++ == 10)break;context.write(new Text(val), new IntWritable(key));}}}
}
  1. 配置作业并运行

配置Hadoop作业的驱动程序,它通过DelegatingMapper、IdentityReducer和LazyOutputFormat方法将输出写入HDFS。

在main()函数中设置作业以及传递输入/输出路径:

Job job=Job.getInstance(getConf(),"word count");
job.setJarByClass(WordCount.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);job.setMapperClass(WordCountMapper.class);
//set combiner class reducer to optimize performance
job.setCombinerClass(WordCountReducer.class);
//num of reducers is set as one only here since output size is very small.job.setNumReduceTasks(1);LazyOutputFormat.setOutputFormatClass(job,TextOutputFormat.class); //write directoy instead of file.FileInputFormat.addInputPath(job,new Path(args[0]));
FileOutputFormat.setOutputPath(job,new Path(args[1]));System.exit(job.waitForCompletion(true)? 0 : 1);
  1. 运行作业

将编译后的jar文件上传到Hadoop集群中,并执行以下命令来运行MapReduce作业:

hadoop jar wordcount.jar /input /output

这个简单的案例就完成了,它读入文本文件并计算出现频率最高的10个单词。