HDFS is a distributed file system, but to the programmer it looks much like an ordinary one: Hadoop wraps the low-level details, and working with HDFS files through the corresponding API is not very different from working with files on a local disk. When you first start out, though, you are still likely to run into a few problems.
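As a quick illustration of that point, here is a minimal sketch that obtains a FileSystem handle and checks whether a path exists, much as one would with java.io.File. The class name, the NameNode address hdfs://localhost:9000 and the file path are assumptions made for the sketch; substitute your own cluster and file.

import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class HdfsExistsSketch {
    public static void main(String[] args) throws Exception {
        // Assumed NameNode address and file path; adjust to your environment.
        String uri = "hdfs://localhost:9000/user/root/input0/a.txt";
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(URI.create(uri), conf);
        // exists() behaves much like checking a local file.
        System.out.println(fs.exists(new Path(uri)));
        fs.close();
    }
}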

I. Functionality:

Storing and reading files on HDFS:

1: Upload a local file to HDFS

2: Read a file from HDFS back to the local file system

3: Delete a file on HDFS

4: List the files and directories on HDFS

For example, the following error may appear when obtaining the FileSystem instance:

java.lang.NullPointerException
    at org.apache.hadoop.conf.Configuration.get(Configuration.java:382)
    at org.apache.hadoop.conf.Configuration.getBoolean(Configuration.java:570)
    at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:192)
    at hadoop.test.URLCat.copyFileToAnotherFile(URLCat.java:38) // this is a method I wrote; the error surfaced here
    at hadoop.test.URLCat.main(URLCat.java:83)

Note:

1: Add the google-collections-xx.jar file to the classpath.

2: The target address must be specific down to the file name; it cannot be just a folder.

Code:

package hadoop.test;

import java.io.BufferedInputStream;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.MalformedURLException;
import java.net.URI;
import java.net.URL;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FsUrlStreamHandlerFactory;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.util.Progressable;

public class URLCat extends Configured {

    /*static {
        Configuration.addDefaultResource("hdfs-default.xml");
        Configuration.addDefaultResource("hdfs-site.xml");
        Configuration.addDefaultResource("mapred-default.xml");
        Configuration.addDefaultResource("mapred-site.xml");
    }*/ // without this static block the error shown above is thrown

    public void copyFileToAnotherFile(String[] args) {
        InputStream in = null;
        OutputStream out = null;
        try {
            String sourceFile = args[0];
            String targetFile = args[1];
            in = new BufferedInputStream(new FileInputStream(sourceFile));

            Configuration conf = new Configuration();
            System.out.println(conf);
            System.out.println(URI.create(targetFile) == null);
            System.out.println(conf == null);
            System.out.println(FileSystem.get(URI.create(targetFile), conf) == null);

            FileSystem fs = DistributedFileSystem.get(URI.create(targetFile), conf);
            System.out.println(fs);
            out = fs.create(new Path(targetFile), new Progressable() {
                public void progress() { System.out.print("."); }
            });
            IOUtils.copyBytes(in, out, 4096, true);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            IOUtils.closeStream(in);
            IOUtils.closeStream(out);
        }
    }

    static {
        URL.setURLStreamHandlerFactory(new FsUrlStreamHandlerFactory());
    }


    public static void displayFile(String[] args) {
        InputStream in = null;
        try {
            in = new URL(args[0]).openStream();
            IOUtils.copyBytes(in, System.out, 4096, false);
        } catch (MalformedURLException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            IOUtils.closeStream(in);
        }
    }

    /**
     * @param args
     */
    public static void main(String[] args) {
        new URLCat().copyFileToAnotherFile(args);
        //URLCat.displayFile(args);
    }
}
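If you would rather not rely on the commented-out static block above, another option is to point the Configuration at the cluster explicitly before calling FileSystem.get. This is only a sketch and the values are assumptions: the configuration file path and the hdfs://localhost:9000 address need to match your installation (fs.default.name is the classic property name used by the older Hadoop releases this code targets).

// Sketch: inside copyFileToAnotherFile, where the Configuration is created.
Configuration conf = new Configuration();
// Either load the cluster's own configuration file (assumed path) ...
conf.addResource(new Path("/usr/local/hadoop/conf/core-site.xml"));
// ... or set the default file system directly (assumed NameNode address).
conf.set("fs.default.name", "hdfs://localhost:9000");
FileSystem fs = FileSystem.get(URI.create(targetFile), conf);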

II. Code implementation:

The HdfsOperation class below covers the four operations listed in part I: upload, read, delete, and directory listing.

package com.hdfs;

import java.io.BufferedInputStream;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.util.Progressable;

public class HdfsOperation {

    /**
     * Upload a file to HDFS.
     */
    private static void uploadToHdfs() throws FileNotFoundException, IOException {
        String localSrc = "/home/file2/a.txt";
        String dst = "hdfs://master:9000/user/root/input0/a.txt";
        InputStream in = new BufferedInputStream(new FileInputStream(localSrc));
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(URI.create(dst), conf);
        OutputStream out = fs.create(new Path(dst), new Progressable() {
            public void progress() {
                System.out.print(".");
            }
        });
        IOUtils.copyBytes(in, out, 4096, true);
    }

    /**
     * Read a file from HDFS to the local file system.
     */
    private static void readFromHdfs() throws FileNotFoundException, IOException {
        String dst = "hdfs://192.10.5.76:9000/user/root/input0/a.txt";
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(URI.create(dst), conf);
        FSDataInputStream hdfsInStream = fs.open(new Path(dst));
        OutputStream out = new FileOutputStream("/home/li");
        byte[] ioBuffer = new byte[1024];
        int readLen = hdfsInStream.read(ioBuffer);
        while (-1 != readLen) {
            out.write(ioBuffer, 0, readLen);
            readLen = hdfsInStream.read(ioBuffer);
        }
        out.close();
        hdfsInStream.close();
        fs.close();
    }

    /**
     * Delete a file on HDFS.
     * @throws FileNotFoundException
     * @throws IOException
     */
    private static void deleteFromHdfs() throws FileNotFoundException, IOException {
        String dst = "hdfs://master:9000/user/root/input0/a.txt";
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(URI.create(dst), conf);
        fs.deleteOnExit(new Path(dst));
        fs.close();
    }

    /**
     * List the files and directories on HDFS.
     */
    private static void getDirectoryFromHdfs() throws FileNotFoundException, IOException {
        String dst = "hdfs://master:9000/user/root/input0";
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(URI.create(dst), conf);
        FileStatus fileList[] = fs.listStatus(new Path(dst));
        int size = fileList.length;
        for (int i = 0; i < size; i++) {
            System.out.println("name:" + fileList[i].getPath().getName()
                    + "\t\tsize:" + fileList[i].getLen());
        }
        fs.close();
    }

    /**
     * The main method.
     * @param args
     * @throws Exception
     */
    public static void main(String[] args) throws Exception {
        try {
            uploadToHdfs();
            readFromHdfs();
            deleteFromHdfs();
            getDirectoryFromHdfs();
        } catch (Exception e) {
            System.out.println(2);
            e.printStackTrace();
        } finally {
            System.out.println("SUCCESS");
            System.out.println(3);
        }
    }
}
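One thing to keep in mind about deleteFromHdfs(): deleteOnExit() only marks the path for deletion, and the actual delete happens when the FileSystem is closed. If the file should be removed right away, delete() can be used instead; its boolean argument enables recursive deletion for directories. A small sketch, reusing the example path from the class above (the class name is made up for the sketch):

import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class ImmediateDeleteSketch {
    public static void main(String[] args) throws Exception {
        String dst = "hdfs://master:9000/user/root/input0/a.txt";
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(URI.create(dst), conf);
        // delete() removes the path immediately; true means recurse if it is a directory.
        boolean deleted = fs.delete(new Path(dst), true);
        System.out.println("deleted: " + deleted);
        fs.close();
    }
}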

Storing a file (writing it into a SequenceFile on HDFS):

import java.io.BufferedInputStream;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;

public class SequenceFileWrite {

    public static void main(String[] args) throws IOException {
        String src = "E:\\test\\spring3_MVC.docx";
        InputStream in = new BufferedInputStream(new FileInputStream(src));
        String uri = "hdfs://localhost:9000/home/hdfs/spring.seq";
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(URI.create(uri), conf);
        Path path = new Path(uri);
        Text key = new Text();
        Text value = new Text();
        SequenceFile.Writer writer = null;
        try {
            // Returns a SequenceFile.Writer instance; it takes the file system, the
            // configuration and the Path object the data will be written to.
            writer = SequenceFile.createWriter(fs, conf, path, key.getClass(), value.getClass());
            int len = 0;
            byte[] buff = new byte[1024];
            key.set("spring.docx");
            while ((len = in.read(buff)) != -1) {
                value.set(buff, 0, len);
                writer.append(key, value); // append each record to the end of the SequenceFile
                value.clear();
            }
        } finally {
            IOUtils.closeStream(writer);
            IOUtils.closeStream(in);
        }
    }
}
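The Writer is not limited to storing one whole file under a single key; any Writable key/value pair can be appended. A small sketch of a SequenceFile of numbered Text records (the class name, the record contents and the numbers.seq path are made up for the example; IntWritable comes from org.apache.hadoop.io):

import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;

public class SequenceFileNumbersSketch {
    public static void main(String[] args) throws Exception {
        String uri = "hdfs://localhost:9000/home/hdfs/numbers.seq";
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(URI.create(uri), conf);
        Path path = new Path(uri);
        IntWritable key = new IntWritable();
        Text value = new Text();
        SequenceFile.Writer writer = null;
        try {
            writer = SequenceFile.createWriter(fs, conf, path, key.getClass(), value.getClass());
            for (int i = 0; i < 100; i++) {
                key.set(i);
                value.set("record-" + i);
                writer.append(key, value); // one (key, value) record per call
            }
        } finally {
            IOUtils.closeStream(writer);
        }
    }
}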

Reading the file (back out of the SequenceFile):

import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.util.ReflectionUtils;

public class SequenceFileReader {

    public static void main(String[] args) throws IOException {
        String uri = "hdfs://localhost:9000/home/hdfs/spring.seq";
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(URI.create(uri), conf);
        Path path = new Path(uri);
        SequenceFile.Reader reader = null;
        String dst = "e:\\test\\spring.docx";
        OutputStream out = null;
        try {
            // Returns a SequenceFile.Reader object; getKeyClass() reports the key type
            // used in the SequenceFile.
            reader = new SequenceFile.Reader(fs, path, conf);
            Writable key = (Writable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
            out = new FileOutputStream(dst);
            Text value = new Text();
            while (reader.next(key, value)) { // next() iterates over the records and returns false at the end
                System.out.println(key);
                out.write(value.getBytes(), 0, value.getLength()); // the length must be passed, otherwise the result is not readable by Office 2007
                value.clear(); // remember to clear the value, otherwise stale bytes may be written
            }
            out.flush();
        } finally {
            IOUtils.closeStream(reader);
            IOUtils.closeStream(out);
        }
    }
}
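Finally, for the plain upload and download cases, FileSystem also offers one-call helpers that wrap the manual stream copies written out in uploadToHdfs() and readFromHdfs(). A short sketch, using the same example addresses as above (the local target path and the class name are made up):

import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class CopyHelpersSketch {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(URI.create("hdfs://master:9000/"), conf);
        // local -> HDFS
        fs.copyFromLocalFile(new Path("/home/file2/a.txt"),
                new Path("hdfs://master:9000/user/root/input0/a.txt"));
        // HDFS -> local
        fs.copyToLocalFile(new Path("hdfs://master:9000/user/root/input0/a.txt"),
                new Path("/home/file2/a-copy.txt"));
        fs.close();
    }
}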
