HDFS的java-API操作
简介
hdfs在生产应用中主要是客户端的开发,其核心步骤是从hdfs提供的api中构造一个hdfs的访问客户端对象,然后通过该客户端对象操作(增删改查)hdfs上的文件。
导入依赖包(maven)
利用maven导入hadoop开发相关的依赖包,pom.xml
注:如需手动引入jar包,hdfs相关的jar包位于hadoop安装目录的share目录下
获取API中的客户端对象
获取到的fs对象就是DistributedFileSystem的实例,通过fs可以直接操作hdfs上的文件
获取客户端对象fs(默认配置)
在java中操作hdfs,首先要获得一个客户端实例:

Configuration conf = getConf();
FileSystem fs = FileSystem.get(conf);

获取客户端对象fs(set)
Configuration conf = getConf();
conf.set("fs.defaultFS", "hdfs://hadoop1:9000"); // 指定配置
conf.set("dfs.replication", "3"); // 副本数量
FileSystem fs = FileSystem.get(conf);

获取客户端对象fs(URI)
Configuration conf = getConf();
// 集群地址,配置信息
FileSystem fs = FileSystem.get(URI.create("hdfs://172.16.0.4:9000"), conf);

get方法是如何判断具体实例化了哪种客户端呢?
首先从conf中的参数fs.defaultFS的配置值判断,如果我们的代码中没有指定fs.defaultFS,并且工程classpath下也没有给定相应的配置,conf中的默认值就来自于hadoop的jar包中的core-default.xml,默认值为: file:///,则获取的将不是一个DistributedFileSystem的实例,而是一个本地文件系统的客户端对象。因此,推荐使用默认值打包到安装有hadoop客户端的Linux上运行。
java-API基本操作
上传文件
12345678910111213141516171819// hadoop jar// extends Configuration获得配置对象// implements Tool控制命名参数public class Put extends Configured implements Tool{public static void main(String[] args) throws Exception {ToolRunner.run(new Put(),args); //把参数拆分封装到Configuration}@Overridepublic int run(String[] strings) throws Exception {Configuration conf = getConf();FileSystem fs1 = FileSystem.get(conf);Path fin = new Path(conf.get("inpath")); // 本地端Path fout = new Path(conf.get("outpath")); // 服务端fs1.copyFromLocalFile(fin,fout);return 0;}}下载文件
12345678910111213141516public class Get extends Configured implements Tool{public static void main(String[] args) throws Exception {ToolRunner.run(new Get(),args); //把参数拆分封装到Configuration}@Overridepublic int run(String[] strings) throws Exception {Configuration conf = getConf();FileSystem fs1 = FileSystem.get(conf);Path fin = new Path(conf.get("inpath")); // 服务端Path fout = new Path(conf.get("outpath")); // 本地端fs1.copyToLocalFile(fin,fout);return 0;}}目录操作(增、删、改)
1234567891011121314151617public class DirTest extends Configured implements Tool{public static void main(String[] args) throws Exception {ToolRunner.run(new DirTest(),args); //把参数拆分封装到Configuration}@Overridepublic int run(String[] strings) throws Exception {Configuration conf = getConf();FileSystem fs1 = FileSystem.get(conf);Path fin = new Path(conf.get("inpath"));Path fout = new Path(conf.get("outpath"));fs1.mkdirs(fin); // 创建目录// fs1.delete(fout, true) // 删除文件夹 ,如果是非空文件夹,参数2必须给值true// fs1.rename(fin,fout); // 重命名文件或文件夹return 0;}}递归查看目录信息(只显示文件)
123456789101112131415161718192021222324252627282930313233public class Ls extends Configured implements Tool {public static void main(String[] args) throws Exception {ToolRunner.run(new Ls(),args); //把参数拆分封装到Configuration}@Overridepublic int run(String[] strings) throws Exception {Configuration conf = getConf();FileSystem fs = FileSystem.get(conf);RemoteIterator<LocatedFileStatus> listFiles = fs.listFiles(new Path(conf.get("path")),true);while(listFiles.hasNext()){LocatedFileStatus fileStatus = listFiles.next();System.out.println("blocksize:"+fileStatus.getBlockSize()); //块大小System.out.println("owner:"+fileStatus.getOwner()); //主用户System.out.println("replication:"+fileStatus.getReplication()); //副本数量System.out.println("permission:"+fileStatus.getPermission()); //权限System.out.println("name:"+fileStatus.getPath().getName()); //文件名称BlockLocation[] blockLocations = fileStatus.getBlockLocations();for (BlockLocation b : blockLocations) {System.out.println("块的名字:"+b.getNames());System.out.println("块的偏移量:"+b.getOffset());System.out.println("块的长度"+b.getLength());//块所在的datanode节点String[] datanodes = b.getHosts();for (String dn : datanodes){System.out.println("datanode:"+ dn);}}System.out.println("=================");}return 0;}}递归查看目录信息(显示文件和文件夹)
1234567891011121314151617181920212223242526272829303132333435public class List extends Configured implements Tool {FileSystem fs;@Overridepublic int run(String[] strings) throws Exception {Configuration conf = getConf();fs = FileSystem.get(conf);FileStatus[] sts = fs.listStatus(new Path(conf.get("path")));Stream.of(sts).forEach(this::showDetail);return 0;}//对于FileStatus进行判断//如果是文件,打印相关元信息//如果代表是目录,递归调用showDetailpublic void showDetail(FileStatus st) {if(st.isFile() && st.getLen() > 0){show(st);}else if(st.isDirectory()){try {Stream.of(fs.listStatus(st.getPath())).forEach(this::showDetail);} catch (IOException e) {e.printStackTrace();}}}public void show(FileStatus st){System.out.println("--------");System.out.println(st.getPath());System.out.println(st.getPermission());System.out.println(st.getAccessTime());System.out.println(st.getOwner());}public static void main(String[] args) throws Exception {ToolRunner.run(new List(),args);}}
java-API基本操作(流)
上传文件
123456789101112131415161718public class Put extends Configured implements Tool{public static void main(String[] args) throws Exception {ToolRunner.run(new Put(),args); //把参数拆分封装到Configuration}@Overridepublic int run(String[] strings) throws Exception {Configuration conf = getConf();FileSystem fs1 = FileSystem.get(conf);Path fin = new Path(conf.get("inpath")); // 本地端Path fout = new Path(conf.get("outpath")); // 服务端FSDataInputStream in = fs1.open(fin);FSDataOutputStream out = fs1.create(fout);IOUtils.copyBytes(in,out,128,true);return 0;}}下载文件
123456789101112131415161718public class Get extends Configured implements Tool{public static void main(String[] args) throws Exception {ToolRunner.run(new Get(),args); //把参数拆分封装到Configuration}@Overridepublic int run(String[] strings) throws Exception {Configuration conf = getConf();FileSystem fs1 = FileSystem.get(conf);Path fin = new Path(conf.get("inpath")); // 服务端Path fout = new Path(conf.get("outpath")); // 本地端FSDataInputStream in = fs1.open(fin); //输入FSDataOutputStream out = fs2.create(fout); //输出到本地IOUtils.copyBytes(in,out,128,true);return 0;}}查看文件内容
12345678910111213141516public class Cat extends Configured implements Tool{public static void main(String[] args) throws Exception {ToolRunner.run(new Cat(),args); //把参数拆分封装到Configuration}@Overridepublic int run(String[] strings) throws Exception {Configuration conf = getConf();FileSystem fs1 = FileSystem.get(conf);Path fin = new Path(conf.get("inpath"));FSDataInputStream in = fs1.open(fin); // 输入IOUtils.copyBytes(in,System.out,128,true); // 输出到控制台return 0;}}
常见java-API操作
- 压缩和解压缩(CompressionCodec)
上传文件并压缩1234567891011121314151617181920212223public class CompressionWriteTest extends Configured implements Tool {public static void main(String[] args) throws Exception {ToolRunner.run(new CompressionWriteTest(),args);}@Overridepublic int run(String[] strings) throws Exception {Configuration conf = getConf();Path inpath = new Path(conf.get("inpath"));Path outpath = new Path(conf.get("outpath"));FileSystem infs = FileSystem.getLocal(conf);FileSystem outfs = FileSystem.get(conf);FSDataInputStream in = infs.open(inpath);FSDataOutputStream out = outfs.create(outpath);//压缩要上传的文件CompressionCodecFactory factory = new CompressionCodecFactory(conf);CompressionCodec codec = factory.getCodec(outpath);CompressionOutputStream cout = codec.createOutputStream(out);IOUtils.copyBytes(in,cout,128,true);return 0;}}
下载文件并解压缩
- SequenceFile
SequenceFile文件是Hadoop用来存储二进制形式的key-value对而设计的一种平面文件。SequenceFile文件可支持三种压缩类型NONE:对records不进行压缩;RECORD:仅压缩每一个record中的value值;BLOCK:将一个block中的所有records压缩在一起。
FileKey.java

// Composite Writable key: (file name, length). Orders by file name first,
// breaking ties by length.
public class FileKey implements WritableComparable<FileKey> {

    private Text fileName = new Text();
    private LongWritable length = new LongWritable();

    @Override
    public void readFields(DataInput input) throws IOException {
        fileName.readFields(input);
        length.readFields(input);
    }

    @Override
    public void write(DataOutput output) throws IOException {
        fileName.write(output);
        length.write(output);
    }

    @Override
    public int compareTo(FileKey other) {
        // Simplified from the original three-branch if/else: primary order is
        // the file name, ties are broken by the length. Behavior is unchanged.
        int byName = fileName.compareTo(other.fileName);
        return (byName != 0) ? byName : length.compareTo(other.length);
    }

    // equals/hashCode consistent with compareTo: required whenever this key is
    // used in hash-based structures (e.g. the default HashPartitioner).
    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof FileKey)) {
            return false;
        }
        FileKey that = (FileKey) o;
        return fileName.equals(that.fileName) && length.equals(that.length);
    }

    @Override
    public int hashCode() {
        return 31 * fileName.hashCode() + length.hashCode();
    }

    public String getFileName() {
        return fileName.toString();
    }

    public void setFileName(String fileName) {
        this.fileName.set(fileName);
    }

    public long getLength() {
        return length.get();
    }

    public void setLength(long length) {
        this.length.set(length);
    }

    public String toString() {
        return fileName.toString() + ":" + length.get();
    }
}
PutSeq.java
ListSeq.java
GetSeq.java
- serialization(序列化)
hadoop序列化和反序列化123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354public class Student implements Writable {//属性类为hadoop已经提供的类型private IntWritable id;private Text name;private IntWritable age;//反序列化时,需要反射调用空参构造函数,所以需要显式定义一个public Student(){}//构造器 复制赋值 不要引用赋值public Student(IntWritable id, Text name, IntWritable age) {this.id = new IntWritable(id.get());this.name = new Text(name.toString());this.age = new IntWritable(age.get());}public IntWritable getId() {return id;}public void setId(IntWritable id) {this.id = new IntWritable(id.get());}public Text getName() {return name;}public void setName(Text name) {this.name = new Text(name.toString());}public IntWritable getAge() {return age;}public void setAge(IntWritable age) {this.age = new IntWritable(age.get());}@Overridepublic void write(DataOutput dataOutput) throws IOException {id.write(dataOutput);name.write(dataOutput);age.write(dataOutput);}@Overridepublic void readFields(DataInput dataInput) throws IOException {id.readFields(dataInput);name.readFields(dataInput);age.readFields(dataInput);}}
最后更新: 2018年10月08日 18:25