搭建环境:CentOS7+zookeeper-3.4.5

简介

Zookeeper是一个分布式协调服务,就是为用户的分布式应用程序提供协调服务。

  • zookeeper是为别的分布式程序服务的
  • zookeeper所提供的服务涵盖:主从协调、服务器节点动态上下线、统一配置管理、分布式共享锁、统一名称服务……
  • zookeeper在底层其实只提供了两个功能
    • 管理(存储,读取)用户程序提交的数据
    • 为用户程序提供数据节点监听服务
  • zookeeper集群的角色:Leader和follower(Observer)
  • 注:zookeeper只适合装在奇数台机器上!!!

安装zookeeper集群

  1. 第一台机器(hadoop1)

    • 上传zookeeper-3.4.5.tar.gz安装包到第一台机器
      rz
    • 创建apps文件夹
      mkdir /root/apps
    • 解压zookeeper-3.4.5.tar.gz到apps文件夹
      tar -zxvf zookeeper-3.4.5.tar.gz -C apps/
    • 删除不必要的文件,加快scp传输速度
      cd /root/apps/zookeeper-3.4.5
      rm rm -rf src/ dist-maven/ docs/ *.xml *.txt
    • 修改配置文件zoo.cfg
      cd /root/apps/zookeeper-3.4.5/conf
      cp zoo_sample.cfg zoo.cfg
      vi zoo.cfg

      1
      2
      3
      4
      5
      6
      7
      8
      tickTime=2000
      initLimit=10
      syncLimit=5
      dataDir=/root/zkdata
      clientPort=2181
      server.1=hadoop1:2888:3888
      server.2=hadoop2:2888:3888
      server.3=hadoop3:2888:3888
    • 创建数据文件
      mkdir /root/zkdata
      cd /root/zkdata
      echo 1 > myid

    • 将zookeeper-3.4.5目录拷贝到其他机器上
      scp -r /root/apps/zookeeper-3.4.5 hadoop2:/root/apps/
      scp -r /root/apps/zookeeper-3.4.5 hadoop3:/root/apps/
  2. 第二台机器(hadoop2)
    mkdir /root/zkdata
    cd /root/zkdata
    echo 2 > myid
  3. 第三台机器(hadoop3)
    mkdir /root/zkdata
    cd /root/zkdata
    echo 3 > myid
    4.启动
    在所有机器上开启
    cd /root/apps/zookeeper-3.4.5/
    bin/zkServer.sh start
  4. 查看运行状态
    jps(查看进程)
    bin/zkServer.sh status(查看集群状态,主从信息)
    注:必须关闭防火墙:systemctl stop firewalld.service
  5. 如果在启动时报错可查看日志
    cat /root/apps/zookeeper-3.4.5/zookeeper.out

zookeeper的结构和命令

zookeeper的特性

  1. Zookeeper:一个leader,多个follower组成的集群
  2. 全局数据一致:每个server保存一份相同的数据副本,client无论连接到哪个server,数据都是一致的
  3. 分布式读写,更新请求转发,由leader实施
  4. 更新请求顺序进行,来自同一个client的更新请求按其发送顺序依次执行
  5. 数据更新原子性,一次数据更新要么成功,要么失败
  6. 实时性,在一定时间范围内,client能读到最新数据4

zookeeper的数据结构

  1. 层次化的目录结构,命名符合常规文件系统规范
  2. 每个节点在zookeeper中叫做znode,并且其有一个唯一的路径标识
  3. 节点Znode可以包含数据和子节点(但是EPHEMERAL类型的节点不能有子节点)
  4. 客户端应用可以在节点上设置监视器

节点类型

  1. Znode有两种类型:
    短暂(ephemeral)(断开连接自己删除)
    持久(persistent)(断开连接不删除)
  2. Znode有四种形式的目录节点(默认是persistent )
    PERSISTENT
    PERSISTENT_SEQUENTIAL(持久序列/test0000000019 )
    EPHEMERAL
    EPHEMERAL_SEQUENTIAL
  3. 创建znode时设置顺序标识,znode名称后会附加一个值,顺序号是一个单调递增的计数器,由父节点维护
  4. 在分布式系统中,顺序号可以被用于为所有的事件进行全局排序,这样客户端可以通过顺序号推断事件的顺序

zookeeper命令行操作

  1. 启动客户端
    cd /root/apps/zookeeper-3.4.5/bin/
    sh zkCli.sh
  2. 查看命令帮助
    help
  3. 连接到其他机器
    connect hadoop3:2181
  4. 查看节点
    • ls / 查看根节点
      • ls /app1 查看根节点下的app1节点
  5. 查看节点存储的数据信息
    get /app1
  6. 创建节点(默认为永久节点)
    create /app1 “this is app1”
    语法:create [-s|-e] 节点位置 该节点所存储数据
  7. 创建临时节点/app-emphemeral
    create -e /app-emphemeral 6666
    注:在退出客户端连接时会自动删除该节点
    注:临时节点不允许有子节点
  8. 创建带序号的节点
    create -s /app1/aa 6666
  9. 退出客户端
    quit
  10. 修改节点数据
    set /app1 666
  11. 监听节点数据变动
    get /app1 watch
    注:只监听一次变动,监听到一次变动后不再继续监听
    如:set /app1 666 操作会触发该监听
  12. 监听子节点变动
    ls /app1 watch
    注:同样只监听一次变动,监听到一次变动后不再继续监听
    如:create /app1/a 777 操作会触发该监听
  13. 删除节点
    • delete /app1/a 删除没有子节点的节点
    • rmr /app1/b 递归删除当前节点和当前节点所有子节点

zookeeper-api应用

  1. 导包
    解压zookeeper-3.4.5.tar.gz可得
    jline-0.9.94.jar
    log4j-1.2.15.jar
    netty-3.2.2.Final.jar
    slf4j-api-1.6.1.jar
    slf4j-log4j12-1.6.1.jar
    zookeeper-3.4.5.jar

  2. 增删改查demo

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    public class SimpleZkClient {
    private static final String connectString = "hadoop1:2181,hadoop2:2181,hadoop3:2181";
    private static final int sessionTimeout = 2000;
    ZooKeeper zkClient = null;
    @Before
    public void init() throws Exception {
    zkClient = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
    @Override
    public void process(WatchedEvent event) {
    // 收到事件通知后的回调函数(应该是我们自己的事件处理逻辑)
    System.out.println(event.getType() + "---" + event.getPath());
    try {
    zkClient.getChildren("/", true); //监听完后继续监听
    } catch (Exception e) {
    }
    }
    });
    }
    /**
    * 数据的增删改查
    *
    * @throws InterruptedException
    * @throws KeeperException
    */
    // 创建数据节点到zk中
    @Test
    public void testCreate() throws Exception {
    // 参数1:要创建的节点的路径 参数2:节点大数据 参数3:节点的权限 参数4:节点的类型:永久节点
    String nodeCreated = zkClient.create("/test", "hellozk".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
    //上传的数据可以是任何类型,但都要转成byte[]
    }
    //判断znode是否存在
    @Test
    public void testExist() throws Exception{
    Stat stat = zkClient.exists("/test", false);
    System.out.println(stat==null?"not exist":"exist");
    }
    // 获取子节点
    @Test
    public void getChildren() throws Exception {
    List<String> children = zkClient.getChildren("/", true);
    for (String child : children) {
    System.out.println(child);
    }
    Thread.sleep(Long.MAX_VALUE);
    }
    //获取znode的数据
    @Test
    public void getData() throws Exception {
    byte[] data = zkClient.getData("/app1", false, null);
    System.out.println(new String(data));
    }
    //删除znode
    @Test
    public void deleteZnode() throws Exception {
    //参数2:指定要删除的版本,-1表示删除所有版本
    zkClient.delete("/test", -1);
    System.out.println("已删除");
    }
    //修改znode数据
    @Test
    public void setData() throws Exception {
    zkClient.setData("/app1", "hello".getBytes(), -1);
    byte[] data = zkClient.getData("/app1", false, null);
    System.out.println(new String(data));
    }
    }

分布式应用系统服务器上下线动态感应

  1. 思路分析

    • 服务器端:每上线一台服务器,就在固定的父节点下创建一个对应的短暂子节点,并将该服务器的信息(主机名,IP)记录到该子节点下。
    • 客户端:获取父节点下所有子节点保存的信息(并监听该父节点),从中获取服务器信息列表可以得知当前在线的服务器。如果有服务器下线(即该父节点下有子节点消失)就会触发监听,重新获取服务器列表,并再次注册监听。
  2. 服务器端

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    public class DistributedServer {
    private static final String connectString = "hadoop1:2181,hadoop2:2181,hadoop3:2181";
    private static final int sessionTimeout = 2000;
    private static final String parentNode = "/servers";
    private ZooKeeper zk = null;
    /**
    * 创建到zk的客户端连接
    *
    * @throws Exception
    */
    public void getConnect() throws Exception {
    zk = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
    @Override
    public void process(WatchedEvent event) {
    // 收到事件通知后的回调函数(应该是我们自己的事件处理逻辑)
    System.out.println(event.getType() + "---" + event.getPath());
    try {
    zk.getChildren("/", true);
    } catch (Exception e) {
    }
    }
    });
    }
    /**
    * 向zk集群注册服务器信息
    *
    * @param hostname
    * @throws Exception
    */
    public void registerServer(String hostname) throws Exception {
    String create = zk.create(parentNode + "/server", hostname.getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
    System.out.println(hostname + "is online.." + create);
    }
    /**
    * 业务功能
    *
    * @throws InterruptedException
    */
    public void handleBussiness(String hostname) throws InterruptedException {
    System.out.println(hostname + "start working.....");
    Thread.sleep(Long.MAX_VALUE);
    }
    public static void main(String[] args) throws Exception {
    // 获取zk连接
    DistributedServer server = new DistributedServer();
    server.getConnect();
    //模拟随机生成一个主机
    int num = (int) (Math.random()*10);
    String hostname = "hadoop"+num;
    // 利用zk连接注册服务器信息
    server.registerServer(hostname);
    // 启动业务功能
    server.handleBussiness(hostname);
    }
    }
  3. 客户端

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    public class DistributedClient {
    private static final String connectString = "hadoop1:2181,hadoop2:2181,hadoop3:2181";
    private static final int sessionTimeout = 2000;
    private static final String parentNode = "/servers";
    // 注意:加volatile的意义何在?
    private volatile List<String> serverList;
    private ZooKeeper zk = null;
    /**
    * 创建到zk的客户端连接
    *
    * @throws Exception
    */
    public void getConnect() throws Exception {
    zk = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
    @Override
    public void process(WatchedEvent event) {
    // 收到事件通知后的回调函数(应该是我们自己的事件处理逻辑)
    try {
    //重新更新服务器列表,并且注册了监听
    getServerList();
    } catch (Exception e) {
    }
    }
    });
    }
    /**
    * 获取服务器信息列表
    *
    * @throws Exception
    */
    public void getServerList() throws Exception {
    // 获取服务器子节点信息,并且对父节点进行监听
    List<String> children = zk.getChildren(parentNode, true);
    // 先创建一个局部的list来存服务器信息
    List<String> servers = new ArrayList<String>();
    for (String child : children) {
    // child只是子节点的节点名
    byte[] data = zk.getData(parentNode + "/" + child, false, null);
    servers.add(new String(data));
    }
    // 把servers赋值给成员变量serverList,已提供给各业务线程使用
    serverList = servers;
    //打印服务器列表
    System.out.println(serverList);
    }
    /**
    * 业务功能
    *
    * @throws InterruptedException
    */
    public void handleBussiness() throws InterruptedException {
    System.out.println("client start working.....");
    Thread.sleep(Long.MAX_VALUE);
    }
    public static void main(String[] args) throws Exception {
    // 获取zk连接
    DistributedClient client = new DistributedClient();
    client.getConnect();
    // 获取servers的子节点信息(并监听),从中获取服务器信息列表
    client.getServerList();
    // 业务线程启动
    client.handleBussiness();
    }
    }

分布式共享锁的程序逻辑流程

  1. 思路分析
    • 目的:同时只能有一台服务器获取到锁
    • 程序启动时到zookeeper上注册一个”短暂+序号”的子节点(锁),并监听其父节点
    • 获取父节点下所有程序的子节点,比较序号大小
    • 序号小的先获取到锁,去访问资源,访问完后删除自己的节点,相当于释放锁,并且重新注册一个新的子节点(新的子节点序号比已经存在的所有子节点序号大)
    • 其他程序节点会收到事件通知,让目前存在的序号最小的子节点获取锁,所有服务器都会轮流获取锁,且始终只有一台服务器获取到锁。
  2. 代码实现
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    public class DistributedClientLock {
    // 会话超时
    private static final int SESSION_TIMEOUT = 2000;
    // zookeeper集群地址
    private String hosts = "hadoop1:2181,hadoop2:2181,hadoop3:2181";
    private String groupNode = "locks";
    private String subNode = "sub";
    private ZooKeeper zk;
    // 记录自己创建的子节点路径
    private volatile String thisPath;
    /**
    * 连接zookeeper
    */
    public void connectZookeeper() throws Exception {
    zk = new ZooKeeper(hosts, SESSION_TIMEOUT, new Watcher() {
    public void process(WatchedEvent event) {
    try {
    // 判断事件类型,此处只处理子节点变化事件
    if (event.getType() == EventType.NodeChildrenChanged && event.getPath().equals("/" + groupNode)) {
    //获取子节点,并对父节点进行监听
    List<String> childrenNodes = zk.getChildren("/" + groupNode, true);
    String thisNode = thisPath.substring(("/" + groupNode + "/").length());
    // 去比较是否自己是最小id
    Collections.sort(childrenNodes);
    if (childrenNodes.indexOf(thisNode) == 0) {
    //访问共享资源处理业务,并且在处理完成之后删除锁
    doSomething();
    //重新注册一把新的锁
    thisPath = zk.create("/" + groupNode + "/" + subNode, null, Ids.OPEN_ACL_UNSAFE,
    CreateMode.EPHEMERAL_SEQUENTIAL);
    }
    }
    } catch (Exception e) {
    e.printStackTrace();
    }
    }
    });
    // 1、程序一进来就先注册一把锁到zk上
    thisPath = zk.create("/" + groupNode + "/" + subNode, null, Ids.OPEN_ACL_UNSAFE,
    CreateMode.EPHEMERAL_SEQUENTIAL);
    // wait一小会,便于观察
    Thread.sleep(new Random().nextInt(1000));
    // 从zk的锁父目录下,获取所有子节点,并且注册对父节点的监听
    List<String> childrenNodes = zk.getChildren("/" + groupNode, true);
    //如果争抢资源的程序就只有自己,则可以直接去访问共享资源
    if (childrenNodes.size() == 1) {
    doSomething();
    thisPath = zk.create("/" + groupNode + "/" + subNode, null, Ids.OPEN_ACL_UNSAFE,
    CreateMode.EPHEMERAL_SEQUENTIAL);
    }
    }
    /**
    * 处理业务逻辑,并且在最后释放锁
    */
    private void doSomething() throws Exception {
    try {
    System.out.println("gain lock: " + thisPath);
    Thread.sleep(2000);
    // do something
    } finally {
    System.out.println("finished: " + thisPath);
    // 删除该节点,相当于释放锁
    zk.delete(this.thisPath, -1);
    }
    }
    public static void main(String[] args) throws Exception {
    DistributedClientLock dl = new DistributedClientLock();
    dl.connectZookeeper();
    Thread.sleep(Long.MAX_VALUE);
    }
    }

最后更新: 2018年11月21日 16:09

原始链接: https://www.lousenjay.top/2018/08/08/zookeeper入门学习/