Hadoop 电话通信清单
一、实例要求
现有一批电话通信清单,记录了用户A拨打某些特殊号码(如120,10086,13800138000等)的记录。需要做一个统计结果,记录拨打给用户B的所有用户A。
二、测试样例
样例输入:
file.txt:
13599999999 10086
13899999999 120
13944444444 1380013800
13722222222 1380013800
18800000000 120
13722222222 10086
18944444444 10086
样例输出:
三、算法思路
源文件——》Mapper(分隔原始数据,以被叫作为key,以主叫作为value)——》Reducer(把拥有相同被叫的主叫号码用|分隔汇总)——》输出到HDFS
四、程序代码
程序代码如下:
import java.io.IOException;import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.GenericOptionsParser;public class Tel {public static class Map extends Mapper<LongWritable, Text, Text, Text>{@Overrideprotected void map(LongWritable key, Text value,Mapper<LongWritable, Text, Text, Text>.Context context)throws IOException, InterruptedException {// super.map(key, value, context);String line = value.toString();Text word = new Text();String [] lineSplite = line.split(" ");String anum = lineSplite[0];String bnum = lineSplite[1];context.write(new Text(bnum), new Text(anum));}}public static class Reduce extends Reducer<Text, Text, Text, Text>{@Overrideprotected void reduce(Text key, Iterable<Text> values,Reducer<Text, Text, Text, Text>.Context context)throws IOException, InterruptedException {// super.reduce(arg0, arg1, arg2);String valueString;String out ="";for(Text value: values){valueString=value.toString();out += valueString+"|";}context.write(key, new Text(out));}}public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {Configuration conf = new Configuration();String[] otherArgs = new GenericOptionsParser(conf,args).getRemainingArgs();if(otherArgs.length!=2){System.out.println("Usage:wordcount <in> <out>");System.exit(2);}Job job = new Job(conf,"Tel");job.setJarByClass(Tel.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(Text.class);job.setMapperClass(Map.class);job.setReducerClass(Reduce.class);FileInputFormat.addInputPath(job, new Path(args[0]));FileOutputFormat.setOutputPath(job, new Path(args[1]));job.waitForCompletion(true);}}