分布式锁设计与实践 1、分布式锁是什么
分布式锁即在分布式环境下锁定共享资源,让请求处理串行化,实际表现为互斥锁。
分布式锁是主要是可重入的排它锁,主要具有如下特点:
1)释放锁的节点,一定是持有锁的节点。
2)持有锁的节点宕机了,不会因此而出现死锁,就像synchronized一样,当持有锁的线程异常退出了,锁会被自动释放。
3)排他性,很好理解,一个节点持有锁,其他节点就不能持有了。
4)可重入性,持有锁的节点,可以重复持有锁。
2、分布式锁应用场景
两个常见的应用场景:(1)防止用户重复下单 (2)mq消息去重
2.1、防止用户重复下单
正常情况下用户短时间内肯定是只会下单一次的,但是如果出现了超时用户点击了多次,那么如果前端的同学也没有控制住,这个时候如果服务层面不做任何控制就会出现同时下单多笔的情况,这个肯定是不正常的。当然如果在我们的服务没做水平扩容前我们加个本地锁就搞定了,但是如果我们的服务做了水平扩容:
这个时候就算我们的服务层面有本地锁,也还是不能解决问题的。比如第一次下单请求落在业务逻辑层1,而后面的重复下单落在业务逻辑层2、业务逻辑层3、业务逻辑层N,那我们的重复下单就真的重复下单了,所以这个时候我们需要分布式锁来解决这个问题:
这样每次下单请求过来的时候我们可以使用UserId作为关键信息先去分布式锁服务拿锁,如果能拿到请求继续执行,如果拿不到(被其他进程或线程占用)就丢弃掉当前请求。
2.2.MQ消息去重
还是用户下单的操作,这一次我们为了提升吞度量采用了水平分层异步架构,用户进行下单操作,这里我假设在网关层生成了订单号(正常情况下应该是专门的订单服务做处理),然后把请求丢到MessageQueue。这种场景下我们如果要对MQ消息去重,一般我们会有两种想法:
发送端去重
消费端去重
我们先来分析发送端去重,如果第一次发送端生成了消息丢到MQ,正常情况下MQ是会返回一个Ack给发送端的,那如果超时了,这个时候发送端重试又生成一个消息丢到MQ,我们可以思考下这种情况下我们能做到去重吗?肯定是做不到的吧,因为2次生成消息就会生成2个不同的MsgId。那我们再来看看消费端去重,其实这个时候我们要想既然MsgId我们不能唯一识别,那什么是我们能唯一识别的?肯定是订单号吧,那我们只要在消费端对订单号加锁就ok了吧。同样的如果没有做水平扩容的情况下我们本地锁是不是就可以搞定?但是水平扩容之后呢?
同样的道理,如果消息第一次被业务逻辑层1消费,后面重复的消息被业务逻辑层2、业务逻辑层3、业务逻辑层N消费,本地锁在这种情况下肯定搞不定。我们也需要分布式锁来做这个事:
这样每次消费消息的时候我们可以使用OrderId作为关键信息先去分布式锁服务拿锁,如果能拿到请求继续执行,如果拿不到(被其他进程或线程占用)就丢弃掉当前消息。
这里有一个需要注意的事情,我们的分布式锁只能处理短时间内的重复请求或者消息,如果过了锁的时间,那锁是无法处理的,这个时候怎么办呢?很简单,使用幂等处理。
通过分析上面2种业务场景,我们可以得出一个结论。这两种场景存在的共性是要处理共享的资源(订单、消息),那解决方案也就是实现共享资源的互斥,共享资源的串行化,那最后转换出来就是锁的问题了,锁又有本地锁和分布式锁。在分布式环境下,本地锁已经不奏效了,只能使用分布式锁。
3.分布式锁设计与实现 3.1、redis实现分布式锁
使用redis唯一工作线程来处理
3.1.1、利用 SETNX 和 SETEX
基本命令主要有:
· SETNX(SET If Not Exists):当且仅当 Key 不存在时,则可以设置,否则不做任何动作。
· SETEX:可以设置超时时间
其原理为:通过 SETNX 设置 Key-Value 来获得锁,随即进入死循环,每次循环判断,如果存在 Key 则继续循环,如果不存在 Key,则跳出循环,当前任务执行完成后,删除 Key 以释放锁。
这种方式可能会导致死锁,为了避免这种情况,需要设置超时时间。
下面,请看具体的实现步骤。
1.创建一个 Maven 工程并在 pom.xml 加入以下依赖:
org.springframework.boot spring-boot-starter-parent 2.0.2.RELEASE UTF-8 UTF-8 1.8 org.springframework.boot spring-boot-starter-test test org.springframework.boot spring-boot-starter-web org.springframework.boot spring-boot-starter-data-redis
2.创建启动类 Application.java:
@SpringBootApplicationpublic class Application { public static void main(String[] args) { SpringApplication.run(Application.class,args); } }
3.添加配置文件 application.yml:
server: port: 8080spring: redis: host: localhost port: 6379
4.创建全局锁类 Lock.java:
public class Lock { private String name; private String value; public Lock(String name, String value) { this.name = name; this.value = value; } public String getName() { return name; } public String getValue() { return value; } }
5.创建分布式锁类 DistributedLockHandler.java:
@Componentpublic class DistributedLockHandler { private static final Logger logger = LoggerFactory.getLogger(DistributedLockHandler.class); private final static long LOCK_EXPIRE = 30 * 1000L;//单个业务持有锁的时间30s,防止死锁 private final static long LOCK_TRY_INTERVAL = 30L;//默认30ms尝试一次 private final static long LOCK_TRY_TIMEOUT = 20 * 1000L;//默认尝试20s @Autowired private StringRedisTemplate template; public boolean tryLock(Lock lock) { return getLock(lock, LOCK_TRY_TIMEOUT, LOCK_TRY_INTERVAL, LOCK_EXPIRE); } public boolean tryLock(Lock lock, long timeout) { return getLock(lock, timeout, LOCK_TRY_INTERVAL, LOCK_EXPIRE); } public boolean tryLock(Lock lock, long timeout, long tryInterval) { return getLock(lock, timeout, tryInterval, LOCK_EXPIRE); } public boolean tryLock(Lock lock, long timeout, long tryInterval, long lockExpireTime) { return getLock(lock, timeout, tryInterval, lockExpireTime); } public boolean getLock(Lock lock, long timeout, long tryInterval, long lockExpireTime) { try { if (StringUtils.isEmpty(lock.getName()) || StringUtils.isEmpty(lock.getValue())) { return false; } long startTime = System.currentTimeMillis(); do{ if (!template.hasKey(lock.getName())) { ValueOperations ops = template.opsForValue(); ops.set(lock.getName(), lock.getValue(), lockExpireTime, TimeUnit.MILLISECONDS); return true; } else {//存在锁 logger.debug("lock is exist!!!"); } if (System.currentTimeMillis() - startTime > timeout) {//尝试超过了设定值之后直接跳出循环 return false; } Thread.sleep(tryInterval); } while (template.hasKey(lock.getName())) ; } catch (InterruptedException e) { logger.error(e.getMessage()); return false; } return false; } public void releaseLock(Lock lock) { if (!StringUtils.isEmpty(lock.getName())) { template.delete(lock.getName()); } } }
6.最后创建 HelloController 来测试分布式锁。
@RestControllerpublic class HelloController { @Autowired private DistributedLockHandler distributedLockHandler; @RequestMapping("index") public String index(){ Lock lock=new Lock("lynn","min"); if(distributedLockHandler.tryLock(lock)){ try { //为了演示锁的效果,这里睡眠5000毫秒 System.out.println("执行方法"); Thread.sleep(5000); }catch (Exception e){ e.printStackTrace(); } distributedLockHandler.releaseLock(lock); } return "hello world!"; }}
7.测试。
启动 Application.java,连续访问两次浏览器:http://localhost:8080/index,控制台可以发现先打印了一次“执行方法”,说明后面一个线程被锁住了,5秒后又再次打印了“执行方法”,说明锁被成功释放。
通过这种方式创建的分布式锁存在以下问题:
1、高并发的情况下,如果两个线程同时进入循环,可能导致加锁失败。
2、SETNX 是一个耗时操作,因为它需要判断 Key 是否存在,因为会存在性能问题。
因此,Redis 官方推荐 Redlock 来实现分布式锁。
3.1.2、使用Redlock
通过 Redlock 实现分布式锁比其他算法更加可靠,继续改造上一例的代码。
1.pom.xml 增加以下依赖:
org.redisson redisson 3.7.0
2.增加以下几个类:
public interface AquiredLockWorker { T invokeAfterLockAquire() throws Exception;}public interface DistributedLocker { T lock(String resourceName, AquiredLockWorker worker) throws UnableToAquireLockException, Exception; T lock(String resourceName, AquiredLockWorker worker, int lockTime) throws UnableToAquireLockException, Exception; }public class UnableToAquireLockException extends RuntimeException { public UnableToAquireLockException() { } public UnableToAquireLockException(String message) { super(message); } public UnableToAquireLockException(String message, Throwable cause) { super(message, cause); }}@Componentpublic class RedissonConnector { RedissonClient redisson; @PostConstruct public void init(){ redisson = Redisson.create(); } public RedissonClient getClient(){ return redisson; } }@Componentpublic class RedisLocker implements DistributedLocker{ private final static String LOCKER_PREFIX = "lock:"; @Autowired RedissonConnector redissonConnector; @Override public T lock(String resourceName, AquiredLockWorker worker) throws InterruptedException, UnableToAquireLockException, Exception { return lock(resourceName, worker, 100); } @Override public T lock(String resourceName, AquiredLockWorker worker, int lockTime) throws UnableToAquireLockException, Exception { RedissonClient redisson= redissonConnector.getClient(); RLock lock = redisson.getLock(LOCKER_PREFIX + resourceName); // Wait for 100 seconds seconds and automatically unlock it after lockTime seconds boolean success = lock.tryLock(100, lockTime, TimeUnit.SECONDS); if (success) { try { return worker.invokeAfterLockAquire(); } finally { lock.unlock(); } } throw new UnableToAquireLockException(); }}3.修改 HelloController:@RestControllerpublic class HelloController { @Autowired private DistributedLocker distributedLocker; @RequestMapping("index") public String index()throws Exception{ distributedLocker.lock("test",new AquiredLockWorker