欢迎您访问365答案网,请分享给你的朋友!
生活常识 学习资料

curator框架分布式锁解析

时间:2023-04-30
1、InterProcessMutex

分析:

可重入的互斥锁,跨JVM工作。使用ZooKeeper来控制锁。所有JVM中的任何进程,只要使用同样的锁路径,将会成为跨进程的一部分。此外,这个排他锁是“公平的”,每个用户按照申请的顺序得到排他锁。可见InterProcessMutex和我们自己实现的例子都是一个排他锁,此外还可以重入。

代码:

    

public class Curator_Session { static AtomicInteger n = new AtomicInteger(); static String lockPath = "/curator_recipe_lock_path"; static Curatorframework client = CuratorframeworkFactory.builder().connectString("127.0.0.1:2181") .retryPolicy(new ExponentialBackoffRetry(1000, 3)).build(); public static void main(String[] args) throws Exception { client.start(); final InterProcessMutex lock = new InterProcessMutex(client, lockPath); for (int i = 0; i < 30; i++) { System.out.println("----------------"+n.addAndGet(1)+"------------------"); new Thread(new Runnable() { @Override public void run() { try { lock.acquire(); System.out.println("--------------"); } catch (Exception e) { } SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss|SSS"); String orderNo = sdf.format(new Date()); System.out.println("---------------生成的订单号:" + orderNo); try { lock.release(); } catch (Exception exception) { } } }).start(); } }}

   运行结果:

   

---------------生成的订单号:15:46:04|460---------------生成的订单号:15:46:04|474---------------生成的订单号:15:46:04|477---------------生成的订单号:15:46:04|489

     逻辑分析:首先通过acquire()获取锁,该方法会阻塞进程,直到获取锁,然后执行你的业务方法,最后通过 release()释放锁。

    思路:1、创建有序临时节点

                2、触发“尝试取锁逻辑”,如果自己是临时锁节点序列的第一个,则取得锁,获取锁成功。

                3、如果自己不是序列中第一个,则监听前一个锁节点变更。同时阻塞线程。

                4、当前一个锁节点变更时,通过watcher恢复线程,然后再次到步骤2“尝试取锁逻辑”

   源码解析:

             InterProcessMutex实现了两个接口:

            public class InterProcessMutex implements InterProcessLock, Revocable

           InterProcessLock是分布式锁接口,分布式锁必须实现接口中的如下方法:

          1、获取锁,直到锁可用

             public void acquire() throws Exception;

         2、在指定等待的时间内获取锁。

            public boolean acquire(long time, TimeUnit unit) throws Exception;

         3、释放锁

           public void release() throws Exception;

        4、当前线程是否获取了锁

       boolean isAcquiredInThisProcess();

      获取锁:

   

public void acquire() throws Exception{ if ( !internalLock(-1, null) ) { throw new IOException("Lost connection while trying to acquire lock: " + basePath); }}private boolean internalLock(long time, TimeUnit unit) throws Exception{ 获取当前线程 Thread currentThread = Thread.currentThread(); 取得当前线程在threadData中的lockData LockData lockData = threadData.get(currentThread); if ( lockData != null ) { // re-entering 如果存在该线程的锁数据,说明是锁重入, lockData.lockCount加1,直接返回true。获取锁成功 lockData.lockCount.incrementAndGet(); return true; } 如果不存在该线程的锁数据,则通过internals.attemptLock()获取锁,此时线程被阻塞,直至获得到锁 String lockPath = internals.attemptLock(time, unit, getLockNodeBytes()); if ( lockPath != null ) { 锁获取成功后,把锁的信息保存到threadData中。 LockData newLockData = new LockData(currentThread, lockPath); threadData.put(currentThread, newLockData); return true; } 如果没能获取到锁,则返回false。 return false;}

   LockInternals源码分析:Curator通过zk实现分布式锁的核心逻辑都在LockInternals中

   在InterProcessMutex获取锁的代码分析中,可以看到它是通过internals.attemptLock(time, unit, getLockNodeBytes());来获取锁的,那么我们就以这个方法为入口。

   

String attemptLock(long time, TimeUnit unit, byte[] lockNodeBytes) throws Exception { final long startMillis = System.currentTimeMillis(); final Long millisToWait = (unit != null) ? unit.toMillis(time) : null; final byte[] localLockNodeBytes = (revocable.get() != null) ? new byte[0] : lockNodeBytes; int retryCount = 0; String ourPath = null; boolean hasTheLock = false; boolean isDone = false; while ( !isDone ) { isDone = true; try { 通过driver在zk上创建锁节点,获得锁节点路径。 ourPath = driver.createsTheLock(client, path, localLockNodeBytes); 通过internalLockLoop()方法阻塞进程,直到获取锁成功 hasTheLock = internalLockLoop(startMillis, millisToWait, ourPath); } catch ( KeeperException.NonodeException e ) { // gets thrown by StandardLockInternalsDriver when it can't find the lock node // this can happen when the session expires, etc、So, if the retry allows, just try it all again if ( client.getZookeeperClient().getRetryPolicy().allowRetry(retryCount++, System.currentTimeMillis() - startMillis, RetryLoop.getDefaultRetrySleeper()) ) { isDone = false; } else { throw e; } } } private boolean internalLockLoop(long startMillis, Long millisToWait, String ourPath) throws Exception { boolean haveTheLock = false; boolean doDelete = false; try { if ( revocable.get() != null ) { client.getData().usingWatcher(revocableWatcher).forPath(ourPath); } internalLockLoop中通过while自旋,判断锁如果没有被获取,将不断的去尝试获取锁。 while ( (client.getState() == CuratorframeworkState.STARTED) && !haveTheLock ) { List children = getSortedChildren(); String sequenceNodeName = ourPath.substring(basePath.length() + 1); // +1 to include the slash 通过driver查看当前锁节点序号是否排在第一位,如果排在第一位,说明取锁成功,跳出循环 PredicateResults predicateResults = driver.getsTheLock(client, children, sequenceNodeName, maxLeases); if ( predicateResults.getsTheLock() ) { haveTheLock = true; } else { 如果没有排在第一位,则监听自己的前序锁节点,然后阻塞线程。 当前序节点释放了锁,监听会被触发,恢复线程,此时主线程又回到while中第一步。 String previousSequencePath = basePath + "/" + predicateResults.getPathToWatch(); synchronized(this) { try { // use getData() instead of exists() to avoid leaving unneeded watchers which is a type of resource leak client.getData().usingWatcher(watcher).forPath(previousSequencePath); if ( millisToWait != null ) { millisToWait -= (System.currentTimeMillis() - startMillis); startMillis = System.currentTimeMillis(); if ( millisToWait <= 0 ) { doDelete = true; // timed out - delete our node break; } wait(millisToWait); } else { wait(); } } catch ( KeeperException.NonodeException e ) { // it has been deleted (i.e、lock released)、Try to acquire again } } } } } catch ( Exception e ) { ThreadUtils.checkInterrupted(e); doDelete = true; throw e; } finally { if ( doDelete ) { deleteOurPath(ourPath); } } return haveTheLock; } if ( hasTheLock ) { return ourPath; } return null; }

Copyright © 2016-2020 www.365daan.com All Rights Reserved. 365答案网 版权所有 备案号:

部分内容来自互联网,版权归原作者所有,如有冒犯请联系我们,我们将在三个工作时内妥善处理。