大橙子网站建设,新征程启航
为企业提供网站建设、域名注册、服务器等服务
小编给大家分享一下Hadoop中如何分区,相信大部分人都还不怎么了解,因此分享这篇文章给大家参考一下,希望大家阅读完这篇文章后大有收获,下面让我们一起去了解一下吧!
成都创新互联是一家专业提供陆良企业网站建设,专注与成都做网站、成都网站建设、H5响应式网站、小程序制作等业务。10年已为陆良众多企业、政府机构等服务。创新互联专业网站制作公司优惠进行中。
package partition; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import java.net.URI; import java.net.URISyntaxException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Writable; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Partitioner; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; public class KpiApp { public static final String INPUT_PATH = "hdfs://hadoop:9000/files/HTTP_20130313143750.dat"; public static final String OUTPUT_PATH = "hdfs://hadoop:9000/files/format"; public static void main(String[] args)throws Exception { Configuration conf = new Configuration(); existsFile(conf); Job job = new Job(conf, KpiApp.class.getName()); //打成Jar在Linux运行 job.setJarByClass(KpiApp.class); //1.1 FileInputFormat.setInputPaths(job, INPUT_PATH); job.setInputFormatClass(TextInputFormat.class); //1.2 job.setMapperClass(MyMapper.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(KpiWritable.class); //1.3 自定义分区 job.setPartitionerClass(KpiPartition.class); job.setNumReduceTasks(2); //1.4 排序分组 //1.5 聚合 //2.1 //2.2 job.setReducerClass(MyReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(KpiWritable.class); //2.3 FileOutputFormat.setOutputPath(job, new Path(OUTPUT_PATH)); job.setOutputFormatClass(TextOutputFormat.class); job.waitForCompletion(true); } private static void existsFile(Configuration conf) throws IOException, URISyntaxException { FileSystem fs = FileSystem.get(new URI(OUTPUT_PATH),conf); if(fs.exists(new Path(OUTPUT_PATH))){ fs.delete(new Path(OUTPUT_PATH), true); } } static class MyMapper extends Mapper{ @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String string = value.toString(); String[] split = string.split("\t"); String phone = split[1]; Text key2 = new Text(); key2.set(phone); KpiWritable v2= new KpiWritable(); v2.set(split[6],split[7],split[8],split[9]); context.write(key2, v2); } } static class MyReducer extends Reducer { @Override protected void reduce(Text key2, Iterable values,Context context) throws IOException, InterruptedException { long upPackNum = 0L; long downPackNum = 0L; long upPayLoad = 0L; long downPayLoad = 0L; for(KpiWritable writable : values){ upPackNum += writable.upPackNum; downPackNum += writable.downPackNum; upPayLoad += writable.upPayLoad; downPayLoad += writable.downPayLoad; } KpiWritable value3 = new KpiWritable(); value3.set(String.valueOf(upPackNum), String.valueOf(downPackNum), String.valueOf(upPayLoad), String.valueOf(downPayLoad)); context.write(key2, value3); } } } class KpiWritable implements Writable{ long upPackNum; long downPackNum; long upPayLoad; long downPayLoad; @Override public void write(DataOutput out) throws IOException { out.writeLong(this.upPackNum); out.writeLong(this.downPackNum); out.writeLong(this.upPayLoad); out.writeLong(this.downPayLoad); } public void set(String string, String string2, String string3, String string4) { this.upPackNum = Long.parseLong(string); this.downPackNum = Long.parseLong(string2); this.upPayLoad = Long.parseLong(string3); this.downPayLoad = Long.parseLong(string4); } @Override public void readFields(DataInput in) throws IOException { this.upPackNum = in.readLong(); this.downPackNum = in.readLong(); this.upPayLoad = in.readLong(); this.downPayLoad = in.readLong(); } @Override public String toString() { return upPackNum + "\t" + downPackNum + "\t" + upPayLoad + "\t" + downPayLoad; } } class KpiPartition extends Partitioner { @Override public int getPartition(Text key, KpiWritable value, int numPartitions) { String string = key.toString(); return string.length()==11?0:1; } }
Paritioner是Hashpartitioner的基类,如果需要定制Partitioner也需要继承该类。
HashPartitioner是MapReduce的默认Partitioner。
以上是“Hadoop中如何分区”这篇文章的所有内容,感谢各位的阅读!相信大家都有了一定的了解,希望分享的内容对大家有所帮助,如果还想学习更多知识,欢迎关注创新互联行业资讯频道!