搭建环境:CentOS7+zookeeper-3.4.5
简介
Zookeeper是一个分布式协调服务,就是为用户的分布式应用程序提供协调服务。
- zookeeper是为别的分布式程序服务的
- zookeeper所提供的服务涵盖:主从协调、服务器节点动态上下线、统一配置管理、分布式共享锁、统一名称服务……
- zookeeper在底层其实只提供了两个功能
- 管理(存储,读取)用户程序提交的数据
- 为用户程序提供数据节点监听服务
- zookeeper集群的角色:Leader和follower(Observer)
- 注:zookeeper只适合装在奇数台机器上!!!
安装zookeeper集群
第一台机器(hadoop1)
- 上传zookeeper-3.4.5.tar.gz安装包到第一台机器
rz - 创建apps文件夹
mkdir /root/apps - 解压zookeeper-3.4.5.tar.gz到apps文件夹
tar -zxvf zookeeper-3.4.5.tar.gz -C apps/ - 删除不必要的文件,加快scp传输速度
cd /root/apps/zookeeper-3.4.5
rm rm -rf src/ dist-maven/ docs/ *.xml *.txt
修改配置文件zoo.cfg
cd /root/apps/zookeeper-3.4.5/conf
cp zoo_sample.cfg zoo.cfg
vi zoo.cfg12345678tickTime=2000initLimit=10syncLimit=5dataDir=/root/zkdataclientPort=2181server.1=hadoop1:2888:3888server.2=hadoop2:2888:3888server.3=hadoop3:2888:3888创建数据文件
mkdir /root/zkdata
cd /root/zkdata
echo 1 > myid- 将zookeeper-3.4.5目录拷贝到其他机器上
scp -r /root/apps/zookeeper-3.4.5 hadoop2:/root/apps/
scp -r /root/apps/zookeeper-3.4.5 hadoop3:/root/apps/
- 上传zookeeper-3.4.5.tar.gz安装包到第一台机器
- 第二台机器(hadoop2)
mkdir /root/zkdata
cd /root/zkdata
echo 2 > myid - 第三台机器(hadoop3)
mkdir /root/zkdata
cd /root/zkdata
echo 3 > myid
4.启动
在所有机器上开启
cd /root/apps/zookeeper-3.4.5/
bin/zkServer.sh start - 查看运行状态
jps(查看进程)
bin/zkServer.sh status(查看集群状态,主从信息)
注:必须关闭防火墙:systemctl stop firewalld.service - 如果在启动时报错可查看日志
cat /root/apps/zookeeper-3.4.5/zookeeper.out
zookeeper的结构和命令
zookeeper的特性
- Zookeeper:一个leader,多个follower组成的集群
- 全局数据一致:每个server保存一份相同的数据副本,client无论连接到哪个server,数据都是一致的
- 分布式读写,更新请求转发,由leader实施
- 更新请求顺序进行,来自同一个client的更新请求按其发送顺序依次执行
- 数据更新原子性,一次数据更新要么成功,要么失败
- 实时性,在一定时间范围内,client能读到最新数据4
zookeeper的数据结构
- 层次化的目录结构,命名符合常规文件系统规范
- 每个节点在zookeeper中叫做znode,并且其有一个唯一的路径标识
- 节点Znode可以包含数据和子节点(但是EPHEMERAL类型的节点不能有子节点)
- 客户端应用可以在节点上设置监视器
节点类型
- Znode有两种类型:
短暂(ephemeral)(断开连接自己删除)
持久(persistent)(断开连接不删除) - Znode有四种形式的目录节点(默认是persistent )
PERSISTENT
PERSISTENT_SEQUENTIAL(持久序列/test0000000019 )
EPHEMERAL
EPHEMERAL_SEQUENTIAL - 创建znode时设置顺序标识,znode名称后会附加一个值,顺序号是一个单调递增的计数器,由父节点维护
- 在分布式系统中,顺序号可以被用于为所有的事件进行全局排序,这样客户端可以通过顺序号推断事件的顺序
zookeeper命令行操作
- 启动客户端
cd /root/apps/zookeeper-3.4.5/bin/
sh zkCli.sh - 查看命令帮助
help - 连接到其他机器
connect hadoop3:2181 - 查看节点
- ls / 查看根节点
- ls /app1 查看根节点下的app1节点
- ls / 查看根节点
- 查看节点存储的数据信息
get /app1 - 创建节点(默认为永久节点)
create /app1 “this is app1”
语法:create [-s|-e] 节点位置 该节点所存储数据 - 创建临时节点/app-emphemeral
create -e /app-emphemeral 6666
注:在退出客户端连接时会自动删除该节点
注:临时节点不允许有子节点 - 创建带序号的节点
create -s /app1/aa 6666 - 退出客户端
quit - 修改节点数据
set /app1 666 - 监听节点数据变动
get /app1 watch
注:只监听一次变动,监听到一次变动后不再继续监听
如:set /app1 666 操作会触发该监听 - 监听子节点变动
ls /app1 watch
注:同样只监听一次变动,监听到一次变动后不再继续监听
如:create /app1/a 777 操作会触发该监听 - 删除节点
- delete /app1/a 删除没有子节点的节点
- rmr /app1/b 递归删除当前节点和当前节点所有子节点
zookeeper-api应用
导包
解压zookeeper-3.4.5.tar.gz可得
jline-0.9.94.jar
log4j-1.2.15.jar
netty-3.2.2.Final.jar
slf4j-api-1.6.1.jar
slf4j-log4j12-1.6.1.jar
zookeeper-3.4.5.jar增删改查demo
12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879public class SimpleZkClient {private static final String connectString = "hadoop1:2181,hadoop2:2181,hadoop3:2181";private static final int sessionTimeout = 2000;ZooKeeper zkClient = null;@Beforepublic void init() throws Exception {zkClient = new ZooKeeper(connectString, sessionTimeout, new Watcher() {@Overridepublic void process(WatchedEvent event) {// 收到事件通知后的回调函数(应该是我们自己的事件处理逻辑)System.out.println(event.getType() + "---" + event.getPath());try {zkClient.getChildren("/", true); //监听完后继续监听} catch (Exception e) {}}});}/*** 数据的增删改查** @throws InterruptedException* @throws KeeperException*/// 创建数据节点到zk中@Testpublic void testCreate() throws Exception {// 参数1:要创建的节点的路径 参数2:节点大数据 参数3:节点的权限 参数4:节点的类型:永久节点String nodeCreated = zkClient.create("/test", "hellozk".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);//上传的数据可以是任何类型,但都要转成byte[]}//判断znode是否存在@Testpublic void testExist() throws Exception{Stat stat = zkClient.exists("/test", false);System.out.println(stat==null?"not exist":"exist");}// 获取子节点@Testpublic void getChildren() throws Exception {List<String> children = zkClient.getChildren("/", true);for (String child : children) {System.out.println(child);}Thread.sleep(Long.MAX_VALUE);}//获取znode的数据@Testpublic void getData() throws Exception {byte[] data = zkClient.getData("/app1", false, null);System.out.println(new String(data));}//删除znode@Testpublic void deleteZnode() throws Exception {//参数2:指定要删除的版本,-1表示删除所有版本zkClient.delete("/test", -1);System.out.println("已删除");}//修改znode数据@Testpublic void setData() throws Exception {zkClient.setData("/app1", "hello".getBytes(), -1);byte[] data = zkClient.getData("/app1", false, null);System.out.println(new String(data));}}
分布式应用系统服务器上下线动态感应
思路分析
- 服务器端:每上线一台服务器,就在固定的父节点下创建一个对应的短暂子节点,并将该服务器的信息(主机名,IP)记录到该子节点下。
- 客户端:获取父节点下所有子节点保存的信息(并监听该父节点),从中获取服务器信息列表可以得知当前在线的服务器。如果有服务器下线(即该父节点下有子节点消失)就会触发监听,重新获取服务器列表,并再次注册监听。
服务器端
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869public class DistributedServer {private static final String connectString = "hadoop1:2181,hadoop2:2181,hadoop3:2181";private static final int sessionTimeout = 2000;private static final String parentNode = "/servers";private ZooKeeper zk = null;/*** 创建到zk的客户端连接** @throws Exception*/public void getConnect() throws Exception {zk = new ZooKeeper(connectString, sessionTimeout, new Watcher() {@Overridepublic void process(WatchedEvent event) {// 收到事件通知后的回调函数(应该是我们自己的事件处理逻辑)System.out.println(event.getType() + "---" + event.getPath());try {zk.getChildren("/", true);} catch (Exception e) {}}});}/*** 向zk集群注册服务器信息** @param hostname* @throws Exception*/public void registerServer(String hostname) throws Exception {String create = zk.create(parentNode + "/server", hostname.getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);System.out.println(hostname + "is online.." + create);}/*** 业务功能** @throws InterruptedException*/public void handleBussiness(String hostname) throws InterruptedException {System.out.println(hostname + "start working.....");Thread.sleep(Long.MAX_VALUE);}public static void main(String[] args) throws Exception {// 获取zk连接DistributedServer server = new DistributedServer();server.getConnect();//模拟随机生成一个主机int num = (int) (Math.random()*10);String hostname = "hadoop"+num;// 利用zk连接注册服务器信息server.registerServer(hostname);// 启动业务功能server.handleBussiness(hostname);}}客户端
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778public class DistributedClient {private static final String connectString = "hadoop1:2181,hadoop2:2181,hadoop3:2181";private static final int sessionTimeout = 2000;private static final String parentNode = "/servers";// 注意:加volatile的意义何在?private volatile List<String> serverList;private ZooKeeper zk = null;/*** 创建到zk的客户端连接** @throws Exception*/public void getConnect() throws Exception {zk = new ZooKeeper(connectString, sessionTimeout, new Watcher() {@Overridepublic void process(WatchedEvent event) {// 收到事件通知后的回调函数(应该是我们自己的事件处理逻辑)try {//重新更新服务器列表,并且注册了监听getServerList();} catch (Exception e) {}}});}/*** 获取服务器信息列表** @throws Exception*/public void getServerList() throws Exception {// 获取服务器子节点信息,并且对父节点进行监听List<String> children = zk.getChildren(parentNode, true);// 先创建一个局部的list来存服务器信息List<String> servers = new ArrayList<String>();for (String child : children) {// child只是子节点的节点名byte[] data = zk.getData(parentNode + "/" + child, false, null);servers.add(new String(data));}// 把servers赋值给成员变量serverList,已提供给各业务线程使用serverList = servers;//打印服务器列表System.out.println(serverList);}/*** 业务功能** @throws InterruptedException*/public void handleBussiness() throws InterruptedException {System.out.println("client start working.....");Thread.sleep(Long.MAX_VALUE);}public static void main(String[] args) throws Exception {// 获取zk连接DistributedClient client = new DistributedClient();client.getConnect();// 获取servers的子节点信息(并监听),从中获取服务器信息列表client.getServerList();// 业务线程启动client.handleBussiness();}}
分布式共享锁的程序逻辑流程
- 思路分析
- 目的:同时只能有一台服务器获取到锁
- 程序启动时到zookeeper上注册一个”短暂+序号”的子节点(锁),并监听其父节点
- 获取父节点下所有程序的子节点,比较序号大小
- 序号小的先获取到锁,去访问资源,访问完后删除自己的节点,相当于释放锁,并且重新注册一个新的子节点(新的子节点序号比已经存在的所有子节点序号大)
- 其他程序节点会收到事件通知,让目前存在的序号最小的子节点获取锁,所有服务器都会轮流获取锁,且始终只有一台服务器获取到锁。
- 代码实现12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879public class DistributedClientLock {// 会话超时private static final int SESSION_TIMEOUT = 2000;// zookeeper集群地址private String hosts = "hadoop1:2181,hadoop2:2181,hadoop3:2181";private String groupNode = "locks";private String subNode = "sub";private ZooKeeper zk;// 记录自己创建的子节点路径private volatile String thisPath;/*** 连接zookeeper*/public void connectZookeeper() throws Exception {zk = new ZooKeeper(hosts, SESSION_TIMEOUT, new Watcher() {public void process(WatchedEvent event) {try {// 判断事件类型,此处只处理子节点变化事件if (event.getType() == EventType.NodeChildrenChanged && event.getPath().equals("/" + groupNode)) {//获取子节点,并对父节点进行监听List<String> childrenNodes = zk.getChildren("/" + groupNode, true);String thisNode = thisPath.substring(("/" + groupNode + "/").length());// 去比较是否自己是最小idCollections.sort(childrenNodes);if (childrenNodes.indexOf(thisNode) == 0) {//访问共享资源处理业务,并且在处理完成之后删除锁doSomething();//重新注册一把新的锁thisPath = zk.create("/" + groupNode + "/" + subNode, null, Ids.OPEN_ACL_UNSAFE,CreateMode.EPHEMERAL_SEQUENTIAL);}}} catch (Exception e) {e.printStackTrace();}}});// 1、程序一进来就先注册一把锁到zk上thisPath = zk.create("/" + groupNode + "/" + subNode, null, Ids.OPEN_ACL_UNSAFE,CreateMode.EPHEMERAL_SEQUENTIAL);// wait一小会,便于观察Thread.sleep(new Random().nextInt(1000));// 从zk的锁父目录下,获取所有子节点,并且注册对父节点的监听List<String> childrenNodes = zk.getChildren("/" + groupNode, true);//如果争抢资源的程序就只有自己,则可以直接去访问共享资源if (childrenNodes.size() == 1) {doSomething();thisPath = zk.create("/" + groupNode + "/" + subNode, null, Ids.OPEN_ACL_UNSAFE,CreateMode.EPHEMERAL_SEQUENTIAL);}}/*** 处理业务逻辑,并且在最后释放锁*/private void doSomething() throws Exception {try {System.out.println("gain lock: " + thisPath);Thread.sleep(2000);// do something} finally {System.out.println("finished: " + thisPath);// 删除该节点,相当于释放锁zk.delete(this.thisPath, -1);}}public static void main(String[] args) throws Exception {DistributedClientLock dl = new DistributedClientLock();dl.connectZookeeper();Thread.sleep(Long.MAX_VALUE);}}
最后更新: 2018年11月21日 16:09