新西兰服务器

hadoop如何通过CombineFileInputFormat实现小文件合并减少map的个数


hadoop如何通过CombineFileInputFormat实现小文件合并减少map的个数

发布时间:2021-12-09 16:30:01 来源:高防服务器网 阅读:79 作者:小新 栏目:大数据

小编给大家分享一下hadoop如何通过CombineFileInputFormat实现小文件合并减少map的个数,相信大部分人都还不怎么了解,因此分享这篇文章给大家参考一下,希望大家阅读完这篇文章后大有收获,下面让我们一起去了解一下吧!

//map读入的键  package hgs.combinefileinputformat.test;  import java.io.DataInput;  import java.io.DataOutput;  import java.io.IOException;  import org.apache.hadoop.io.Text;  import org.apache.hadoop.io.WritableComparable;  public class CombineFileKey implements  WritableComparable<CombineFileKey> {  	private String fileName;  	private long offset;  	  	  	public String getFileName() {  		return fileName;  	}  	public void setFileName(String fileName) {  		this.fileName = fileName;  	}  	public long getOffset() {  		return offset;  	}  	public void setOffset(long offset) {  		this.offset = offset;  	}  	@Override  	public void readFields(DataInput input) throws IOException {  		this.fileName = Text.readString(input);  		this.offset = input.readLong();  		  	}  	@Override  	public void write(DataOutput output) throws IOException {  		Text.writeString(output, fileName);  		output.writeLong(offset);  		  	}  	@Override  	public int compareTo(CombineFileKey obj) {  		int f = this.fileName.compareTo(obj.fileName);  		if(f==0)  			return (int)Math.signum((double)(this.offset-obj.offset));  		return f;  	}  	@Override  	public int hashCode() {  		//摘自于 http://www.idryman.org/blog/2013/09/22/process-small-files-on-hadoop-using-combinefileinputformat-1/  		final int prime = 31;  	    int result = 1;  	    result = prime * result + ((fileName == null) ? 0 : fileName.hashCode());  	    result = prime * result + (int) (offset ^ (offset >>> 32));  	    return result;  	}  	  	@Override  	public boolean equals(Object o) {  		if(o instanceof CombineFileKey)  			return this.compareTo((CombineFileKey)o)==0;  		return false;  	}  }
package hgs.combinefileinputformat.test;  import java.io.IOException;  import org.apache.hadoop.fs.FSDataInputStream;  import org.apache.hadoop.fs.FileSystem;  import org.apache.hadoop.fs.Path;  import org.apache.hadoop.io.Text;  import org.apache.hadoop.mapreduce.InputSplit;  import org.apache.hadoop.mapreduce.RecordReader;  import org.apache.hadoop.mapreduce.TaskAttemptContext;  import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;  import org.apache.hadoop.util.LineReader;  public class CombineFileReader extends RecordReader<CombineFileKey, Text>{  	private long startOffset; //offset of the chunk;  	private long end; //end of the chunk;  	private long position; // current pos  	private FileSystem fs;  	private Path path;   	private CombineFileKey key;  	private Text value;  	private FSDataInputStream input;  	private LineReader reader;  	public CombineFileReader(CombineFileSplit split,TaskAttemptContext context ,  			Integer index) throws IOException {  		//初始化path fs startOffset end  		this.path = split.getPath(index);  		this.fs = this.path.getFileSystem(context.getConfiguration());  		this.startOffset = split.getOffset(index);  		this.end = split.getLength()+this.startOffset;  		//判断现在开始的位置是否在一行的内部  		boolean skipFirstLine = false;  		  		//open the file  		this.input = fs.open(this.path);  		//不等于0说明读取位置在一行的内部  		if(this.startOffset !=0 ){  			skipFirstLine = true;  			--(this.startOffset);  			//定位到开始读取的位置  			this.input.seek(this.startOffset);  		}  		//初始化reader  		this.reader = new LineReader(input);  		if(skipFirstLine){ // skip first line and re-establish "startOffset".  			//这里着这样做的原因是 一行可能包含了这个文件的所有的数据,猜测如果遇到一行的话,还是会读取一行  			//将其实位置调整到一行的开始,这样的话会舍弃部分数据  			this.startOffset += this.reader.readLine(new Text(), 0, (int)Math.min  					             ((long)Integer.MAX_VALUE, this.end - this.startOffset));  		}  		this.position = this.startOffset;  	}  	  	@Override  	public void close() throws IOException {}  	@Override  	public void initialize(InputSplit splite, TaskAttemptContext context) throws IOException, InterruptedException {}  	//返回当前的key  	@Override  	public CombineFileKey getCurrentKey() throws IOException, InterruptedException {  		return key;  	}  	//返回当前的value  	@Override  	public Text getCurrentValue() throws IOException, InterruptedException {  		return value;  	}  	//执行的进度  	@Override  	public float getProgress() throws IOException, InterruptedException {  		//返回的类型为float  		if(this.startOffset==this.end){  			return 0.0f;  		}else{  			return Math.min(1.0f, (this.position - this.startOffset)/(float)(this.end - this.startOffset));  		}  	}  	//该方法判断是否有下一个key value  	@Override  	public boolean nextKeyValue() throws IOException, InterruptedException {  		//对key和value初始化  		if(this.key == null){  			this.key = new CombineFileKey();  			this.key.setFileName(this.path.getName());  		}  		this.key.setOffset(this.position);  		if(this.value == null){  			this.value = new Text();  		}  		//读取一行数据,如果读取的newSieze=0说明split的数据已经处理完成  		int newSize = 0;  		if(this.position<this.end){  			newSize = reader.readLine(this.value);  			position += newSize;  		}  		//没有数据,将key value置位空  		if(newSize == 0){  			this.key = null;  			this.value = null;  			return false;  		}else{  			return true;  		}  	}  	  }
package hgs.combinefileinputformat.test;  import java.io.IOException;  import org.apache.hadoop.fs.Path;  import org.apache.hadoop.io.Text;  import org.apache.hadoop.mapreduce.InputSplit;  import org.apache.hadoop.mapreduce.JobContext;  import org.apache.hadoop.mapreduce.RecordReader;  import org.apache.hadoop.mapreduce.TaskAttemptContext;  import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat;  import org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader;  import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;  public class CustCombineInputFormat extends CombineFileInputFormat<CombineFileKey, Text> {  	public CustCombineInputFormat(){  		super();  		//最大切片大小  		this.setMaxSplitSize(67108864);//64 MB  	}  	@Override  	public RecordReader<CombineFileKey, Text> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException {		  		return new CombineFileRecordReader<CombineFileKey, Text>((CombineFileSplit)split,context,CombineFileReader.class);  	}  	@Override  	protected boolean isSplitable(JobContext context, Path file) {  		return false;  	}  }  //驱动类  package hgs.test;  import org.apache.hadoop.conf.Configuration;  import org.apache.hadoop.fs.Path;  import org.apache.hadoop.io.IntWritable;  import org.apache.hadoop.io.Text;  import org.apache.hadoop.mapreduce.Job;  import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;  import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;  import hgs.combinefileinputformat.test.CustCombineInputFormat;  public class LetterCountDriver {  	public static void main(String[] args) throws Exception {  		Configuration conf = new Configuration();  		//conf.set("mapreduce.map.log.level", "INFO");  		///conf.set("mapreduce.reduce.log.level", "INFO");  		Job job = Job.getInstance(conf, "LetterCount");  		job.setJarByClass(hgs.test.LetterCountDriver.class);  		// TODO: specify a mapper  		job.setMapperClass(LetterCountMapper.class);  		// TODO: specify a reducer  		job.setReducerClass(LetterReducer.class);  		// TODO: specify output types  		job.setOutputKeyClass(Text.class);  		job.setOutputValueClass(IntWritable.class);  		if(args[0].equals("1"))  			job.setInputFormatClass(CustCombineInputFormat.class);  		else{}  		// TODO: specify input and output DIRECTORIES (not files)  		FileInputFormat.setInputPaths(job, new Path("/words"));  		FileOutputFormat.setOutputPath(job, new Path("/result"));  		if (!job.waitForCompletion(true))  			return;  	}  }

hdfs文件:

运行结果:不使用自定义的:CustCombineInputFormat

运行结果:在使用自定义的:CustCombineInputFormat

以上是“hadoop如何通过CombineFileInputFormat实现小文件合并减少map的个数”这篇文章的所有内容,感谢各位的阅读!相信大家都有了一定的了解,希望分享的内容对大家有所帮助,如果还想学习更多知识,欢迎关注高防服务器网行业资讯频道!

[微信提示:高防服务器能助您降低 IT 成本,提升运维效率,使您更专注于核心业务创新。

[图文来源于网络,不代表本站立场,如有侵权,请联系高防服务器网删除]
[