环境:centos7+hadoop3.0.3+hbase2.0.1+jdk8

HBase开发

测试代码准备

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
Configuration conf = null;
ExecutorService pool = null;
Connection conn = null;
Admin admin = null;
Table table = null;
HTable t = null;
//同步获得hbase连接
@Before
public void before() throws IOException {
// 设置HBase配置信息
conf = HBaseConfiguration.create();
conf.set("hbase.zookeeper.quorum","hadoop5:2181");
conf.set("hbase.master.dns.interface","hadoop5");
pool = Executors.newFixedThreadPool(10); // 创建线程池
conn = ConnectionFactory.createConnection(conf,pool); // 创建连接池
admin = conn.getAdmin(); // 创建表的管理类
System.out.println("获取同步连接成功");
}
@After
public void after() throws IOException {
// 关闭连接
conn.close();
System.out.println("连接已关闭");
}

连接HBase

  1. 同步连接

    1
    2
    3
    4
    5
    6
    7
    8
    // 设置HBase配置信息
    conf = HBaseConfiguration.create();
    conf.set("hbase.zookeeper.quorum","hadoop5:2181");
    conf.set("hbase.master.dns.interface","hadoop5");
    pool = Executors.newFixedThreadPool(10); // 创建线程池
    conn = ConnectionFactory.createConnection(conf,pool); // 创建连接池
    admin = conn.getAdmin(); // 创建表的管理类
    System.out.println("获取同步连接成功");
  2. 异步连接

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    //异步获取hbase连接方式
    @Test
    public void asynctest() throws Exception {
    // 设置HBase配置信息
    Configuration conf = HBaseConfiguration.create();
    conf.set("hbase.zookeeper.quorum", "hadoop5:2181");
    conf.set("hbase.master.dns.interface", "hadoop5");
    System.out.println("开始获取连接");
    // 获取异步连接对象
    CompletableFuture<AsyncConnection> aconn = ConnectionFactory.createAsyncConnection(conf);
    System.out.println("请等待");
    // 设置超时时长,单位为毫秒
    AsyncConnection conn = aconn.get(1000,TimeUnit.MILLISECONDS);
    System.out.println("获取异步连接成功");
    ExecutorService pool = Executors.newFixedThreadPool(2);
    AsyncAdmin admin = conn.getAdmin(pool);
    conn.close();
    }

命名空间

  1. 创建命名空间

    1
    2
    3
    4
    5
    6
    7
    8
    9
    //创建命名空间
    @Test
    public void createNS() throws IOException {
    // 创建命名空间描述类的对象,并指定命名空间的名称
    NamespaceDescriptor des = NamespaceDescriptor.create("IMUT").build();
    // 通过表的管理类的对象创建命名空间
    admin.createNamespace(des);
    System.out.println("命名空间创建成功");
    }
  2. 删除命名空间

    1
    2
    3
    4
    5
    6
    7
    8
    //删除命名空间
    @Test
    public void deleteNS() throws IOException {
    // 通过表的管理类的对象删除指定的命名空间
    // 需要先清空命名空间所有表
    admin.deleteNamespace("IMUT");
    System.out.println("命名空间删除成功");
    }

创建表

  1. 同步创建表

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    //同步创建表
    @Test
    public void createTBSync() throws Exception{
    // 创建表的描述类的Builder,HTableDescriptorBuilder
    // 设置表名
    TableDescriptorBuilder tbbuilder = TableDescriptorBuilder.newBuilder(TableName.valueOf("IMUT:employee_sync"));
    // 创建列族的描述类的Builder,ColumnFamilyDescriptorBuilder
    // 设置列族名
    ColumnFamilyDescriptorBuilder cfbuilder = ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes("baseinfo"));
    // 设置该列族的属性=>最大版本数为10
    cfbuilder.setMaxVersions(10);
    // 通过建造者模式获取列族描述类的对象
    ColumnFamilyDescriptor cf = cfbuilder.build();
    // 将该列族组装到表的Bulider对象上
    tbbuilder.setColumnFamily(cf);
    // 通过建造者模式获取表的描述类的对象
    TableDescriptor td = tbbuilder.build();
    System.out.println("命令开始");
    // 通过表管理类的对象admin创建表并指定分区
    admin.createTable(td, new byte[][]{{10}, {20}});
    System.out.println("表创建完毕");
    }
  2. 异步创建表

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    //异步创建表
    @Test
    public void createTBAsync() throws Exception{
    // 创建表的描述类的Builder,HTableDescriptorBuilder
    // 设置表名
    TableDescriptorBuilder tbbuilder = TableDescriptorBuilder.newBuilder(TableName.valueOf("IMUT:employee_async"));
    // 创建列族的描述类的Builder,ColumnFamilyDescriptorBuilder
    // 设置列族名
    ColumnFamilyDescriptorBuilder cfbuilder = ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes("baseinfo"));
    // 设置该列族的属性=>最大版本数为10
    cfbuilder.setMaxVersions(10);
    // 通过建造者模式获取列族描述类的对象
    ColumnFamilyDescriptor cf = cfbuilder.build();
    // 将该列族组装到表的Bulider对象上
    tbbuilder.setColumnFamily(cf);
    // 通过建造者模式获取表的描述类的对象
    TableDescriptor td = tbbuilder.build();
    System.out.println("命令发出");
    // 通过表管理类的对象admin异步创建表并指定分区
    Future<Void> future = admin.createTableAsync(td, new byte[][]{Bytes.toBytes("10"), Bytes.toBytes("20"),Bytes.toBytes("30")});
    System.out.println("等待结果");
    // 设置超时时长
    Void aVoid = future.get(5000, TimeUnit.MILLISECONDS);
    System.out.println("表创建成功");
    }

删除表

  1. 同步删除表

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    //同步删除表
    @Test
    public void deleteTBSync() throws Exception {
    System.out.println("开始删除表");
    // 第一步:通过表的管理类的对象禁用要删除的表
    admin.disableTable(TableName.valueOf("IMUT:employee_sync"));
    System.out.println("禁用表成功");
    // 第二步:通过表的管理类的对象删除被禁用的表
    admin.deleteTable(TableName.valueOf("IMUT:employee_sync"));
    System.out.println("删除表成功");
    }
  2. 异步删除表

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    //异步删除表
    @Test
    public void deleteTBAsync() throws Exception {
    System.out.println("开始删除表");
    // 第一步:通过表的管理类的对象异步禁用要删除的表,并设置超时时长
    Future<Void> f1 = admin.disableTableAsync(TableName.valueOf("IMUT:employee_async"));
    Void v1 = f1.get(1000, TimeUnit.MILLISECONDS);
    System.out.println("禁用表成功");
    // 第二步:通过表的管理类的对象异步删除被禁用的表,并设置超时时长
    Future<Void> f2 = admin.deleteTableAsync(TableName.valueOf("IMUT:employee_async"));
    Void v2 = f2.get(1000, TimeUnit.MILLISECONDS);
    System.out.println("删除表成功");
    }

添加数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// 添加数据
@Test
public void putTest() throws IOException {
// 通过表名获取到指定表
table = conn.getTable(TableName.valueOf("IMUT:employee_sync"));
// 创建添加数据的类的对象,并指定行键
Put put = new Put(Bytes.toBytes("15"));
// 添加数据信息,列族,列名,值
// 参数全部为byte[]类型
put.addColumn(Bytes.toBytes("baseinfo"), Bytes.toBytes("name"),Bytes.toBytes("kevin"));
put.addColumn(Bytes.toBytes("baseinfo"), Bytes.toBytes("age"),Bytes.toBytes("43"));
put.addColumn(Bytes.toBytes("baseinfo"), Bytes.toBytes("gender"),Bytes.toBytes("male"));
put.addColumn(Bytes.toBytes("baseinfo"), Bytes.toBytes("position"),Bytes.toBytes("CTO"));
// 将数据添加到该表中
table.put(put);
System.out.println("插入数据成功");
}

获取数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
// 获取数据
@Test
public void getTest() throws Exception {
// 通过表名获取到指定表
table = conn.getTable(TableName.valueOf("IMUT:employee_sync"));
// 创建获取数据的类的对象,并指定行键
Get get = new Get(Bytes.toBytes("15"));
// 将从表中获取到的数据返回到结果集中
Result result = table.get(get);
long t = table.getReadRpcTimeout(TimeUnit.MILLISECONDS);
System.out.println("read超时时间:"+t);
// 自定义方法,打印一行的结果
showResult(result);
}
// 打印结果
public void showResult(Result result){
// 检查该列族下的该列是否存在值,不论是否为空
System.out.println("是否存在值不论是否为空:"+result.containsColumn(Bytes.toBytes("baseinfo"),Bytes.toBytes("age")));
// 检查该列族下的该列是否含有空值
System.out.println("是否存在空值:"+result.containsEmptyColumn(Bytes.toBytes("baseinfo"),Bytes.toBytes("age")));
// 检查该列族下的该列是否存在非空值
System.out.println("是否存在非空值:"+result.containsNonEmptyColumn(Bytes.toBytes("baseinfo"),Bytes.toBytes("age")));
// 返回result中有几个cell
System.out.println("cell的个数:"+result.size());
// 获得某列族某列下的最新值
System.out.println(Bytes.toString(result.getValue(Bytes.toBytes("baseinfo"),Bytes.toBytes("name"))));
System.out.println(Bytes.toString(result.getValue(Bytes.toBytes("baseinfo"),Bytes.toBytes("age"))));
System.out.println(Bytes.toString(result.getValue(Bytes.toBytes("baseinfo"),Bytes.toBytes("gender"))));
// 返回第一个cell中的value值
System.out.println("第一个cell中的value值:"+Bytes.toString(result.value()));
System.out.println("-----------------");
// 四层嵌套的Map
NavigableMap<byte[], byte[]> qvmap = result.getFamilyMap(Bytes.toBytes("baseinfo"));
// 遍历列族
for (Map.Entry<byte[], byte[]> entry : qvmap.entrySet()) {
System.out.println("baseinfo"+" "+Bytes.toString(entry.getKey())+ " "+Bytes.toString(entry.getValue()));
}
System.out.println("-----------------");
NavigableMap<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> fqtvs = result.getMap();
// 遍历列族,第一层Map的key为列族
for (Map.Entry<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> fqtv : fqtvs.entrySet()) {
byte[] f = fqtv.getKey();
NavigableMap<byte[], NavigableMap<Long, byte[]>> qtvs = fqtv.getValue();
// 遍历列名,第二层Map的key为列名
for (Map.Entry<byte[], NavigableMap<Long, byte[]>> qtv : qtvs.entrySet()) {
byte[] q = qtv.getKey();
NavigableMap<Long, byte[]> tvs = qtv.getValue();
// 遍历时间戳,第三层Map的key为时间戳,value为值
for (Map.Entry<Long, byte[]> tv : tvs.entrySet()) {
Long t = tv.getKey();
byte[] v = tv.getValue();
System.out.println("row:"+Bytes.toString(result.getRow())+" "
+"family:"+Bytes.toString(f)+" "
+"qualifier:"+Bytes.toString(q)+" "
+"timestamp:"+t+" "
+"value:"+Bytes.toString(v)+" ");
}
}
}
System.out.println("===========================================================");
}

扫描(过滤器)

  1. 扫描全表

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    // 扫描
    @Test
    public void scanTest() throws IOException {
    // 通过表名获取到指定表
    table = conn.getTable(TableName.valueOf("IMUT:employee_sync"));
    // 创建扫描的操作类的对象
    Scan scan = new Scan();
    // 将扫描得到的结果返回到结果集中
    ResultScanner scanner = table.getScanner(scan);
    // 遍历打印所有符合条件的行
    for (Result result : scanner) {
    showResult(result);
    }
    }
  2. 过滤器

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    // 扫描
    @Test
    public void scanTest() throws IOException {
    // 通过表名获取到指定表
    table = conn.getTable(TableName.valueOf("IMUT:employee_sync"));
    // 创建扫描的操作类的对象
    Scan scan = new Scan();
    // ---------有关行键的过滤器---------
    // 行键前缀过滤器
    Filter f = new PrefixFilter(Bytes.toBytes("15"));
    // 只扫描行键的过滤器,不获取值
    // Filter f = new KeyOnlyFilter();
    // 随机行过滤器,按比例获取随机行数
    // Filter f = new RandomRowFilter(0.5f);
    // 行范围过滤器
    // List<MultiRowRangeFilter.RowRange> list = new ArrayList<>();
    // list.add(new MultiRowRangeFilter.RowRange("1001",true,"1004",true));
    // Filter f = new MultiRowRangeFilter(list);
    // 行键过滤器,可以通过提供不同比较器实现不同功能,如匹配行键前缀
    // Filter f = new RowFilter(CompareOperator.EQUAL, new BinaryPrefixComparator(Bytes.toBytes("10")));
    // 列族相关过滤器,匹配正则表达式
    // Filter f = new FamilyFilter(CompareOperator.EQUAL, new RegexStringComparator(".*base.*"));
    // 列相关过滤器,获得前n列过滤器
    // Filter f = new ColumnCountGetFilter(1);
    // 列分页过滤器 基于ColumnPaginationFilter,参数1 获得n列 参数2 从第n列开始获取
    // Filter f = new ColumnPaginationFilter(3,1);
    // 列名前缀过滤器
    // Filter f = new ColumnPrefixFilter(Bytes.toBytes("na"));
    // 列名范围过滤器
    // Filter f = new ColumnRangeFilter(Bytes.toBytes("aaa"),true,Bytes.toBytes("ccc"),true);
    // 匹配某列族某列某值的过滤器
    // Filter f = new ColumnValueFilter(Bytes.toBytes("info"),Bytes.toBytes("age"),CompareOperator.EQUAL,Bytes.toBytes("20"));
    // 根据值的比较过滤行
    // Filter f = new ValueFilter(CompareOperator.EQUAL, new RegexStringComparator(".*t.*"));
    // 设置扫描的过滤器
    scan.setFilter(f);
    // 将扫描得到的结果返回到结果集中
    ResultScanner scanner = table.getScanner(scan);
    // 遍历打印所有符合条件的行
    for (Result result : scanner) {
    showResult(result);
    }
    }

最后更新: 2018年10月08日 18:25

原始链接: https://www.lousenjay.top/2018/09/19/HBase入门详解(二)/