几种分布式锁的比较
方式 | 优点 | 缺点 |
---|---|---|
数据库 | 实现简单、易于理解 | 对数据库压力大 |
Redis | 易于理解 | 自己实现、不支持阻塞 |
Zookeeper | 支持阻塞 | 需理解Zookeeper、程序复杂 |
Curator | 提供锁的方法 | 依赖Zookeeper,强一致 |
Redission | 提供锁的方式,可阻塞 | / |
几种分布式锁
数据库
select ...... for update;
Select … for update语句是我们经常使用手工加锁语句。通常情况下,select语句是不会对数据加锁,妨碍影响其他的DML和DDL操作。同时,在多版本一致读机制的支持下,select语句也不会被其他类型语句所阻碍。
-- 查询当前窗口是否自动提交
select @@autocommit;
-- 设置不为自动提交
set @@autocommit = 0;
SELECT * FROM `sys_dict` where code = 2 for update;
由上图可以看出,当已经有select..for update对同一条数据进行操作时,另外一个只能等待。
当窗口一执行commit时,窗口二才能运行。
Redis
redis的分布式锁是基于redis的setnx命令来实现的
redis官网的分布式锁地址:https://redis.io/topics/distlock
SET resource_name my_random_value NX PX 30000
# resource_name:资源名称,区分业务
# my_random_value:随机值,每个线程的随机值都不同,用于释放锁时校验,推荐uuid
# NX:key存在的时候设置成功,key不存在的时候设置不成功
# PX:自动失效的时间,出现异常情况,锁可以自动释放掉
加锁:redis利用nx的原子性,多个线程并发的时候,只有一个线程可以设置成功,设置成功及获得锁,可以继续执行后续的的业务处理,如果产生异常,过了锁的有效期,自动释放锁。
释放锁:通过redis的delete命令,释放锁要校验随机值,防止别的线程的锁,所以需要lua脚本
if redis.call("get",KEYS[1]) == ARGV[1] then
retuen redis.call("del", KEYS[1])
else
retun 0
end
下面我们来一起看看分布式锁的代码:
public class RedisLock implements AutoCloseable{
private RedisTemplate redisTemplate;
private String key;
private String value;
private int expireTime;
public RedisLock(RedisTemplate redisTemplate, String key, int expireTime){
this.redisTemplate = redisTemplate;
this.key = key;
this.value = UUID.randomUUID().toString();
this.expireTime = expireTime;
}
@Override
public void close() throws Exception {
unLock();
}
/**
* 获取锁
* @return
*/
public Boolean lock(){
RedisCallback<Boolean> redisCallback = redisConnection -> {
//设置nx
RedisStringCommands.SetOption setOption = RedisStringCommands.SetOption.ifAbsent();
//设置过期时间
Expiration expiration = Expiration.seconds(expireTime);
//序列化key
byte[] redisKey = redisTemplate.getKeySerializer().serialize(key);
//序列化value
byte[] redisValue = redisTemplate.getValueSerializer().serialize(value);
//执行setnx操作
Boolean result = redisConnection.set(redisKey, redisValue, expiration, setOption);
return result;
};
Boolean isLock = (Boolean) redisTemplate.execute(redisCallback);
return isLock;
}
/**
* 释放锁
* @return
*/
public Boolean unLock(){
//lua脚本
String luaScript = "if redis.call(\"get\",KEYS[1]) == ARGV[1] then\n" +
" retuen redis.call(\"del\", KEYS[1])\n" +
"else\n" +
" retun 0\n" +
"end";
//转换lua脚本为redis脚本
RedisScript redisScript = RedisScript.of(luaScript, Boolean.class);
//执行删除的方法
Boolean ifUnLock = (Boolean) redisTemplate.execute(redisScript, Arrays.asList(key) , value);
return ifUnLock;
}
}
测试类
@RestController
public class TestController {
@Autowired
private RedisTemplate redisTemplate;
@RequestMapping("/testLock")
public String testLock(){
try (RedisLock redisLock = new RedisLock(redisTemplate, "ceshi", 30)){
if (redisLock.lock()){
//TODO 业务代码
}
} catch (Exception e) {
e.printStackTrace();
}
return "success";
}
}
Zookeeper
zk的学习文档:https://www.runoob.com/w3cnote/zookeeper-tutorial.html
实现原理:利用zk的瞬时有序节点的特性,多线程在并发创建瞬时节点时,得到有序的序列,我们认为,序号最小的线程获取锁。其他的节点监听自己序号的前一个序号。前一个线程执行完成,删除自己的序号节点,由下一个序号的线程得到通知,继续执行。
以此内推,每一个线程都去监听它前一个节点信息,所有在创建节点的时候,就已经确定了线程的执行顺序。
- 下载zk,安装,记住maven的版本要和zk版本最好保持一致
<!-- https://mvnrepository.com/artifact/org.apache.zookeeper/zookeeper -->
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.5.9</version>
</dependency>
- 编写zk的主要代码
public class ZkLock implements AutoCloseable, Watcher {
private ZooKeeper zooKeeper;
private String znode;
public ZkLock() throws IOException {
this.zooKeeper = new ZooKeeper("localhost",60000,this);
}
public Boolean lock(String businessCode){
Stat exists = null;
String nodeString = "/" + businessCode;
try {
exists = zooKeeper.exists(nodeString, false);
//创建持久性的根节点(业务节点)
if (exists == null){
zooKeeper.create(nodeString, businessCode.getBytes(),
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
//创建瞬时有序的数据节点 (/user/user_00000001)
znode = zooKeeper.create(nodeString + "/" + businessCode + "_", businessCode.getBytes(),
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
//获取业务节点下所有的子节点
List<String> childrenNodes = zooKeeper.getChildren(nodeString, false);
//排序
Collections.sort(childrenNodes);
//获取序号最小的第一个子节点
String firstChildrenNode = childrenNodes.get(0);
//如果创建的节点是第一个子节点,则获取锁
if (znode.endsWith(firstChildrenNode)){
return true;
}
//如果不是第一个子节点,则监听前一个节点
String lastNode = firstChildrenNode;
//循环node节点
for (String thisNode : childrenNodes) {
//如果当前节点为创建的节点,则监听上一个节点,然后进入process方法进行监听
if (znode.endsWith(thisNode)) {
zooKeeper.exists(nodeString + "/" + lastNode, true);
break;
} else {
lastNode = thisNode;
}
}
//等待监听的节点被删除唤醒
synchronized (this){
wait();
}
return true;
} catch (Exception e) {
e.printStackTrace();
}
return false;
}
@Override
public void close() throws Exception {
zooKeeper.delete(znode, -1);
zooKeeper.close();
}
@Override
public void process(WatchedEvent watchedEvent) {
//如果被监听的节点被删除了
if (watchedEvent.getType() == Event.EventType.NodeDeleted){
synchronized (this) {
//唤醒
notify();
}
}
}
}
- 测试
@RestController
public class ZkController {
@RequestMapping("testZkLock")
public void testZkLock(){
try (ZkLock zkLock = new ZkLock()) {
if (zkLock.lock("testZkLock")){
//TODO 业务代码
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
Curator
Apache Curator是用于Apache ZooKeeper(一种分布式协调服务)的Java / JVM客户端库。 它包含一个高级API框架和实用程序,使使用Apache ZooKeeper更加容易和可靠。 它还包括针对常见用例和扩展的配方,例如服务发现和Java 8异步DSL。
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>2.4.0</version>
</dependency>
官网地址详细的使用方法:http://curator.apache.org/getting-started.html
//修改一下,基于CuratorFramework注入
@Bean(initMethod = "start", destroyMethod = "close")
public CuratorFramework getCuratorFramework(){
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
return CuratorFrameworkFactory.newClient("localhost:2181", retryPolicy);
}
//测试controller
@RestController
public class CuratorController {
@Autowired
private CuratorFramework curatorFramework;
// 官方地址提供的demo
@RequestMapping("/testCurator")
public void testCurator(){
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
CuratorFramework client = CuratorFrameworkFactory.newClient("localhost:2181", retryPolicy);
client.start();
InterProcessMutex lock = new InterProcessMutex(client, "/testCurator");
try {
if (lock.acquire(20, TimeUnit.SECONDS) ) {
//TODO
System.out.println("ssssss");
}
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
lock.release();
} catch (Exception e) {
e.printStackTrace();
}
}
client.close();
}
// 修改后的测试demo
@RequestMapping("/testCurator2")
public void testCurator2(){
InterProcessMutex lock = new InterProcessMutex(curatorFramework, "/testCurator");
try {
if (lock.acquire(20, TimeUnit.SECONDS) ) {
//TODO
System.out.println("ssssss");
}
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
lock.release();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
Redission
官网地址:https://github.com/redisson/redisson#quick-start
通过官方文档,可以很简单的配置,这里简单粘贴一下代码,文档都有
@RestController
public class RedissionController {
@Autowired
private RedissonClient redisson;
@RequestMapping("/testRedission")
public void testRedission(){
RLock lock = redisson.getLock("myLock");
try {
lock.lock(30, TimeUnit.SECONDS);
// TODO 业务代码
} finally {
lock.unlock();
}
}
}
至此,几种分布式锁都简单的过了一遍,其实项目中使用的最多的还是最后一个。
评论区