目录
一、概述
1、简介
2、工作机制
3、特点
4、数据结构
5、应用场景
二、下载与安装
1、本地模式下载安装
2、配置参数(图见上图)
三、集群
1、集群安装
1)集群规划(多台虚拟机模拟)
2)解压安装
3)启动集群
2、选举机制(面试重点)
1)第一次启动
2)第二次启动
3、启动停止脚本
4、客户端命令行操作
1)命令行语法
2)参数信息
5、监听器
6、客户端API操作
1)IDEA环境搭建
2)创建客户端并连接zookeeper
3)创建子节点(必须先执行上一步初始化方法)
4)监听节点变化( 3)有设置监听器)
5)判断节点是否存在
7、客户端向服务端写数据流程
四、服务器动态上下线监听案例
1、需求分析
2、具体实现
1)创建节点
2)客户端连接集群,创建对应路径并写上名称
3)客户端集群上监听操作
五、分布式锁
1、案例分析
2、分布式锁实现
3、测试
4、Curator框架实现分布式锁
1)原生JavaAPI开发存在问题
2)Curator实操
六、面试真题
1、选举 机制
1)第一次启动选举规则
2)第二次启动选举规则
2、生产 集群 安装多少 zk合适
3、常用命令
一、概述 1、简介
Zookeeper是一个开源的分布式的,为分布式框架提供协调服务的Apache项目。
2、工作机制
是基于观察者模式设计的分布式服务管理框架,负责储存和管理大家都关心的数据,然后接收观察者的注册,一旦数据状态变化,Zookeeper就将负责通知已经在Zookeeper上注册的那些观察者做出相应的反应。
Zookeeper = 文件系统 + 通知机制
3、特点
① Zookeeper:一个领导者(Leader),多个跟随者(Follower)组成的集群。
② 集群只要有半数以上节点存活,Zookeeper集群就能正常服务。适合安装奇数台服务器。
③ 全局数据一致每个Server保存一份相同的数据副本。
④ 先进先出的请求顺序执行。
⑤ 数据更新原子性。
⑥ 实行性。同步时间快。
4、数据结构
zookeeper数据模型结构与Unix文件系统很相似,树形结构,每个节点为ZNode,每个ZNode默认存储1MB数据。
5、应用场景
1)统一命名管理:对应用/服务进行统一命名,便于识别。
2)统一配置管理:所有节点的配置信息是一致的。配置文件修改后,能快速同步到各个节点上。
3)统一集群管理:实时监控节点变化。
4)服务器动态上下线:客户端能实时洞察到服务器上下线的变化。
5)软负载均衡:Zookeeper中会记录每台服务器的访问数,让访问数最少的服务器去处理最新的客户端需求。
二、下载与安装 1、本地模式下载安装
下载地址:Apache ZooKeeper -> Download
必须在linux中安装java jdk
[root@ppitm-c ~]# java -versionopenjdk version "1.8.0_275"OpenJDK Runtime Environment (build 1.8.0_275-b01)OpenJDK 64-Bit Server VM (build 25.275-b01, mixed mode)[root@ppitm-c ~]#
下zookeeper压缩包到系统
[root@ppitm-c ~]# cd /opt[root@ppitm-c opt]# ll总用量 534788-rw-r--r--、 1 root root 9311744 2月 2 23:06 apache-zookeeper-3.5.7-bin.tar.gz
解压:
tar -zxvf apache-zookeeper-3.5.7-bin.tar.gz -C /opt/
服务端启动脚本:zkServer.sh
客户端启动脚本:zkCli.sh
[root@ppitm-c bin]# pwd/opt/apache-zookeeper-3.6.3-bin/bin[root@ppitm-c bin]# ll总用量 64-rwxr-xr-x、1 1000 1000 232 4月 9 2021 README.txt-rwxr-xr-x、1 1000 1000 2066 4月 9 2021 zkCleanup.sh-rwxr-xr-x、1 1000 1000 1158 4月 9 2021 zkCli.cmd-rwxr-xr-x、1 1000 1000 1620 4月 9 2021 zkCli.sh-rwxr-xr-x、1 1000 1000 1843 4月 9 2021 zkEnv.cmd-rwxr-xr-x、1 1000 1000 3690 4月 9 2021 zkEnv.sh-rwxr-xr-x、1 1000 1000 1286 4月 9 2021 zkServer.cmd-rwxr-xr-x、1 1000 1000 4559 4月 9 2021 zkServer-initialize.sh-rwxr-xr-x、1 1000 1000 11332 4月 9 2021 zkServer.sh-rwxr-xr-x、1 1000 1000 988 4月 9 2021 zkSnapShotToolkit.cmd-rwxr-xr-x、1 1000 1000 1377 4月 9 2021 zkSnapShotToolkit.sh-rwxr-xr-x、1 1000 1000 996 4月 9 2021 zkTxnLogToolkit.cmd-rwxr-xr-x、1 1000 1000 1385 4月 9 2021 zkTxnLogToolkit.sh[root@ppitm-c bin]#
修改配置:配置文件改为zoo.cfg
修改:dataDir=/opt/apache-zookeeper-3.6.3-bin/zkData
系统提供的只是临时地址,必须自己新建一个目录地址:mkdir zkData
启动服务端:bin/zkServer.sh start
启动客户端:bin/zkCli.sh
2、配置参数(图见上图)
1)tickTime=2000:通信心跳时间,Zookeeper服务器与客户端心跳时间,单位毫秒。
2)initLimit=10:LF初始通信时限(10个心跳),初始连接能容忍的最多心跳数。
3)syncLimit=5:LF同步通信时限(5个心跳),Leader和Follower之间通信时间如果超过syncLimit * tickTime,Leader认为Follwer死掉,从服务器列表中删除Follwer。
4)dataDir:保存Zookeeper中的数据
5)clientPort:2181:客户端连接端口
三、集群 1、集群安装 1)集群规划(多台虚拟机模拟)
1)集群规划(多台虚拟机模拟)
比如在hadoop102、hadoop103、hadoop104节点上部署zookeeper。
2)解压安装
解压压缩包(第二章下载的压缩包相同)到/opt目录
修改配置文件zoo.zrx,并修改储存地址
在 /opt/zookeeper-3.5.7/zkData目录下创建一个 myid的文件,并在文件添加相应编号。(比如102服务器就添加2,103服务器就添加3,每个服务器的身份标识都要标注)
zoo.cfg末尾增加配置:server.唯一标识=服务器地址:主从交换信息端口:选举通信端口
(比如:server.2=192.168.16.122:2888:3888)(每个服务端都配置)
########################cluster######################server.2=hadoop102:2888:3888server.3=hadoop103:2888:3888server.4=hadoop104:2888:3888
3)启动集群
启动超过半数以上服务器将开始选举机制
2、选举机制(面试重点) 1)第一次启动 2)第二次启动 3、启动停止脚本
2)第二次启动 3、启动停止脚本
集群启动停止脚本 bin/zkServer.sh stop 或 用脚本(zk.sh stop)
#!/bin/bashcase $1 in"start"){for i in hadoop102 hadoop103 hadoop104doecho -------zookeeper $i 启动 -------ssh $1 "/opt/module/zookeeper-3.5.7/bin/zkServer.sh start"done};;"stop"){for i in hadoop102 hadoop103 hadoop104doecho -------zookeeper $i 停止 -------ssh $1 "/opt/module/zookeeper-3.5.7/bin/zkServer.sh stop"done};;"status"){for i in hadoop102 hadoop103 hadoop104doecho -------zookeeper $i 状态 -------ssh $1 "/opt/module/zookeeper-3.5.7/bin/zkServer.sh status"done};;
4、客户端命令行操作 1)命令行语法 ls / 查看节点create 节点名 "数据"创建永久节点create -s 节点名 "数据" 创建带序号永久节点(可重复相同数据)create -e 节点名 "数据" 创建临时节点get -s /sanguo获取节点对应的值set -s /sanguo 修改节点对应的值2)参数信息
2)参数信息
[zk: hadoop102 :2181(CONNECTED) 5] ls s /[zookeeper]cZxid = 0x0ctime = Thu Jan 01 08:00:00 CST 1970mZxid = 0x0mtime = Thu Jan 01 08:00:00 CST 1970pZxid = 0x0cversion = 1dataVersion = 0aclVersion = 0ephemeralOwner = 0x0dataLength = 0numChildren = 1
czxid 创建节点的事务 zxid
每次修改ZooKeeper状态都会 产生一个 ZooKeeper事务 ID。事务 ID是 ZooKeeper中所
有修改总的次序。每 次 修改都有唯一的 zxid,如果 zxid1小于 zxid2,那么 zxid1在 zxid2之
前发生。
ctime znode被创建的毫秒数(从 1970年开始)
mzxid znode最后更新的事务 zxid
mtime znode最后修改的毫秒数(从 1970年开始)
pZxid znode最后更新的子节点 zxid
cversion:znode 子节点变化号,znode 子节点修改次数
dataversion:znode 数据变化号
aclVersion:znode 访问控制列表的变化号
ephemeralOwner:如果是临时节点,这个是znode 拥有者的session id。如果不是
临时节点则是0。
dataLength:znode 的数据长度
numChildren:znode 子节点数量
5、监听器 监听器及节点删除 get -w 节点名监控节点数据变化(注册一次生效一次)ls -w 节点名监控节点数变化(注册一次生效一次)delete 节点名删除节点deleteall 节点名删除节点及子节点stat 节点名查看节点状态(不看节点值)
注册一次监听一次
6、客户端API操作 1)IDEA环境搭建
导入依赖
创建log4j.properties到根目录(resources)
log4j.rootLogger=INFO, stdoutlog4j.appender.stdout=org.apache.log4j.ConsoleAppenderlog4j.appender.stdout.layout=org.apache.log4j.PatternLayoutlog4j.appender.stdout.layout.ConversionPattern =%d %p [%c]-%m%nlog4j.appender.logfile=org.apache.log4j.FileAppenderlog4j.appender.logfile.File=target/spring.loglog4j.appender.logfile.layout=org.apache.log4j.PatternLayoutlog4j.appender.logfile.layout.ConversionPattern=%d %p [%c]-%m%n
2)创建客户端并连接zookeeper
public class zkClient { private String connectString="192.168.177.129:2181";//连接zookeeper地址 private int sessionTimeout=100000;//超时时间(建议调长) private ZooKeeper zkClient; @Before public void init() throws IOException { //启动客户端并设置监听器 zkClient = new ZooKeeper(connectString, sessionTimeout, new Watcher() { public void process(WatchedEvent watchedEvent) { List
3)创建子节点(必须先执行上一步初始化方法)
@Testpublic void create() throws KeeperException, InterruptedException { //创建节点(节点路径、内容、权限控制、什么样的节点) String nodeCreated = zkClient.create("/zhijia", "qyh.avi".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);}
4)监听节点变化( 3)有设置监听器)
@Testpublic void getChildren() throws KeeperException, InterruptedException {// List
5)判断节点是否存在
@Testpublic void exist() throws KeeperException, InterruptedException { //监听某个节点是否存在 Stat stat = zkClient.exists("/zhijia", false); System.out.println(stat == null ? "not exit" : "exist");}
7、客户端向服务端写数据流程
四、服务器动态上下线监听案例 1、需求分析 2、具体实现 1)创建节点
2、具体实现 1)创建节点
[zk: localhost:2181(CONNECTED) 10] create /servers "servers"Created /servers
2)客户端连接集群,创建对应路径并写上名称
package com.zhijia.case1;import org.apache.zookeeper.*;import java.io.IOException;public class DistributeServer { private String connectString="192.168.177.129:2181"; private int sessionTimeout=100000; private ZooKeeper zk; public static void main(String[] args) throws IOException, KeeperException, InterruptedException { DistributeServer server = new DistributeServer(); //1、获取zk连接 server.getConnect(); //2、注册服务器到zk集群 server.regist(args[0]); //3、启动业务逻辑(睡觉) server.business(); } //启动业务逻辑(睡觉) private void business() throws InterruptedException { Thread.sleep(Long.MAX_VALUE); } //注册服务器到zk集群 private void regist(String hostname) throws KeeperException, InterruptedException {//节点路径,主机名,权限,什么样的节点 String create = zk.create("/servers/"+hostname, hostname.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); System.out.println(hostname+" is online "); } //获取zk连接 private void getConnect() throws IOException { zk = new ZooKeeper(connectString, sessionTimeout, new Watcher() { public void process(WatchedEvent watchedEvent) { } }); }}
3)客户端集群上监听操作
package com.zhijia.case1;import org.apache.zookeeper.KeeperException;import org.apache.zookeeper.WatchedEvent;import org.apache.zookeeper.Watcher;import org.apache.zookeeper.ZooKeeper;import java.io.IOException;import java.util.ArrayList;import java.util.List;public class DistributeClient { private String connectString="192.168.177.129:2181"; private int sessionTimeout=100000; private ZooKeeper zk; public static void main(String[] args) throws IOException, KeeperException, InterruptedException { DistributeClient client = new DistributeClient(); //1、获取zk连接 client.getConnect(); //2、监听/servers下面子节点的增加和删除 client.getServerList(); //3、业务逻辑 client.business(); } //业务逻辑 private void business() throws InterruptedException { Thread.sleep(Long.MAX_VALUE); } //获取zk连接 private void getConnect() throws IOException { zk = new ZooKeeper(connectString, sessionTimeout, new Watcher() { public void process(WatchedEvent watchedEvent) { try { getServerList(); } catch (KeeperException e) { } catch (InterruptedException e) { e.printStackTrace(); } } }); } //监听/servers下面子节点的增加和删除 public void getServerList() throws KeeperException, InterruptedException { List
五、分布式锁 1、案例分析 2、分布式锁实现
2、分布式锁实现
package com.zhijia.case2;import org.apache.zookeeper.*;import org.apache.zookeeper.data.Stat;import java.io.IOException;import java.util.Collections;import java.util.List;import java.util.concurrent.CountDownLatch;public class DistributedLock { private final String connectString="192.168.177.129:2181"; private final int sessionTimeout=100000; private CountDownLatch countDownLatch=new CountDownLatch(1); private CountDownLatch waitLatch=new CountDownLatch(1); private final ZooKeeper zk; private String waitPath; private String currentMode; public DistributedLock() throws IOException, InterruptedException, KeeperException { //获取连接 zk = new ZooKeeper(connectString, sessionTimeout, new Watcher() { public void process(WatchedEvent watchedEvent) { //连接上,释放 if(watchedEvent.getState()==Event.KeeperState.SyncConnected){//监听状态为连接 countDownLatch.countDown(); } //waitLatch,释放 if(watchedEvent.getType()==Event.EventType.NodeDeleted&&watchedEvent.getPath().equals(waitPath)){ waitLatch.countDown(); } } }); //等待zk正常连接后才往下走 countDownLatch.await(); //判断根节点/locks是否存在 Stat stat = zk.exists("/locks", false); if(stat==null){ //需要创建根节点 zk.create("/locks","locks".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT); } } //加锁 public void zkLock(){ //创建对应的临时带序号节点 try { currentMode = zk.create("/locks/" + "seq-", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); //判断创建的节点是否是最小序号节点,若是就获取节点,若不是就监听前一个节点 List
3、测试
package com.zhijia.case2;import org.apache.zookeeper.KeeperException;import java.io.IOException;public class DistributeLockTest { public static void main(String[] args) throws InterruptedException, IOException, KeeperException { final DistributedLock lock1 = new DistributedLock(); final DistributedLock lock2 = new DistributedLock(); new Thread(new Runnable() { public void run() { try { lock1.zkLock(); System.out.println("线程1 启动:获取到锁"); Thread.sleep(10*1000); lock1.unZkLock(); System.out.println("线程1 :释放锁"); } catch (InterruptedException e) { e.printStackTrace(); } } }).start(); new Thread(new Runnable() { public void run() { try { lock2.zkLock(); System.out.println("线程2 启动:获取到锁"); Thread.sleep(10*1000); lock2.unZkLock(); System.out.println("线程2 :释放锁"); } catch (InterruptedException e) { e.printStackTrace(); } } }).start(); }}
4、Curator框架实现分布式锁 1)原生JavaAPI开发存在问题
① 会话连接是异步的,需要自己去处理。比如使用 CountDownLatch
② Watch需要重复注册,不然就不能生效
③ 开发的复杂性还是比较高的
④ 不支持多节点删除和创建。需要自己去递归
2)Curator实操
官网:http://curator.apache.org/index.html
① 添加依赖
② 代码实现
package com.zhijia.case3;import org.apache.curator.framework.Curatorframework;import org.apache.curator.framework.CuratorframeworkFactory;import org.apache.curator.framework.recipes.locks.InterProcessMutex;import org.apache.curator.retry.ExponentialBackoffRetry;public class CuratorLockTest { public static void main(String[] args) { //创建分布式锁1 final InterProcessMutex lock1 = new InterProcessMutex(getCuratorframework(), "/locks"); //创建分布式锁2 final InterProcessMutex lock2 = new InterProcessMutex(getCuratorframework(), "/locks"); new Thread(new Runnable() { public void run() { //获取锁 try { lock1.acquire(); System.out.println("线程1:获取锁"); lock1.acquire(); System.out.println("线程1:再次获取锁"); Thread.sleep(5000); lock1.release(); System.out.println("线程1:释放锁"); lock1.release(); System.out.println("线程1:再次释放锁"); } catch (Exception e) { e.printStackTrace(); } } }).start(); new Thread(new Runnable() { public void run() { //获取锁 try { lock2.acquire(); System.out.println("线程2:获取锁"); lock2.acquire(); System.out.println("线程2:再次获取锁"); Thread.sleep(5000); lock2.release(); System.out.println("线程2:释放锁"); lock2.release(); System.out.println("线程2:再次释放锁"); } catch (Exception e) { e.printStackTrace(); } } }).start(); } public static Curatorframework getCuratorframework() { //重试(多少秒重试,重试次数) ExponentialBackoffRetry policy = new ExponentialBackoffRetry(3000, 3); //客户端(连接主机,超时时间,连接失败后间隔多少秒重试) Curatorframework client = CuratorframeworkFactory.builder().connectString("192.168.177.129:2181") .connectionTimeoutMs(10000) .retryPolicy(policy).build(); //启动客户端 client.start(); System.out.println("zookeeper启动成功"); return client; }}
六、面试真题 1、选举 机制
半数 机制 ,超过半数的投票通过,即通过。
1)第一次启动选举规则
投票过半数时,
服务器 id大的胜出
2)第二次启动选举规则
①
EPOCH大的直接胜出
②
EPOCH相同,事务 id大的胜出
③事务
id相同,服务器 id大的胜出
2、生产 集群 安装多少 zk合适
安装奇数台
生产经验:
⚫ 10台 服务器: 3台 zk
⚫ 20台 服务器: 5台 zk
⚫ 100台 服务器: 11台 zk
⚫ 200台 服务器: 11台 zk
服务器台数多:好处,提高可靠性;坏处:提高通信延时
3、常用命令
ls、 get、 create、 delete