欢迎您访问365答案网,请分享给你的朋友!
生活常识 学习资料

zookeeper(7)——zk实现分布式锁

时间:2023-05-03

需要锁的原因:

多任务去抢一个资源多任务都需要对资源进行写操作多任务对资源的访问是互斥的

1、不使用锁在多线程的情况下会出现什么情况

定义一个生成订单ID类,也就是多任务去争抢的资源

public class OrderNumGenerator { //全局订单id public static int count = 0; //生成订单ID public String getNumber() { SimpleDateFormat simpt = new SimpleDateFormat("yyyy-MM-dd-HH-mm-ss"); return simpt.format(new Date()) + "-" + ++count; }}

使用并发工具类CountDownLatch,启动500个线程去调用同一个资源

public class OrderService implements Runnable { private OrderNumGenerator orderNumGenerator = new OrderNumGenerator(); //发令枪,模拟500个并发 private static CountDownLatch countDownLatch = new CountDownLatch(500); private static List result = new Vector(); public void run() { try { countDownLatch.await(); result.add(orderNumGenerator.getNumber()); } catch (InterruptedException e) { e.printStackTrace(); } } public static void main(String[] args) throws InterruptedException { System.out.println("####生成唯一订单号###"); for (int i = 0; i < 500; i++) { new Thread(new OrderService()).start(); countDownLatch.countDown(); } countDownLatch.await(); Thread.sleep(1000); Collections.sort(result); for(String str:result) { System.out.println(str); } }}

结果发现,出现重复的订单ID

 通常,我们通常使用synchronized或者Lock的方式加锁,来解决这个问题

2、synchronized

直接在资源,也就是生成订单ID的代码中加上一把锁

public class OrderNumGenerator { //全局订单id public static int count = 0; public static Object lock = new Object(); public String getNumber() { synchronized(lock){ SimpleDateFormat simpt = new SimpleDateFormat("yyyy-MM-dd-HH-mm-ss"); return simpt.format(new Date()) + "-" + ++count; } }}

3、Lock方式

同样在资源上加锁,即可解决问题

public class OrderNumGenerator { //全局订单id public static int count = 0; private java.util.concurrent.locks.Lock lock = new ReentrantLock(); //以lock的方式解决 public String getNumber() { try { lock.lock(); SimpleDateFormat simpt = new SimpleDateFormat("yyyy-MM-dd-HH-mm-ss"); String s = simpt.format(new Date()) + "-" + ++count; return s; }finally { lock.unlock(); } }}

但是这两种方式,无法解决分布式环境中的并发问题,在了解分布式锁之前, 先了解下模板方法

即:在父类中定义主流程,主流程中某些方法可以延迟到子类去实现

4、模板方法

父类

public abstract class FatherTemplate {public void A() {System.out.println("A");}public abstract void B() ;public void C() {System.out.println("C");}public void D() {A();B();C();}}

子类

public class SonTemplate extends FatherTemplate{@Overridepublic void B() {System.out.println("B");return;}public static void main(String[] args) {FatherTemplate sonTemplate = new SonTemplate();sonTemplate.D();}}

这样每一个子类,都可以定义自己的B方法,在实际的调用过程中,调用自己的B方法

5、zookeeper实现分布式锁

定义一个接口

//接口public interface Lock { //获取到锁的资源 public void getLock(); // 释放锁 public void unLock();}

定义一个模板

public abstract class AbstractLock implements Lock{ public void getLock() { if (tryLock()) { System.out.println("##获取锁####"); } else { waitLock(); getLock(); } } public abstract boolean tryLock(); public abstract void waitLock();}

接下来使用zookeeper实现tryLock()、waitLock()、unLock()

tryLock():使用zookeeper创建一个临时节点,如果成功,代表获取到锁

waitLock():使用zookeeper注册一个该临时节点的监听,并且使用到并发工具类,如果该节点已存在,则等待;当监听到它被删除时,继续执行

unLock():删除节点

public class ZookeeperDistrbuteLock extends ZookeeperAbstractLock { private CountDownLatch countDownLatch = null; @Override //尝试获得锁 public boolean tryLock() { try { zkClient.createEphemeral(PATH); return true; } catch (Exception e) { //如果创建失败报出异常 return false; } } @Override public void waitLock() { IZkDataListener izkDataListener = new IZkDataListener() { public void handleDataDeleted(String path) throws Exception { // 唤醒被等待的线程 if (countDownLatch != null) { countDownLatch.countDown(); } } public void handleDataChange(String path, Object data) throws Exception { } }; // 注册事件 zkClient.subscribeDataChanges(PATH, izkDataListener); //如果节点存 if (zkClient.exists(PATH)) { countDownLatch = new CountDownLatch(1); try { //等待,一直等到接受到事件通知 countDownLatch.await(); } catch (Exception e) { e.printStackTrace(); } } // 删除监听 zkClient.unsubscribeDataChanges(PATH, izkDataListener); } public void unLock() { //释放锁 if (zkClient != null) { zkClient.delete(PATH); zkClient.close(); System.out.println("释放锁资源..."); } }}

这样就可以使用我们自己实现的zookeeper分布式锁

private Lock lock = new ZookeeperDistrbuteLock();public void getNumber() { try { lock.getLock(); //业务操作 } catch (Exception e) { e.printStackTrace(); } finally { lock.unLock(); } }

6、zookeeper分布式锁优化

上述方法存在羊群效应,即一个线程拿到锁,其他线程都在等待,当释放锁的时候,其他所有的线程都会去抢,所以需要进行优化

tryLock():使用zookeeper创建一个临时有序节点,并且当前节点在所有的临时有序节点中排第一,才算成功获取到锁;如果不是第一,则寻找它前面节点

waitLock():使用zookeeper注册一个前面节点的监听,当监听到前面节点被删除时,则继续执行

unLock():删除节点

这样根据多任务的请求顺序,创建多个锁,每一个都监听前一个锁是否被删除,如果监听到删除则继续执行,去尝试拿锁。就不会出现羊群效应了

public class ZookeeperDistrbuteLock2 extends ZookeeperAbstractLock { private CountDownLatch countDownLatch= null; private String beforePath;//当前请求的节点前一个节点 private String currentPath;//当前请求的节点 public ZookeeperDistrbuteLock2() { if (!this.zkClient.exists(PATH2)) { this.zkClient.createPersistent(PATH2); } } @Override public boolean tryLock() { //如果currentPath为空则为第一次尝试加锁,第一次加锁赋值currentPath if(currentPath == null || currentPath.length()<= 0){ //创建一个临时顺序节点 currentPath = this.zkClient.createEphemeralSequential(PATH2 + '/',"lock"); } //获取所有临时节点并排序,临时节点名称为自增长的字符串如:0000000400 List childrens = this.zkClient.getChildren(PATH2); Collections.sort(childrens); if (currentPath.equals(PATH2 + '/'+childrens.get(0))) {//如果当前节点在所有节点中排名第一则获取锁成功 return true; } else {//如果当前节点在所有节点中排名中不是排名第一,则获取前面的节点名称,并赋值给beforePath int wz = Collections.binarySearch(childrens, currentPath.substring(7)); beforePath = PATH2 + '/'+childrens.get(wz-1); } return false; } @Override public void waitLock() { IZkDataListener listener = new IZkDataListener() { public void handleDataDeleted(String dataPath) throws Exception { if(countDownLatch!=null){ countDownLatch.countDown(); } } public void handleDataChange(String dataPath, Object data) throws Exception { } }; //给排在前面的的节点增加数据删除的watcher,本质是启动另外一个线程去监听前置节点 this.zkClient.subscribeDataChanges(beforePath, listener); if(this.zkClient.exists(beforePath)){ countDownLatch=new CountDownLatch(1); try { countDownLatch.await(); } catch (InterruptedException e) { e.printStackTrace(); } } this.zkClient.unsubscribeDataChanges(beforePath, listener); } public void unLock() { //删除当前临时节点 zkClient.delete(currentPath); zkClient.close(); }}

Copyright © 2016-2020 www.365daan.com All Rights Reserved. 365答案网 版权所有 备案号:

部分内容来自互联网,版权归原作者所有,如有冒犯请联系我们,我们将在三个工作时内妥善处理。