HDFS Java API Operations

Introduction

In production, working with HDFS is mostly a matter of client-side development. The core steps are to construct an HDFS client object from the API that HDFS provides, and then use that client object to operate on the files in HDFS (create, delete, update, read).

Importing Dependencies (Maven)

Use Maven to import the Hadoop development dependencies in pom.xml:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>hadoop</groupId>
    <artifactId>hadoop</artifactId>
    <packaging>pom</packaging>
    <version>1.0-SNAPSHOT</version>
    <modules>
        <module>hdfs</module>
    </modules>
    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <hadoop.version>3.0.3</hadoop.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <dependency>
            <groupId>commons-io</groupId>
            <artifactId>commons-io</artifactId>
            <version>2.4</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.11</version>
            <scope>test</scope>
        </dependency>
    </dependencies>
    <build>
        <defaultGoal>compile</defaultGoal>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>

Note: if you need to add the jars manually instead, the HDFS jars can be found under the share directory of the Hadoop installation.

Obtaining a Client Object from the API

The fs object obtained below is an instance of DistributedFileSystem; through fs you can operate on HDFS directly.

  1. Obtaining the client object fs (default configuration)
    To operate on HDFS from Java, first obtain a client instance:

    Configuration conf = getConf();
    FileSystem fs = FileSystem.get(conf);
  2. Obtaining the client object fs (explicit set)

    Configuration conf = getConf();
    conf.set("fs.defaultFS", "hdfs://hadoop1:9000"); // 指定配置
    conf.set("dfs.replication", "3"); // 副本数量
    FileSystem fs = FileSystem.get(conf);
  3. Obtaining the client object fs (URI)

    Configuration conf = getConf();
    // cluster address plus configuration info
    FileSystem fs = FileSystem.get(URI.create("hdfs://172.16.0.4:9000"), conf);
  4. How does the get method decide which kind of client to instantiate?
    It decides based on the value of the fs.defaultFS parameter in conf. If the code does not set fs.defaultFS, and no corresponding configuration file is on the project classpath, the default value in conf comes from core-default.xml inside the Hadoop jars, which is file:///. In that case the object obtained is not a DistributedFileSystem instance but a client for the local file system. It is therefore recommended to keep the defaults and run the packaged jar on a Linux machine where the Hadoop client (and thus the cluster configuration) is installed. A quick way to check which implementation was actually returned is sketched below.
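A minimal sketch for checking which FileSystem implementation get() returned, assuming whatever configuration happens to be on the classpath:

Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(conf);
// prints org.apache.hadoop.hdfs.DistributedFileSystem when fs.defaultFS is an
// hdfs:// URI, and org.apache.hadoop.fs.LocalFileSystem for the file:/// default
System.out.println(fs.getClass().getName());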

Basic Java API Operations

  1. Uploading a file (see the invocation note after this list for how these Tool classes are run)

    // run with: hadoop jar ...
    // extends Configured to obtain the configuration object
    // implements Tool so that named parameters are parsed for us
    public class Put extends Configured implements Tool {
        public static void main(String[] args) throws Exception {
            ToolRunner.run(new Put(), args); // parses the arguments into the Configuration
        }
        @Override
        public int run(String[] strings) throws Exception {
            Configuration conf = getConf();
            FileSystem fs1 = FileSystem.get(conf);
            Path fin = new Path(conf.get("inpath"));   // local side
            Path fout = new Path(conf.get("outpath")); // HDFS side
            fs1.copyFromLocalFile(fin, fout);
            return 0;
        }
    }
  2. Downloading a file

    public class Get extends Configured implements Tool {
        public static void main(String[] args) throws Exception {
            ToolRunner.run(new Get(), args); // parses the arguments into the Configuration
        }
        @Override
        public int run(String[] strings) throws Exception {
            Configuration conf = getConf();
            FileSystem fs1 = FileSystem.get(conf);
            Path fin = new Path(conf.get("inpath"));   // HDFS side
            Path fout = new Path(conf.get("outpath")); // local side
            fs1.copyToLocalFile(fin, fout);
            return 0;
        }
    }
  3. Directory operations (create, delete, rename)

    public class DirTest extends Configured implements Tool {
        public static void main(String[] args) throws Exception {
            ToolRunner.run(new DirTest(), args); // parses the arguments into the Configuration
        }
        @Override
        public int run(String[] strings) throws Exception {
            Configuration conf = getConf();
            FileSystem fs1 = FileSystem.get(conf);
            Path fin = new Path(conf.get("inpath"));
            Path fout = new Path(conf.get("outpath"));
            fs1.mkdirs(fin); // create a directory
            // fs1.delete(fout, true); // delete a directory; for a non-empty directory the second argument must be true
            // fs1.rename(fin, fout); // rename a file or directory
            return 0;
        }
    }
  4. Recursively listing directory information (files only)

    public class Ls extends Configured implements Tool {
        public static void main(String[] args) throws Exception {
            ToolRunner.run(new Ls(), args); // parses the arguments into the Configuration
        }
        @Override
        public int run(String[] strings) throws Exception {
            Configuration conf = getConf();
            FileSystem fs = FileSystem.get(conf);
            RemoteIterator<LocatedFileStatus> listFiles = fs.listFiles(new Path(conf.get("path")), true);
            while (listFiles.hasNext()) {
                LocatedFileStatus fileStatus = listFiles.next();
                System.out.println("blocksize:" + fileStatus.getBlockSize());     // block size
                System.out.println("owner:" + fileStatus.getOwner());             // owner
                System.out.println("replication:" + fileStatus.getReplication()); // replication factor
                System.out.println("permission:" + fileStatus.getPermission());   // permissions
                System.out.println("name:" + fileStatus.getPath().getName());     // file name
                BlockLocation[] blockLocations = fileStatus.getBlockLocations();
                for (BlockLocation b : blockLocations) {
                    // getNames() returns an array, so format it before printing
                    System.out.println("block names:" + Arrays.toString(b.getNames()));
                    System.out.println("block offset:" + b.getOffset());
                    System.out.println("block length:" + b.getLength());
                    // datanodes that hold this block
                    String[] datanodes = b.getHosts();
                    for (String dn : datanodes) {
                        System.out.println("datanode:" + dn);
                    }
                }
                System.out.println("=================");
            }
            return 0;
        }
    }
  5. Recursively listing directory information (walks files and directories)

    public class List extends Configured implements Tool {
        FileSystem fs;
        @Override
        public int run(String[] strings) throws Exception {
            Configuration conf = getConf();
            fs = FileSystem.get(conf);
            FileStatus[] sts = fs.listStatus(new Path(conf.get("path")));
            Stream.of(sts).forEach(this::showDetail);
            return 0;
        }
        // Inspect each FileStatus:
        // if it is a file, print its metadata;
        // if it is a directory, recurse with showDetail
        public void showDetail(FileStatus st) {
            if (st.isFile() && st.getLen() > 0) {
                show(st);
            } else if (st.isDirectory()) {
                try {
                    Stream.of(fs.listStatus(st.getPath())).forEach(this::showDetail);
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
        public void show(FileStatus st) {
            System.out.println("--------");
            System.out.println(st.getPath());
            System.out.println(st.getPermission());
            System.out.println(st.getAccessTime());
            System.out.println(st.getOwner());
        }
        public static void main(String[] args) throws Exception {
            ToolRunner.run(new List(), args);
        }
    }
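Invocation note: ToolRunner passes the command line through GenericOptionsParser, so the inpath/outpath/path values read via conf.get(...) can be supplied as -D options. A hypothetical run of the upload tool (the jar name and paths are placeholders):

hadoop jar hdfs-demo.jar Put -D inpath=/tmp/data.txt -D outpath=/user/hadoop/data.txt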

Basic Java API Operations (Streams)

  1. Uploading a file

    public class Put extends Configured implements Tool {
        public static void main(String[] args) throws Exception {
            ToolRunner.run(new Put(), args); // parses the arguments into the Configuration
        }
        @Override
        public int run(String[] strings) throws Exception {
            Configuration conf = getConf();
            FileSystem local = FileSystem.getLocal(conf); // local file system for the source
            FileSystem fs1 = FileSystem.get(conf);        // HDFS for the destination
            Path fin = new Path(conf.get("inpath"));   // local side
            Path fout = new Path(conf.get("outpath")); // HDFS side
            FSDataInputStream in = local.open(fin);
            FSDataOutputStream out = fs1.create(fout);
            IOUtils.copyBytes(in, out, 128, true); // 128-byte buffer; true closes both streams
            return 0;
        }
    }
  2. Downloading a file

    public class Get extends Configured implements Tool {
        public static void main(String[] args) throws Exception {
            ToolRunner.run(new Get(), args); // parses the arguments into the Configuration
        }
        @Override
        public int run(String[] strings) throws Exception {
            Configuration conf = getConf();
            FileSystem fs1 = FileSystem.get(conf);        // HDFS for the source
            FileSystem local = FileSystem.getLocal(conf); // local file system for the destination
            Path fin = new Path(conf.get("inpath"));   // HDFS side
            Path fout = new Path(conf.get("outpath")); // local side
            FSDataInputStream in = fs1.open(fin);        // input from HDFS
            FSDataOutputStream out = local.create(fout); // output to the local disk
            IOUtils.copyBytes(in, out, 128, true);
            return 0;
        }
    }
  3. Viewing file contents (a note on random access with the stream API follows this list)

    public class Cat extends Configured implements Tool {
        public static void main(String[] args) throws Exception {
            ToolRunner.run(new Cat(), args); // parses the arguments into the Configuration
        }
        @Override
        public int run(String[] strings) throws Exception {
            Configuration conf = getConf();
            FileSystem fs1 = FileSystem.get(conf);
            Path fin = new Path(conf.get("inpath"));
            FSDataInputStream in = fs1.open(fin);         // input from HDFS
            IOUtils.copyBytes(in, System.out, 128, true); // write to the console
            return 0;
        }
    }
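Compared with copyFromLocalFile/copyToLocalFile, the stream API allows finer-grained control; in particular FSDataInputStream is seekable, so a file can be read starting from an arbitrary offset. A minimal sketch (the path and offset are placeholders):

FSDataInputStream in = fs1.open(new Path("/user/hadoop/data.txt"));
in.seek(1024);                                // skip the first 1024 bytes
IOUtils.copyBytes(in, System.out, 128, true); // print the remainder to the console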

Common Java API Operations

  1. Compression and decompression (CompressionCodec)
    Upload a file with compression (the codec is inferred from the target path's file extension; see the note after the decompression example):
    public class CompressionWriteTest extends Configured implements Tool {
        public static void main(String[] args) throws Exception {
            ToolRunner.run(new CompressionWriteTest(), args);
        }
        @Override
        public int run(String[] strings) throws Exception {
            Configuration conf = getConf();
            Path inpath = new Path(conf.get("inpath"));
            Path outpath = new Path(conf.get("outpath"));
            FileSystem infs = FileSystem.getLocal(conf);
            FileSystem outfs = FileSystem.get(conf);
            FSDataInputStream in = infs.open(inpath);
            FSDataOutputStream out = outfs.create(outpath);
            // compress the file while uploading; the codec is chosen
            // from the output path's file extension
            CompressionCodecFactory factory = new CompressionCodecFactory(conf);
            CompressionCodec codec = factory.getCodec(outpath);
            CompressionOutputStream cout = codec.createOutputStream(out);
            IOUtils.copyBytes(in, cout, 128, true);
            return 0;
        }
    }

Download a file with decompression:

public class CompressionReadTest extends Configured implements Tool {
    public static void main(String[] args) throws Exception {
        ToolRunner.run(new CompressionReadTest(), args);
    }
    @Override
    public int run(String[] strings) throws Exception {
        Configuration conf = getConf();
        Path inpath = new Path(conf.get("inpath"));
        Path outpath = new Path(conf.get("outpath"));
        FileSystem infs = FileSystem.get(conf);
        FileSystem outfs = FileSystem.getLocal(conf);
        FSDataInputStream in = infs.open(inpath);
        FSDataOutputStream out = outfs.create(outpath);
        // decompress the file while downloading; the codec is chosen
        // from the input path's file extension
        CompressionCodecFactory factory = new CompressionCodecFactory(conf);
        CompressionCodec codec = factory.getCodec(inpath);
        CompressionInputStream cin = codec.createInputStream(in);
        IOUtils.copyBytes(cin, out, 128, true);
        return 0;
    }
}
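Both tools depend on CompressionCodecFactory resolving the codec from the file extension, so the compressed path must end in a recognized suffix. A small illustration (the paths are hypothetical):

CompressionCodecFactory factory = new CompressionCodecFactory(conf);
CompressionCodec gzip = factory.getCodec(new Path("data.txt.gz"));  // GzipCodec
CompressionCodec bzip = factory.getCodec(new Path("data.txt.bz2")); // BZip2Codec
CompressionCodec none = factory.getCodec(new Path("data.txt"));     // null: no codec matched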

  2. SequenceFile
    A SequenceFile is a flat file designed by Hadoop for storing key-value pairs in binary form. SequenceFiles support three compression types: NONE, records are not compressed; RECORD, only the value of each record is compressed; BLOCK, all records in a block are compressed together. (A sketch of selecting the compression type follows the GetSeq example below.)
    FileKey.java
    public class FileKey implements WritableComparable<FileKey> {
        private Text fileName = new Text();
        private LongWritable length = new LongWritable();
        @Override
        public void readFields(DataInput input) throws IOException {
            fileName.readFields(input);
            length.readFields(input);
        }
        @Override
        public void write(DataOutput output) throws IOException {
            fileName.write(output);
            length.write(output);
        }
        @Override
        public int compareTo(FileKey other) {
            if ((fileName.compareTo(other.fileName) == 0)
                    && (length.compareTo(other.length) == 0)) {
                return 0;
            } else if (fileName.compareTo(other.fileName) != 0) {
                return fileName.compareTo(other.fileName);
            } else {
                return length.compareTo(other.length);
            }
        }
        public String getFileName() {
            return fileName.toString();
        }
        public void setFileName(String fileName) {
            this.fileName.set(fileName);
        }
        public long getLength() {
            return length.get();
        }
        public void setLength(long length) {
            this.length.set(length);
        }
        public String toString() {
            return fileName.toString() + ":" + length.get();
        }
    }

PutSeq.java

/**
 * Merge a directory and its subdirectories into a single sequence file and upload it.
 */
public class PutSeq extends Configured implements Tool {
    private void construct(File dir, Writer writer) throws IOException {
        File[] lists = dir.listFiles();
        Stream.of(lists).forEach(f -> {
            try {
                if (f.isDirectory()) {
                    // a directory starts here, so write a sync marker
                    writer.sync();
                    construct(f, writer);
                } else {
                    byte[] content = getData(f);
                    FileKey key = new FileKey();
                    BytesWritable value = new BytesWritable();
                    key.setFileName(f.getPath());
                    key.setLength(f.length());
                    value.set(content, 0, content.length);
                    writer.append(key, value);
                }
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        });
    }
    /**
     * Read all data from file into a byte array and return it.
     */
    private byte[] getData(File file) throws IOException {
        FileInputStream fis = new FileInputStream(file);
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        byte[] content = new byte[1024];
        int length = 0;
        while ((length = fis.read(content, 0, 1024)) > 0) {
            baos.write(content, 0, length);
        }
        fis.close();
        baos.flush();
        byte[] r = baos.toByteArray();
        baos.close();
        return r;
    }
    @Override
    public int run(String[] arg0) throws Exception {
        Configuration conf = getConf();
        SequenceFile.Writer.Option op1 = Writer.file(new Path(conf.get("output")));
        SequenceFile.Writer.Option op2 = Writer.keyClass(FileKey.class);
        SequenceFile.Writer.Option op3 = Writer.valueClass(BytesWritable.class);
        SequenceFile.Writer writer = SequenceFile.createWriter(conf, op1, op2, op3);
        File ds = new File(conf.get("input"));
        construct(ds, writer);
        writer.close();
        return 0;
    }
    public static void main(String[] args) throws Exception {
        System.exit(ToolRunner.run(new PutSeq(), args));
    }
}

ListSeq.java

/**
 * Display the directory structure stored in a sequence file.
 */
public class ListSeq extends Configured implements Tool {
    private SequenceFile.Reader reader;
    @Override
    public int run(String[] arg0) throws Exception {
        Configuration conf = getConf();
        Path input = new Path(conf.get("input"));
        SequenceFile.Reader.Option op1 = SequenceFile.Reader.file(input);
        reader = new SequenceFile.Reader(conf, op1);
        Writable key = (Writable) reader.getKeyClass().newInstance();
        Writable value = (Writable) reader.getValueClass().newInstance();
        // jump from sync marker to sync marker, i.e. from directory to directory
        reader.sync(reader.getPosition());
        while (reader.next(key, value)) {
            FileKey file = (FileKey) key;
            System.out.printf("%s\n", new File(file.getFileName()).getParent());
            reader.sync(reader.getPosition());
        }
        return 0;
    }
    public static void main(String[] args) throws Exception {
        System.exit(ToolRunner.run(new ListSeq(), args));
    }
}

GetSeq.java

/**
 * Download a sequence file and restore its directory structure.
 */
public class GetSeq extends Configured implements Tool {
    private SequenceFile.Reader reader;
    @Override
    public int run(String[] arg0) throws Exception {
        Configuration conf = getConf();
        Path input = new Path(conf.get("input"));
        File output = new File(conf.get("output"));
        SequenceFile.Reader.Option op1 = SequenceFile.Reader.file(input);
        reader = new SequenceFile.Reader(conf, op1);
        Writable key = (Writable) reader.getKeyClass().newInstance();
        Writable value = (Writable) reader.getValueClass().newInstance();
        while (reader.next(key, value)) {
            String file = ((FileKey) key).getFileName();
            save(new File(output, file), value);
        }
        return 0;
    }
    private void save(File file, Writable value) throws IOException {
        File d = file.getParentFile();
        if (!d.exists()) d.mkdirs();
        BytesWritable bw = (BytesWritable) value;
        byte[] bs = bw.copyBytes();
        FileOutputStream fos = new FileOutputStream(file);
        fos.write(bs, 0, bs.length);
        fos.close();
    }
    public static void main(String[] args) throws Exception {
        System.exit(ToolRunner.run(new GetSeq(), args));
    }
}
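As mentioned at the start of this subsection, the compression type is chosen when the writer is created. A minimal sketch of a BLOCK-compressed writer (the output path is a placeholder; PutSeq above simply uses the configured default):

SequenceFile.Writer writer = SequenceFile.createWriter(conf,
        SequenceFile.Writer.file(new Path("/out/data.seq")),
        SequenceFile.Writer.keyClass(FileKey.class),
        SequenceFile.Writer.valueClass(BytesWritable.class),
        // NONE / RECORD / BLOCK, paired with a codec such as DefaultCodec
        SequenceFile.Writer.compression(SequenceFile.CompressionType.BLOCK, new DefaultCodec()));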

  3. Serialization
    Hadoop serialization and deserialization with the Writable interface (a round-trip sketch follows the class):
    public class Student implements Writable {
        // fields use types Hadoop already provides
        private IntWritable id;
        private Text name;
        private IntWritable age;
        // deserialization instantiates the class reflectively through the no-arg
        // constructor, so one must be defined explicitly; the fields are
        // initialized here so that readFields can be called on a fresh instance
        public Student() {
            this.id = new IntWritable();
            this.name = new Text();
            this.age = new IntWritable();
        }
        // constructor: copy the values, do not share the references
        public Student(IntWritable id, Text name, IntWritable age) {
            this.id = new IntWritable(id.get());
            this.name = new Text(name.toString());
            this.age = new IntWritable(age.get());
        }
        public IntWritable getId() {
            return id;
        }
        public void setId(IntWritable id) {
            this.id = new IntWritable(id.get());
        }
        public Text getName() {
            return name;
        }
        public void setName(Text name) {
            this.name = new Text(name.toString());
        }
        public IntWritable getAge() {
            return age;
        }
        public void setAge(IntWritable age) {
            this.age = new IntWritable(age.get());
        }
        @Override
        public void write(DataOutput dataOutput) throws IOException {
            id.write(dataOutput);
            name.write(dataOutput);
            age.write(dataOutput);
        }
        @Override
        public void readFields(DataInput dataInput) throws IOException {
            id.readFields(dataInput);
            name.readFields(dataInput);
            age.readFields(dataInput);
        }
    }
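A minimal round-trip sketch of the Writable contract using plain JDK streams (the field values are arbitrary):

Student s = new Student(new IntWritable(1), new Text("tom"), new IntWritable(20));
ByteArrayOutputStream baos = new ByteArrayOutputStream();
s.write(new DataOutputStream(baos));          // serialize to bytes
Student copy = new Student();
copy.readFields(new DataInputStream(new ByteArrayInputStream(baos.toByteArray())));
System.out.println(copy.getName());           // prints: tom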

Last updated: October 8, 2018, 18:25

Original link: https://www.lousenjay.top/2018/08/26/HDFS入门详解(二)/