Zookeeper
应用场景
- 分布式协调组件 -  服务组件的数据一致性问题的协调,zk可以通知其他服务修改数据状态 
- 分布式锁 -  zk可以做到强一致性 
- 无状态化的实现 -  登陆组件(系统)冗余部署后,登录信息放在哪个组件都不行,因为下次自动均衡找到的不一定是上个组件,可以将登录状态放入zk 
搭建zk服务器
安装配置
- 官网下载zk工具 
- 打开进入 bin目录,可以看到zkServer.sh脚本,但是启动是需要zoo.cfg文件的,可以修改conf目录下的zoo_sample.cfg 
- 有了配置文件后 - ./bin/zkServer.sh start ../conf/zoo.cfg 启动即可
 - 
- zoo.cfg 文件说明 
 
| 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 
 | # zk的时间单位(毫秒)tickTime=2000
 # 允许follower初始化连接到leader最大时长,这里是20s
 initLimit=10
 # 允许follower与leader数据同步最大时长
 syncLimit=5
 # zk数据存储目录以及日志保存目录(如果没有指明dataLogDir
 dataDir=/bitnami/zookeeper/data
 # 对客户端提供的端口号
 clientPort=2181
 # 单个客户端和zk的最大并发链接数
 maxClientCnxns=60
 
 # 自动清楚任务的时间间隔,单位为小时,0表示不自动清楚
 autopurge.purgeInterval=0
 
 #
 #
 
 #metricsProvider.className=org.apache.zookeeper.metrics.prometheus.PrometheusMetricsProvider
 #metricsProvider.httpPort=7000
 #metricsProvider.exportJvmInfo=true
 preAllocSize=65536
 snapCount=100000
 maxCnxns=0
 reconfigEnabled=false
 quorumListenOnAllIPs=false
 4lw.commands.whitelist=srvr, mntr
 maxSessionTimeout=40000
 admin.serverPort=8080
 admin.enableServer=true
 
 | 
zk的基本操作
| 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 
 | zkServer.sh start ../conf/zoo.cfg
 zkServer.sh stop ../conf/zoo.cfg
 
 zkServer.sh status
 
 # 客户端执行命令脚本,相当于命令行了(zk服务器启动情况下可以通过此脚本执行命令)
 zkCli.sh
 
 # 以下为进入命令行后的命令
 ls / #查看zk的数据
 help # 查看帮助
 create /test1 #创建znode节点
 create /test2 abc #将数据放入节点
 create -s /test3 #持久序号节点
 create -e /test4 #临时节点
 create -c /container # 容器节点
 set /test4 aa #设置内容
 delete /test1 # 删除
 get /test2
 get -s /test2 # 查看znode的stat信息
 ls -R /test1 #递归查询
 deleteall /test1 #删除节点和其子节点
 delete /test3 # 普通删除
 delete -v 1 /test4 #删除数据版本为1的节点(每次set版本都增加了1,这里用了乐观锁,可以删不成功继续+1删除)
 
 
 | 

zk内部数据模型
非常类似linux的内部数据结构,每次create都是创建一个node节点,可以给这个node节点内存入数据
znode内部结构包含四个部分:
1. data:数据
2. acl:权限
    + c:create的权限
    + w:写
    + r:读
    + d:删除
    + a:管理
3. stat:描述当前znode的元数据
4. child:当前节点的子节点
zk中节点znode的类型
+ 持久节点:会话结束后任然存在
+ 持久序号节点:每次创建的节点自增序号(并发严重情况下,给每个事务分配顺序)
+ 临时节点:会话结束后即可删除,通过这个实现服务注册和发现(客户端和服务器建立的链接就是一个会话,建立链接时,zk服务器会给zk客户端发送一个session id,每次客户端通信时,zk服务器就会自动续约session id,如果超时自动删除session id这个id可以通过get -s 查看到)
+ Container节点:容器节点,如果容器内没有任何子节点,该节点会被定期删除
+ TTL节点:自定义节点到期时间
zk的数据持久化
- 事务日志:每个执行的命令以日志的形式保存在dataLogDir中
- 数据快照:每隔一定时间把内存的数据备份一次,存在dataDir中
之后恢复可以先回复快照文件数据到内存中,再通过日志文件的数据做增量回复,这样的恢复速度很快
权限设置
Curator客户端
介绍
- 这是网飞开发的专为zk的客户端框架,是对zk支持最好的工具,支持Leader选举,分布式锁等,减少开发者使用zk的底层细节。
 引入依赖
| 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 
 | <dependencies><dependency>
 <groupId>org.apache.zookeeper</groupId>
 <artifactId>zookeeper</artifactId>
 <version>3.4.14</version>
 </dependency>
 <dependency>
 <groupId>org.apache.curator</groupId>
 <artifactId>curator-framework</artifactId>
 <version>2.12.0</version>
 </dependency>
 <dependency>
 <groupId>org.apache.curator</groupId>
 <artifactId>curator-recipes</artifactId>
 <version>2.12.0</version>
 </dependency>
 </dependencies>
 
 | 
application.yml配置文件
| 12
 3
 4
 5
 
 | curator.retryCount=5curator.elapsedTimesMs=5000
 curator.connectString=127.0.0.1:2181
 curator.sessionTimeoutMs=60000
 curator.connectionTimeoutMs=5001
 
 | 
配置文件WarpperZK.class
| 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 
 | package com.zq.client.config;
 import lombok.Data;
 import org.springframework.boot.context.properties.ConfigurationProperties;
 import org.springframework.stereotype.Component;
 
 @Data
 @Component
 @ConfigurationProperties(prefix = "curator")
 public class WrapperZK {
 private int retryCount;
 private int elapsedTimesMs;
 private String connectString;
 private int sessionTimeoutMs;
 private int connectionTimeoutMs;
 }
 
 
 | 
CuratorConfig.class 配置文件
| 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 
 | package com.zq.client.config;
 
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.CuratorFrameworkFactory;
 import org.apache.curator.retry.RetryNTimes;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
 
 @Configuration
 public class CuratorConfig {
 @Autowired
 WrapperZK wrapperZk;
 
 @Bean(initMethod = "start")
 public CuratorFramework curatorFramework() {
 return CuratorFrameworkFactory.newClient(
 wrapperZk.getConnectString(), wrapperZk.getSessionTimeoutMs(), wrapperZk.getConnectionTimeoutMs(),
 new RetryNTimes(wrapperZk.getRetryCount(), wrapperZk.getElapsedTimesMs()));
 }
 }
 
 
 | 
测试类文件
| 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 
 | package com.zq.client;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.zookeeper.CreateMode;
 import org.junit.Test;
 import org.junit.Before;
 import org.junit.After;
 import org.junit.runner.RunWith;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.boot.test.context.SpringBootTest;
 import org.springframework.test.context.junit4.SpringRunner;
 
 import java.nio.charset.StandardCharsets;
 
 
 
 
 
 
 
 
 @Slf4j
 @SpringBootTest
 @RunWith(SpringRunner.class)
 public class BootZkClientApplicationTest {
 
 @Autowired
 CuratorFramework curatorFramework;
 
 @Before
 public void before() throws Exception {
 }
 
 @After
 public void after() throws Exception {
 }
 
 
 
 
 @Test
 public void createNode() throws Exception {
 
 String path = curatorFramework.create().forPath("/curator-node");
 
 
 System.out.println(String.format("curator create node:%s successfully.", path));
 
 }
 
 @Test
 public void testGetData() throws Exception {
 byte[] bytes = curatorFramework.getData().forPath("/curator-node");
 System.out.println(new String(bytes));
 }
 
 @Test
 public void testSetData() throws Exception {
 curatorFramework.setData().forPath("/curator-node", "changed".getBytes(StandardCharsets.UTF_8));
 }
 
 @Test
 public void testCreateWithParent() throws Exception {
 curatorFramework.create().creatingParentsIfNeeded().forPath("/parent-node/sub", "tzq".getBytes(StandardCharsets.UTF_8));
 byte[] bytes = curatorFramework.getData().forPath("/parent-node/sub");
 System.out.println(new String(bytes));
 }
 }
 
 
 | 
ZK如何实现分布式锁
如果两个java服务都是向数据库中写入车票数据,负载均衡每次只分配到一个java服务,当一个服务写时,另一个服务不能写,如何做到两个或多个服务的锁,通过ZK存储锁
zk中锁的种类:
ZK上读锁
- 创建临时序号节点,节点数据为read,表示读锁
- 获取zk中序号比自己小的所有节点,判断是不是读锁,若都是读锁,那么上锁成功,反之失败,为最小节点设置监听,阻塞等待,当小于最小节点时同时当前节点再判断
ZK上写锁
- 创建临时序号节点,节点数据为write,表示写锁
- 获取所有节点并判断自己是否时最小节点,是则上锁成功,反之监听最小节点,最小节点没了才能再次检测
羊群效应
如果100个并发来上写锁,那么99个并发会监听写锁,当第一个节点完成写锁消失后,99个又触发监听事件,对zk压力大,调整为链式监听即可,即不再监听第一个节点,而是监听上一个节点。
这样,100个并发过来,第一个得到写锁,第二个监听第一个,第三个监听第二个,依次。如果第一个结束,第二个监听事件触发可以上写锁,但是第三位不被触发监听因为第二位节点没有删除。
zk的watch机制
watch机制类似触发器,当znode改变,即调用了create,delete,setData等方法的时候,会触发对应znode上注册的事件,请求watch的客户端会接收到异步通知
cli中创建并且监听节点
| 12
 3
 4
 5
 
 | craete /test9get -w /test9 # 一次性监听节点内容
 ls -w /test9 # 一次性监听节点目录(一层目录)
 ls -R -w /test9 # 监听所有子目录变化
 # 这时候其他会话修改数据,这里会通知
 
 | 
 
 
curator 客户端使用watch监听节点
| 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 
 | @Testpublic void testAddListener() throws Exception {
 NodeCache nodeCache = new NodeCache(curatorFramework, "/curator-node");
 nodeCache.getListenable().addListener(() -> {
 log.info("{} path nodeChanged:", "/curator-node");
 printNodeData();
 });
 nodeCache.start();
 System.in.read();
 }
 
 public void printNodeData() throws Exception {
 byte[] bytes = curatorFramework.getData().forPath("/curator-node");
 log.info("data:{}", new String(bytes));
 }
 
 | 
Curator 上写锁和读锁
这段junit代码可以先运行读锁再运行写锁看效果
| 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 
 | @Test
 public void testGetReadLock() throws Exception {
 
 InterProcessReadWriteLock interProcessReadWriteLock = new InterProcessReadWriteLock(curatorFramework, "/lock1");
 
 InterProcessMutex interProcessMutex = interProcessReadWriteLock.readLock();
 System.out.println("等待获取读锁对象");
 
 interProcessMutex.acquire();
 for (int i = 0; i < 10; i++) {
 Thread.sleep(3000);
 System.out.println(i);
 }
 
 interProcessMutex.release();
 System.out.println("等待释放锁");
 
 }
 
 @Test
 public void testGetWriteLock() throws Exception {
 
 InterProcessReadWriteLock interProcessReadWriteLock = new InterProcessReadWriteLock(curatorFramework, "/lock1");
 
 InterProcessMutex interProcessMutex = interProcessReadWriteLock.writeLock();
 System.out.println("等待获取写锁对象");
 
 interProcessMutex.acquire();
 for (int i = 0; i < 10; i++) {
 Thread.sleep(3000);
 System.out.println(i);
 }
 
 interProcessMutex.release();
 System.out.println("等待释放锁");
 
 }
 
 
 | 
ZK集群
集群的角色
- Leader : 处理集群的所有事务请求,只有一个,负责数据读写
- Follower : 从,只负责数据读,还能参与Leader选举(Leader挂了的情况)
- Observer : 观察者,只负责读,不参与选举
集群搭建
我这里用了docker搭建,没有用dockerCompose,而是bash脚本执行,注意!
在windows下使用gitbash执行shell脚本,路径是要这么写的!
| 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 
 | #!/bin/bash
 # 创建配置文件
 for port in $(seq 1 5);
 do
 mkdir -p /D/DockerV/zkCluster/zkData-${port}/data
 touch /D/DockerV/zkCluster/zkData-${port}/data/myid
 cat << EOF > /D/DockerV/zkCluster/zkData-${port}/data/myid
 ${port}
 EOF
 
 touch /D/DockerV/zkCluster/zkData-${port}/zoo.cfg
 cat << EOF > /D/DockerV/zkCluster/zkData-${port}/zoo.cfg
 tickTime=2000
 initLimit=10
 syncLimit=5
 dataDir=/bitnami/zookeeper/data
 clientPort=2181
 server.1=172.38.0.11:2001:3001
 server.2=172.38.0.12:2001:3001
 server.3=172.38.0.13:2001:3001
 server.4=172.38.0.14:2001:3001
 server.5=172.38.0.15:2001:3001:observer
 EOF
 done
 
 # 批量启动容器
 for port in $(seq 1 5);
 do
 docker rm -f  zk-${port}
 docker run -p 318${port}:2181 \
 --name zk-${port} \
 -v D:/DockerV/zkCluster/zkData-${port}/data/://bitnami/zookeeper/data \
 -v D:/DockerV/zkCluster/zkData-${port}/zoo.cfg://opt/bitnami/zookeeper/conf/zoo.cfg -d \
 -e  ALLOW_ANONYMOUS_LOGIN=yes \
 --net zknet \
 --ip 172.38.0.1${port} \
 bitnami/zookeeper
 done
 
 
 | 
进入集群 docker run -it --rm --network zknet bitnami/zookeeper zkCli.sh -server 192.168.10.28:3181,192.168.10.28:3182,192.168.10.28:3183,192.168.10.28:3184,192.168.10.28:3185


集群实质配置
- 其实本质是每台电脑配置好自己的/bitnami/zookeeper/data目录下的myid(里面是id号)
- 配置/opt/bitnami/zookeeper/conf/zoo.cfg这个cfg文件,最重要的是里面填好通信的其他机器地址和端口
ZAB协议
解决zk的崩溃恢复和主从数据同步问题
ZAB协议定义的四种节点状态
- Looking:选举状态
- Following:Follower节点所处的状态
- Leading:Leader节点的状态
- Observing:。。。
集群上线时Leader选举过程:
启动第一台时会looking状态寻找,直到第二台上线,开始选举,选票(myid,zXid事务id,每次事务都自增)第一轮投票,都生成自己的一张选票,然后把选票信息给对方,每台会比较选票(选择zXid最大的),如果事务id相同,取myid最大的,然后投入投票箱。第二轮互换自己投票箱的票,比较后放入投票箱,选出leader
崩溃恢复时的leader选举
Leader建立完毕后,Leader和Follower是有通信端口的,Leader和Follower存在socket链接,Follower会不间断的读socket数据,Leader会不间断的发,若Leader挂了,socket断开,Follower读不到数据后进入looking状态,其他节点同理。此时Leader出来之前不提供服务!
主从数据同步
客户端向集群写入数据,可能链接的是从,也可能是主,若客户端链接从节点,从节点会将数据发送给主节点。
主节点将数据写入自己的数据文件,并且返回ACK,然后同步的将数据发给Follower(广播),每个从节点也写入自己的数据文件,完成后返回ack给主节点,Leader收到半数以上的Ack信号后,这时Leader给所有从节点提交commit。从节点收到后将数据写入内存。这样大部分服务器数据都同步了。
半数以上是为了提高集群写数据的性能。但是有可能会有某些服务器数据没有写入到内存。
NIO和BIO应用
NIO (多路复用)
- 用于被客户端链接的2181端口
 假设同时有4个客户端链接,有读有写,为了性能会使用NIO把所有请求放入一个队列,zk内部处理这些请求
- 客户端开启watch时也用
 若有客户端同时监听很多znode节点,当节点变化,客户端会被通知到,NIO模式实现非阻塞状态
BIO 传统阻塞模型
CAP定理
一个系统最多同时满足:一致性(每个节点数据一致),可用性(服务一直可用),分区容错性(遇到节点故障时仍然能够提供服务) 中的两项。
所以现在大部分都采用AP或者AC
BASE理论
- 基本可用  (双十一切掉评论,退款,注册服务,保留核心功能)
- 软状态        (没有其他功能的中间状态)
- 最终一致性 (最终还是能退款的)
回到之前的弊端(zk可能存在某些节点数据未同步),对于未同步数据的服务器,它的事务节点肯定落后于有数据的节点(处理完一次数据,事务id自增),因此zk最求的是顺序一致性,等网络恢复后,落后的节点总会同步到最新的数据,把事务id同步到最新。