普通maprduce中通常是有map和reduce两个阶段,在不做设置的情况下,计算结果会以part-000*输出成多个文件,并且输出的文件数量和reduce数量一样,文件内容格式也不能随心所欲。这样不利于后续结果处理。

由于Hadoop默认编码为UTF-8,并且将UTF-8进行了硬编码,所以我们在处理中文时需要重写OutputFormat类。方法为:

因为需要自定义实现输出文件的格式,现在来分析一下TextOutputFormat的源码;

在Hadoop中,reduce支持多个输出,输出的文件名也是可控的,就是继承MultipleTextOutputFormat类,重写generateFileNameForKey方法。如果只是想做到输出结果的文件名可控,实现自己的LogNameMultipleTextOutputFormat类,设置jobconf.setOutputFormat(LogNameMultipleTextOutputFormat.class);就可以了,但是这种方式只限于使用旧版本的hadoop
api.如果想采用新版本的api接口或者自定义输出内容的格式等等更多的需求,那么就要自己动手重写一些hadoop
api了。

1、新建类GBKFileOutputFormat,代码如下:
import java.io.DataOutputStream; 
import java.io.IOException; 
import java.io.UnsupportedEncodingException; 
 
import org.apache.hadoop.conf.Configuration; 
import org.apache.hadoop.fs.FileSystem; 
import org.apache.hadoop.fs.Path; 
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.mapreduce.lib.*;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 
import org.apache.hadoop.io.NullWritable; 
import org.apache.hadoop.io.Text; 
import org.apache.hadoop.io.compress.CompressionCodec; 
import org.apache.hadoop.io.compress.GzipCodec; 
import org.apache.hadoop.mapreduce.OutputFormat; 
import org.apache.hadoop.mapreduce.RecordWriter; 
import org.apache.hadoop.mapreduce.TaskAttemptContext; 
import org.apache.hadoop.util.*; 
 
/** An {@link OutputFormat} that writes plain text files. */ 
public class GBKFileOutputFormat<K, V> extends
FileOutputFormat<K, V> {//TextInputFormat是默认的输出文件格式 
  protected static class LineRecordWriter<K, V>//默认 
    extends RecordWriter<K, V> { 
    private static final String utf8 = “GBK”; 
//硬编码,将“UTF-8”改为“GBK” 
    private static final byte[] newline;//行结束符? 
    static { 
      try { 
        newline = “n”.getBytes(utf8); 
      } catch (UnsupportedEncodingException uee) { 
        throw new IllegalArgumentException(“can’t find ” + utf8 + ”
encoding”); 
      } 
    } 
 
    protected DataOutputStream out; 
    private final byte[]
keyValueSeparator;//key和value的分隔符,默认的好像是Tab 
 
    public LineRecordWriter(DataOutputStream out, String
keyValueSeparator) {//构造函数,初始化输出流及分隔符 
      this.out = out; 
      try { 
        this.keyValueSeparator = keyValueSeparator.getBytes(utf8); 
      } catch (UnsupportedEncodingException uee) { 
        throw new IllegalArgumentException(“can’t find ” + utf8 + ”
encoding”); 
      } 
    } 
 
    public LineRecordWriter(DataOutputStream out) {//默认的分隔符 
      this(out, “t”); 
    } 
 
    /**
    * Write the object to the byte stream, handling Text as a
special输出流是byte格式的
    * case.
    * @param o the object to print是要输出的对象
    * @throws IOException if the write throws, we pass it on
    */ 
    private void writeObject(Object o) throws IOException
{//应该是一行一行的写 key keyValueSeparator value n 
      if (o instanceof Text) {//如果o是Text的实例 
        Text to = (Text) o; 
        out.write(to.getBytes(), 0, to.getLength());//写出 
      } else { 
        out.write(o.toString().getBytes(utf8)); 
      } 
    } 
 
    public synchronized void write(K key, V
value)//给写线程加锁,写是互斥行为 
      throws IOException { 
//下面是为了判断key和value是否为空值 
      boolean nullKey = key == null || key instanceof
NullWritable;//这语句太牛了 
      boolean nullValue = value == null || value instanceof
NullWritable; 
      if (nullKey && nullValue) {// 
        return; 
      } 
      if (!nullKey) { 
        writeObject(key); 
      } 
      if (!(nullKey || nullValue)) { 
        out.write(keyValueSeparator); 
      } 
      if (!nullValue) { 
        writeObject(value); 
      } 
      out.write(newline); 
    } 
 
    public synchronized 
    void close(TaskAttemptContext context) throws IOException { 
      out.close(); 
    } 
  } 
 
  public RecordWriter<K, V>    getRecordWriter(TaskAttemptContext
job//获得writer实例 
                        ) throws IOException, InterruptedException { 
    Configuration conf = job.getConfiguration(); 
    boolean isCompressed = getCompressOutput(job);// 
    String keyValueSeparator=
conf.get(“mapred.textoutputformat.separator”, 
                                      “t”); 
    CompressionCodec codec = null;//压缩格式 还是? 
    String extension = “”; 
    if (isCompressed) { 
      Class<? extends CompressionCodec> codecClass = 
        getOutputCompressorClass(job, GzipCodec.class); 
      codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass,
conf); 
      extension = codec.getDefaultExtension(); 
    } 
    Path file = getDefaultWorkFile(job,
extension);//这个是获取缺省的文件路径及名称,在FileOutput中有对其的实现 
    FileSystem fs = file.getFileSystem(conf); 
    if (!isCompressed) { 
      FSDataOutputStream fileOut = fs.create(file, false); 
      return new LineRecordWriter<K, V>(fileOut,
keyValueSeparator); 
    } else { 
      FSDataOutputStream fileOut = fs.create(file, false); 
      return new LineRecordWriter<K, V>(new DataOutputStream 
                                       
(codec.createOutputStream(fileOut)), 
                                        keyValueSeparator); 
    } 
  } 

源码如下,注释会直接放在源码之中

首先需要构造一个自己的MultipleOutputFormat类实现FileOutputFormat类(注意是org.apache.hadoop.mapreduce.lib.output包的FileOutputFormat)

该类是在源代码中TextOutputFormat类基础上进行修改的,在这需要注意的一点是继承的父类FileOutputFormat是位于org.apache.hadoop.mapreduce.lib.output包中的

  1. package org.apache.Hadoop.mapreduce.lib.output;  
  2.   
  3. import java.io.DataOutputStream;  
  4. import java.io.IOException;  
  5. import java.io.UnsupportedEncodingException;  
  6.   
  7. import org.apache.hadoop.conf.Configuration;  
  8. import org.apache.hadoop.fs.FileSystem;  
  9. import org.apache.hadoop.fs.Path;  
  10. import org.apache.hadoop.fs.FSDataOutputStream;  
  11.   
  12. import org.apache.hadoop.io.NullWritable;  
  13. import org.apache.hadoop.io.Text;  
  14. import org.apache.hadoop.io.compress.CompressionCodec;  
  15. import org.apache.hadoop.io.compress.GzipCodec;  
  16. import org.apache.hadoop.mapreduce.OutputFormat;  
  17. import org.apache.hadoop.mapreduce.RecordWriter;  
  18. import org.apache.hadoop.mapreduce.TaskAttemptContext;  
  19. import org.apache.hadoop.util.*;  
  20.   
  21. /** An {@link OutputFormat} that writes plain text files. */  
  22. public class TextOutputFormat<K, V> extends FileOutputFormat<K, V> {//TextInputFormat是默认的输出文件格式   
  23.   protected static class LineRecordWriter<K, V>//默认   
  24.     extends RecordWriter<K, V> {  
  25.     private static final String utf8 = “UTF-8”;  
  26.     private static final byte[] newline;//行结束符?   
  27.     static {  
  28.       try {  
  29.         newline = “n”.getBytes(utf8);  
  30.       } catch (UnsupportedEncodingException uee) {  
  31.         throw new IllegalArgumentException(“can’t find ” + utf8 + ” encoding”);  
  32.       }  
  33.     }  
  34.   
  35.     protected DataOutputStream out;  
  36.     private final byte[] keyValueSeparator;//key和value的分隔符,默认的好像是Tab   
  37.   
  38.     public LineRecordWriter(DataOutputStream out, String keyValueSeparator) {//构造函数,初始化输出流及分隔符    
  39.       this.out = out;  
  40.       try {  
  41.         this.keyValueSeparator = keyValueSeparator.getBytes(utf8);  
  42.       } catch (UnsupportedEncodingException uee) {  
  43.         throw new IllegalArgumentException(“can’t find ” + utf8 + ” encoding”);  
  44.       }  
  45.     }  
  46.   
  47.     public LineRecordWriter(DataOutputStream out) {//默认的分隔符   
  48.       this(out, “t”);  
  49.     }  
  50.   
  51.     /** 
  52.      * Write the object to the byte stream, handling Text as a special输出流是byte格式的 
  53.      * case. 
  54.      * @param o the object to print是要输出的对象 
  55.      * @throws IOException if the write throws, we pass it on 
  56.      */  
  57.     private void writeObject(Object o) throws IOException {//应该是一行一行的写 key keyValueSeparator value n
      
  58.       if (o instanceof Text) {//如果o是Text的实例   
  59.         Text to = (Text) o;  
  60.         out.write(to.getBytes(), 0, to.getLength());//写出   
  61.       } else {  
  62.         out.write(o.toString().getBytes(utf8));  
  63.       }  
  64.     }  
  65.   
  66.     public synchronized void write(K key, V value)//给写线程加锁,写是互斥行为   
  67.       throws IOException {  
  68. <span style=”white-space:pre”>    </span>//下面是为了判断key和value是否为空值   
  69.       boolean nullKey = key == null || key instanceof NullWritable;//这语句太牛了   
  70.       boolean nullValue = value == null || value instanceof NullWritable;  
  71.       if (nullKey && nullValue) {//   
  72.         return;  
  73.       }  
  74.       if (!nullKey) {  
  75.         writeObject(key);  
  76.       }  
  77.       if (!(nullKey || nullValue)) {  
  78.         out.write(keyValueSeparator);  
  79.       }  
  80.       if (!nullValue) {  
  81.         writeObject(value);  
  82.       }  
  83.       out.write(newline);  
  84.     }  
  85.   
  86.     public synchronized   
  87.     void close(TaskAttemptContext context) throws IOException {  
  88.       out.close();  
  89.     }  
  90.   }  
  91.   
  92.   public RecordWriter<K, V>    getRecordWriter(TaskAttemptContext job//获得writer实例   
  93.                          ) throws IOException, InterruptedException {  
  94.     Configuration conf = job.getConfiguration();  
  95.     boolean isCompressed = getCompressOutput(job);//   
  96.     String keyValueSeparator= conf.get(“mapred.textoutputformat.separator”,  
  97.                                        “t”);  
  98.     CompressionCodec codec = null;//压缩格式 还是?   
  99.     String extension = “”;  
  100.     if (isCompressed) {  
  101.       Class<? extends CompressionCodec> codecClass =   
  102.         getOutputCompressorClass(job, GzipCodec.class);  
  103.       codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, conf);  
  104.       extension = codec.getDefaultExtension();  
  105.     }  
  106.     Path file = getDefaultWorkFile(job, extension);//这个是获取缺省的文件路径及名称,在FileOutput中有对其的实现
      
  107.     FileSystem fs = file.getFileSystem(conf);  
  108.     if (!isCompressed) {  
  109.       FSDataOutputStream fileOut = fs.create(file, false);  
  110.       return new LineRecordWriter<K, V>(fileOut, keyValueSeparator);  
  111.     } else {  
  112.       FSDataOutputStream fileOut = fs.create(file, false);  
  113.       return new LineRecordWriter<K, V>(new DataOutputStream  
  114.                                         (codec.createOutputStream(fileOut)),  
  115.                                         keyValueSeparator);  
  116.     }  
  117.   }  
  118. }  

    更多Hadoop相关信息见Hadoop 专题页面

  1. import java.io.DataOutputStream;  
  2. import java.io.IOException;  
  3. import java.util.HashMap;  
  4. import java.util.Iterator;  
  5.   
  6.   
  7. import org.apache.hadoop.conf.Configuration;  
  8. import org.apache.hadoop.fs.FSDataOutputStream;  
  9. import org.apache.hadoop.fs.Path;  
  10. import org.apache.hadoop.io.Writable;  
  11. import org.apache.hadoop.io.WritableComparable;  
  12. import org.apache.hadoop.io.compress.CompressionCodec;  
  13. import org.apache.hadoop.io.compress.GzipCodec;  
  14. import org.apache.hadoop.mapreduce.OutputCommitter;  
  15. import org.apache.hadoop.mapreduce.RecordWriter;  
  16. import org.apache.hadoop.mapreduce.TaskAttemptContext;  
  17. import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;  
  18. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;  
  19. import org.apache.hadoop.util.ReflectionUtils;  
  20.   
  21.   
  22. /** 
  23.  * This abstract class extends the FileOutputFormat, allowing to write the 
  24.  * output data to different output files. There are three basic use cases for 
  25.  * this class.  
  26.  * Created on 2012-07-08 
  27.  * @author zhoulongliu 
  28.  * @param <K> 
  29.  * @param <V> 
  30.  */  
  31. public abstract class MultipleOutputFormat<K extends WritableComparable<?>, V extends Writable> extends  
  32.         FileOutputFormat<K, V> {  
  33.   
  34.   
  35.    //接口类,需要在调用程序中实现generateFileNameForKeyValue来获取文件名
      
  36.     private MultiRecordWriter writer = null;  
  37.   
  38.   
  39.     public RecordWriter<K, V> getRecordWriter(TaskAttemptContext job) throws IOException, InterruptedException {  
  40.         if (writer == null) {  
  41.             writer = new MultiRecordWriter(job, getTaskOutputPath(job));  
  42.         }  
  43.         return writer;  
  44.     }  
  45.   
  46.   
  47.     /** 
  48.      * get task output path 
  49.      * @param conf 
  50.      * @return 
  51.      * @throws IOException 
  52.      */  
  53.     private Path getTaskOutputPath(TaskAttemptContext conf) throws IOException {  
  54.         Path workPath = null;  
  55.         OutputCommitter committer = super.getOutputCommitter(conf);  
  56.         if (committer instanceof FileOutputCommitter) {  
  57.             workPath = ((FileOutputCommitter) committer).getWorkPath();  
  58.         } else {  
  59.             Path outputPath = super.getOutputPath(conf);  
  60.             if (outputPath == null) {  
  61.                 throw new IOException(“Undefined job output-path”);  
  62.             }  
  63.             workPath = outputPath;  
  64.         }  
  65.         return workPath;  
  66.     }  
  67.   
  68.   
  69.     /** 
  70.      * 通过key, value, conf来确定输出文件名(含扩展名) Generate the file output file name based 
  71.      * on the given key and the leaf file name. The default behavior is that the 
  72.      * file name does not depend on the key. 
  73.      *  
  74.      * @param key the key of the output data 
  75.      * @param name the leaf file name 
  76.      * @param conf the configure object 
  77.      * @return generated file name 
  78.      */  
  79.     protected abstract String generateFileNameForKeyValue(K key, V value, Configuration conf);  
  80.   
  81.   
  82.    /** 
  83.     * 实现记录写入器RecordWriter类 
  84.     * (内部类) 
  85.     * @author zhoulongliu 
  86.     * 
  87.     */  
  88.     public class MultiRecordWriter extends RecordWriter<K, V> {  
  89.         /** RecordWriter的缓存 */  
  90.         private HashMap<String, RecordWriter<K, V>> recordWriters = null;  
  91.         private TaskAttemptContext job = null;  
  92.         /** 输出目录 */  
  93.         private Path workPath = null;  
  94.   
  95.   
  96.         public MultiRecordWriter(TaskAttemptContext job, Path workPath) {  
  97.             super();  
  98.             this.job = job;  
  99.             this.workPath = workPath;  
  100.             recordWriters = new HashMap<String, RecordWriter<K, V>>();  
  101.         }  
  102.   
  103.   
  104.         @Override  
  105.         public void close(TaskAttemptContext context) throws IOException, InterruptedException {  
  106.             Iterator<RecordWriter<K, V>> values = this.recordWriters.values().iterator();  
  107.             while (values.hasNext()) {  
  108.                 values.next().close(context);  
  109.             }  
  110.             this.recordWriters.clear();  
  111.         }  
  112.   
  113.   
  114.         @Override  
  115.         public void write(K key, V value) throws IOException, InterruptedException {  
  116.             // 得到输出文件名   
  117.             String baseName = generateFileNameForKeyValue(key, value, job.getConfiguration());  
  118.            //如果recordWriters里没有文件名,那么就建立。否则就直接写值。
      
  119.             RecordWriter<K, V> rw = this.recordWriters.get(baseName);  
  120.             if (rw == null) {  
  121.                 rw = getBaseRecordWriter(job, baseName);  
  122.                 this.recordWriters.put(baseName, rw);  
  123.             }  
  124.             rw.write(key, value);  
  125.         }  
  126.   
  127.   
  128.         // ${mapred.out.dir}/_temporary/_${taskid}/${nameWithExtension}
      
  129.         private RecordWriter<K, V> getBaseRecordWriter(TaskAttemptContext job, String baseName) throws IOException,  
  130.                 InterruptedException {  
  131.             Configuration conf = job.getConfiguration();  
  132.            //查看是否使用解码器     
  133.             boolean isCompressed = getCompressOutput(job);  
  134.             String keyValueSeparator = “,”;  
  135.             RecordWriter<K, V> recordWriter = null;  
  136.             if (isCompressed) {  
  137.                 Class<? extends CompressionCodec> codecClass = getOutputCompressorClass(job, GzipCodec.class);  
  138.                 CompressionCodec codec = ReflectionUtils.newInstance(codecClass, conf);  
  139.                 Path file = new Path(workPath, baseName + codec.getDefaultExtension());  
  140.                 FSDataOutputStream fileOut = file.getFileSystem(conf).create(file, false);  
  141.                 //这里我使用的自定义的OutputFormat    
  142.                 recordWriter = new LineRecordWriter<K, V>(new DataOutputStream(codec.createOutputStream(fileOut)),  
  143.                         keyValueSeparator);  
  144.             } else {  
  145.                 Path file = new Path(workPath, baseName);  
  146.                 FSDataOutputStream fileOut = file.getFileSystem(conf).create(file, false);  
  147.                 //这里我使用的自定义的OutputFormat    
  148.                 recordWriter = new LineRecordWriter<K, V>(fileOut, keyValueSeparator);  
  149.             }  
  150.             return recordWriter;  
  151.         }  
  152.     }  
  153.   
  154.   
  155. }  

2、在主类中添加job.setOutputFormatClass(GBKFileOutputFormat.class);

图片 1

图片 2

更多Hadoop相关信息见Hadoop 专题页面 http://www.linuxidc.com/topicnews.aspx?tid=13

图片 3

相关文章