在线时间:8:00-16:00
迪恩网络APP
随时随地掌握行业动态
扫描二维码
关注迪恩网络微信公众号
有一批电话通信清单,保存了主叫和被叫的记录,记录格式如下,主叫和被叫之间是以空格隔开的。

13400001111 10086
13500002222 10000
13600003333 114
13700004444 12580
13711111111 10086
13822222222 12580
13922225555 12580
18622220000 114
18800000000 114

现在需要做一个倒排索引,记录拨打给被叫的所有主叫号码,记录的格式如下,主叫号码之间以|分隔。

10000 13500002222|
10086 13400001111|13711111111|
114 13600003333|18622220000|18800000000|
12580 13700004444|13822222222|13922225555|
1、算法思路 源文件——》Mapper(分隔原始数据,以被叫作为key,以主叫作为value)——》Reducer(把拥有相同被叫的主叫号码用|分隔汇总)——》输出到HDFS 2、Hadoop程序 import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; public class App_2 extends Configured implements Tool{ @Override public int run(String[] arg0) throws Exception { Configuration conf = getConf(); Job job = new Job(conf,"App_2"); job.setJarByClass(App_2.class); FileInputFormat.addInputPath(job, new Path(arg0[0])); FileOutputFormat.setOutputPath(job, new Path(arg0[1])); job.setMapperClass(CallMapper.class); job.setReducerClass(CallReducer.class); job.setOutputFormatClass(TextOutputFormat.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); job.waitForCompletion(true); return job.isSuccessful()?0:1; } enum Counter{ SKIPLINE,//记录出错的行数 } /** *Mapper<LongWritable,Text,Text,Text> *LongWritable,Text 是输入数据的key和value 如:清单的每一行的首字符的偏移量作为key,整一行的内容作为value *Text,Text 是输出数据的key和value * */ public static class CallMapper extends Mapper<LongWritable,Text,Text,Text> { //map(LongWritable key,Text value,Context context) //LongWritable key,Text value,和CallMapper类的输入数据的key、value对应 //Context 上下文环境 public void map(LongWritable key,Text value,Context context) throws IOException, InterruptedException { try { String line = value.toString(); String[] call = line.split(" "); String caller = call[0];//主叫 String callee = call[1];//被叫 Text outKey = new Text(callee); Text outValue = new Text(caller); 
context.write(outKey, outValue);//被叫作为key,主叫作为value输出 } catch(ArrayIndexOutOfBoundsException e) { context.getCounter(Counter.SKIPLINE).increment(1);//出错,行数+1 return; } } } /** *Reducer<Text,Text,Text,Text> *Text,Text,是输入数据的key和value,对应Mapper中的输出数据 *Text,Text 是最终输出数据的key和value * */ public static class CallReducer extends Reducer<Text,Text,Text,Text>{ //reduce(Text key,Text value,Context context) //Text key,Iterable<Text> values,和CallMapper类的输出数据的key、value对应,其中values是对应key的所有主叫的集合 //Context 上下文环境 public void reduce(Text key,Iterable<Text> values,Context context) throws IOException, InterruptedException { String result = ""; String temp = ""; //对主叫用|分隔 for(Text value : values) { temp = value.toString(); result += (temp + "|"); } Text outValue = new Text(result); //最终输出:被叫 用|分隔的主叫 context.write(key, outValue); } } public static void main(String[] args) throws Exception{ int res = ToolRunner.run(new Configuration(), new App_2(), args); System.exit(res); } }
3、可以在eclipse中运行程序,输入两个参数,一个是通话清单文件所在路径,一个是结果输出目录 4、也可以将程序打成jar包,用命令执行。 [coder@h1 hadoop-0.20.2]$ bin/hadoop jar /home/coder/call.jar /user/coder/in/call.txt /user/coder/output
注意:/user/coder/in/call.txt 和/user/coder/output都是HDFS中的路径
|
请发表评论