欢迎您访问365答案网,请分享给你的朋友!
生活常识 学习资料

Redis实现分布式锁,非lua脚本

时间:2023-06-20
Redis实现分布式锁,非lua脚本 初始化Spring StringRedisTemplate

LettuceConnectionFactory factory = new LettuceConnectionFactory();factory.setHostName("localhost");factory.setPort(6379);factory.afterPropertiesSet();StringRedisTemplate stringRedisTemplate = new StringRedisTemplate();stringRedisTemplate.setConnectionFactory(factory);stringRedisTemplate.afterPropertiesSet();//清空上次测试数据stringRedisTemplate.delete("lock:uid1");//锁idString lockId = UUID.randomUUID().toString();

加锁

stringRedisTemplate.execute(new RedisCallback() { @Override public Void doInRedis(RedisConnection connection) throws DataAccessException { RedisSerializer keySerializer = (RedisSerializer) stringRedisTemplate.getKeySerializer(); RedisSerializer valueSerializer = (RedisSerializer) stringRedisTemplate.getValueSerializer(); byte[] key = keySerializer.serialize("lock:uid1"); byte[] value = valueSerializer.serialize(lockId); Long ttl = connection.ttl(key, TimeUnit.SECONDS); if (-1 == ttl) { //清理异常超时 connection.del(key); } //setNX将key的值设为value,当且仅当key不存在。 //若给定的key已经存在,则SETNX不做任何动作。 Boolean ret = connection.setNX(key, value); if (ret) { //setNX成功,设置锁的超时时间3秒 connection.expire(key, 3); } return null; }});

解锁

stringRedisTemplate.execute(new RedisCallback() { @Override public Void doInRedis(RedisConnection connection) throws DataAccessException { RedisSerializer keySerializer = (RedisSerializer) stringRedisTemplate.getKeySerializer(); RedisSerializer valueSerializer = (RedisSerializer) stringRedisTemplate.getValueSerializer(); byte[] key = keySerializer.serialize("lock:uid1"); byte[] value = connection.get(key); String lid = valueSerializer.deserialize(value); if (!lockId.equals(lid)) { //lockId不一样,锁被替换。做业务回滚处理 throw new RuntimeException("锁超时"); } //如果在事务执行之前这key被其他命令所改动,那么下面事务将被打断 connection.watch(key); try { //标记一个事务块的开始 connection.multi(); //删除key connection.del(key); //执行所有事务块内的命令 List ret = connection.exec(); if (null == ret) { //watch后被其他命令所改动 throw new RuntimeException("锁超时"); } } finally { //取消WATCH命令对key的监视 connection.unwatch(); } return null; }});

模拟解锁时,持有锁的时间已经超时

stringRedisTemplate.delete("lock:uid1");String lockId = UUID.randomUUID().toString();stringRedisTemplate.execute(new RedisCallback() { @Override public Void doInRedis(RedisConnection connection) throws DataAccessException { RedisSerializer keySerializer = (RedisSerializer) stringRedisTemplate.getKeySerializer(); RedisSerializer valueSerializer = (RedisSerializer) stringRedisTemplate.getValueSerializer(); byte[] key = keySerializer.serialize("lock:uid1"); byte[] value = valueSerializer.serialize(lockId); Long ttl = connection.ttl(key, TimeUnit.SECONDS); if (-1 == ttl) { //清理异常超时 connection.del(key); } //setNX将key的值设为value,当且仅当key不存在。 //若给定的key已经存在,则SETNX不做任何动作。 Boolean ret = connection.setNX(key, value); if (ret) { //setNX成功,设置锁的超时时间3秒 connection.expire(key, 3); } return null; }});stringRedisTemplate.execute(new RedisCallback() { @Override public Void doInRedis(RedisConnection connection) throws DataAccessException { RedisSerializer keySerializer = (RedisSerializer) stringRedisTemplate.getKeySerializer(); RedisSerializer valueSerializer = (RedisSerializer) stringRedisTemplate.getValueSerializer(); byte[] key = keySerializer.serialize("lock:uid1"); byte[] value = connection.get(key); String lid = valueSerializer.deserialize(value); if (!lockId.equals(lid)) { //lockId不一样,锁被替换。做业务回滚处理 throw new RuntimeException("锁超时"); } //如果在事务执行之前这key被其他命令所改动,那么下面事务将被打断 connection.watch(key); //模拟等待锁3秒后超时 try { Thread.sleep(5 * 1000); } catch (InterruptedException e) { e.printStackTrace(); } //模拟其他人拿到锁 stringRedisTemplate.execute(new RedisCallback() { @Override public Void doInRedis(RedisConnection connection) throws DataAccessException { RedisSerializer keySerializer = (RedisSerializer) stringRedisTemplate.getKeySerializer(); RedisSerializer valueSerializer = (RedisSerializer) stringRedisTemplate.getValueSerializer(); byte[] key = keySerializer.serialize("lock:uid1"); Boolean ret = connection.setNX(key, value); if (ret) { //设置锁的超时时间 connection.expire(key, 3); } return null; } }); try { //标记一个事务块的开始 connection.multi(); //删除key connection.del(key); //执行所有事务块内的命令 List ret = connection.exec(); if (null == ret) { //watch后被其他命令所改动 throw new RuntimeException("锁超时"); } } finally { //取消WATCH命令对key的监视 connection.unwatch(); } return null; }});

完整封装正式可用

1.增加SUBSCRIBE订阅锁释放
2.去掉expire,在value里存放过期时间(使用expire,存在watch失效问题)

public static class RedisLock { private StringRedisTemplate stringRedisTemplate; public RedisLock(StringRedisTemplate stringRedisTemplate) { this.stringRedisTemplate = stringRedisTemplate; } public RLock newLock(String redisKey) { return new RLock(stringRedisTemplate, redisKey, UUID.randomUUID().toString()); } private static class LockStatus { private boolean lock; private long ttl; public boolean isLock() { return lock; } public long getTtl() { return ttl; } public LockStatus(boolean lock, long ttl) { this.lock = lock; this.ttl = ttl; } } public static class RLock { private static String UNLOCK = "unlock"; private StringRedisTemplate stringRedisTemplate; private String redisKey; private String lockId; public RLock(StringRedisTemplate stringRedisTemplate, String redisKey, String lockId) { this.stringRedisTemplate = stringRedisTemplate; this.redisKey = redisKey; this.lockId = lockId; } public void lock(long lockTimeout, TimeUnit unit) { stringRedisTemplate.execute(new RedisCallback() { @Override public Void doInRedis(RedisConnection connection) throws DataAccessException { CompletableFuture future = new CompletableFuture(); RedisSerializer keySerializer = (RedisSerializer) stringRedisTemplate.getKeySerializer(); byte[] key = keySerializer.serialize(redisKey); //订阅锁释放消息 connection.subscribe(new MessageListener() { @Override public void onMessage(Message message, byte[] pattern) { String msg = new String(message.getBody()); if (UNLOCK.equals(msg)) { future.complete(message); } } }, key); //tryLock LockStatus lockStatus; do { lockStatus = _tryLock(lockTimeout, unit); if (!lockStatus.isLock()) { try { future.get(lockStatus.getTtl(), TimeUnit.MILLISECONDS); } catch (ExecutionException e) { throw new RuntimeException(e.getMessage(), e); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } catch (TimeoutException e) { } } } while (!lockStatus.isLock()); return null; } }); } public boolean tryLock(long lockTimeout, TimeUnit unit) { LockStatus lockStatus = _tryLock(lockTimeout, unit); return lockStatus.isLock(); } public LockStatus _tryLock(long lockTimeout, TimeUnit unit) { LockStatus lock = stringRedisTemplate.execute(new RedisCallback() { @Override public LockStatus doInRedis(RedisConnection connection) throws DataAccessException { RedisSerializer keySerializer = (RedisSerializer) stringRedisTemplate.getKeySerializer(); RedisSerializer valueSerializer = (RedisSerializer) stringRedisTemplate.getValueSerializer(); byte[] key = keySerializer.serialize(redisKey); long ttl = TimeoutUtils.toMillis(lockTimeout, unit); long expireTime = System.currentTimeMillis() + ttl; byte[] value = valueSerializer.serialize(String.format("%s:%s", lockId, expireTime)); String data; do { byte[] bytes = connection.get(key); data = valueSerializer.deserialize(bytes); if (null == data) { //setNX将key的值设为value,当且仅当key不存在。 //若给定的key已经存在,则SETNX不做任何动作。 Boolean ret = connection.setNX(key, value); if (!ret) { continue; } //获取锁成功 return new LockStatus(true, ttl); } } while (null == data); return new LockStatus(false, getTTL(data)); } }); return lock; } private long getTTL(String data) { String[] split = data.split(":"); Long expireTime = Long.parseLong(split[1]); long ttl = expireTime - System.currentTimeMillis(); return ttl; } private String getLockId(String data) { String[] split = data.split(":"); String lockId = split[0]; return lockId; } public void unlock() { stringRedisTemplate.execute(new RedisCallback() { @Override public Void doInRedis(RedisConnection connection) throws DataAccessException { RedisSerializer keySerializer = (RedisSerializer) stringRedisTemplate.getKeySerializer(); RedisSerializer valueSerializer = (RedisSerializer) stringRedisTemplate.getValueSerializer(); byte[] key = keySerializer.serialize(redisKey); byte[] value = connection.get(key); String data = valueSerializer.deserialize(value); if (null == data) { //ttl超时。做业务回滚处理 throw new RuntimeException("锁超时"); } if (getTTL(data) <= 0) { //ttl超时。做业务回滚处理 throw new RuntimeException("锁超时"); } if (!lockId.equals(getLockId(data))) { //lockId不一样,锁被替换。做业务回滚处理 throw new RuntimeException("锁超时"); } //如果在事务执行之前这key被其他命令所改动,那么下面事务将被打断 connection.watch(key); try { //标记一个事务块的开始 connection.multi(); //删除key connection.del(key); //执行所有事务块内的命令 List ret = connection.exec(); if (null == ret) { //watch后被其他命令所改动 throw new RuntimeException("锁超时"); } byte[] msg = valueSerializer.serialize(UNLOCK); //发布锁释放消息 connection.publish(key, msg); } finally { //取消WATCH命令对key的监视 connection.unwatch(); } return null; } }); } } }

测试

stringRedisTemplate.delete("lock:test"); RedisLock redisLock = new RedisLock(stringRedisTemplate); RedisLock.RLock rLock1 = redisLock.newLock("lock:test"); rLock1.lock(3 * 1000, TimeUnit.MILLISECONDS); rLock1.unlock();

Copyright © 2016-2020 www.365daan.com All Rights Reserved. 365答案网 版权所有 备案号:

部分内容来自互联网,版权归原作者所有,如有冒犯请联系我们,我们将在三个工作时内妥善处理。