侧边栏壁纸
  • 累计撰写 101 篇文章
  • 累计创建 89 个标签
  • 累计收到 9 条评论

分布式锁 - 几种解决方案

bearjun
2021-02-21 / 0 评论 / 0 点赞 / 1,540 阅读 / 7,018 字 / 正在检测是否收录...
温馨提示:
本文最后更新于 2021-02-21,若内容或图片失效,请留言反馈。部分素材来自网络,若不小心影响到您的利益,请联系我们删除。

几种分布式锁的比较

方式优点缺点
数据库实现简单、易于理解对数据库压力大
Redis易于理解自己实现、不支持阻塞
Zookeeper支持阻塞需理解Zookeeper、程序复杂
Curator提供锁的方法依赖Zookeeper,强一致
Redission提供锁的方式,可阻塞/

几种分布式锁

数据库

select ...... for update;

Select … for update语句是我们经常使用手工加锁语句。通常情况下,select语句是不会对数据加锁,妨碍影响其他的DML和DDL操作。同时,在多版本一致读机制的支持下,select语句也不会被其他类型语句所阻碍。

-- 查询当前窗口是否自动提交
select @@autocommit;
-- 设置不为自动提交
set @@autocommit = 0;

SELECT * FROM `sys_dict` where code = 2 for update;

18128-ivrlewaukrn.png
64173-5h22lyz8n0q.png
由上图可以看出,当已经有select..for update对同一条数据进行操作时,另外一个只能等待。
当窗口一执行commit时,窗口二才能运行。

Redis

redis的分布式锁是基于redis的setnx命令来实现的

redis官网的分布式锁地址:https://redis.io/topics/distlock

SET resource_name my_random_value NX PX 30000
# resource_name:资源名称,区分业务
# my_random_value:随机值,每个线程的随机值都不同,用于释放锁时校验,推荐uuid
# NX:key存在的时候设置成功,key不存在的时候设置不成功
# PX:自动失效的时间,出现异常情况,锁可以自动释放掉

加锁:redis利用nx的原子性,多个线程并发的时候,只有一个线程可以设置成功,设置成功及获得锁,可以继续执行后续的的业务处理,如果产生异常,过了锁的有效期,自动释放锁。
释放锁:通过redis的delete命令,释放锁要校验随机值,防止别的线程的锁,所以需要lua脚本

if redis.call("get",KEYS[1]) == ARGV[1] then
   retuen redis.call("del", KEYS[1])
else
   retun 0
end

下面我们来一起看看分布式锁的代码:

public class RedisLock implements AutoCloseable{

    private RedisTemplate redisTemplate;
    private String key;
    private String value;
    private int expireTime;

    public RedisLock(RedisTemplate redisTemplate, String key, int expireTime){
        this.redisTemplate = redisTemplate;
        this.key = key;
        this.value = UUID.randomUUID().toString();
        this.expireTime = expireTime;
    }

    @Override
    public void close() throws Exception {
        unLock();
    }

    /**
     * 获取锁
     * @return
     */
    public Boolean lock(){
        RedisCallback<Boolean> redisCallback = redisConnection -> {
            //设置nx
            RedisStringCommands.SetOption setOption = RedisStringCommands.SetOption.ifAbsent();
            //设置过期时间
            Expiration expiration = Expiration.seconds(expireTime);
            //序列化key
            byte[] redisKey = redisTemplate.getKeySerializer().serialize(key);
            //序列化value
            byte[] redisValue = redisTemplate.getValueSerializer().serialize(value);
            //执行setnx操作
            Boolean result = redisConnection.set(redisKey, redisValue, expiration, setOption);
            return result;
        };
        Boolean isLock = (Boolean) redisTemplate.execute(redisCallback);
        return isLock;
    }


    /**
     * 释放锁
     * @return
     */
    public Boolean unLock(){
        //lua脚本
        String luaScript = "if redis.call(\"get\",KEYS[1]) == ARGV[1] then\n" +
                "   retuen redis.call(\"del\", KEYS[1])\n" +
                "else\n" +
                "   retun 0\n" +
                "end";
        //转换lua脚本为redis脚本
        RedisScript redisScript = RedisScript.of(luaScript, Boolean.class);
        //执行删除的方法
        Boolean ifUnLock = (Boolean) redisTemplate.execute(redisScript, Arrays.asList(key) , value);
        return ifUnLock;
    }
}

测试类

@RestController
public class TestController {

    @Autowired
    private RedisTemplate redisTemplate;

    @RequestMapping("/testLock")
    public String testLock(){
        try (RedisLock redisLock = new RedisLock(redisTemplate, "ceshi", 30)){
            if (redisLock.lock()){
                //TODO 业务代码
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        return "success";
    }
}

Zookeeper

zk的学习文档:https://www.runoob.com/w3cnote/zookeeper-tutorial.html

实现原理:利用zk的瞬时有序节点的特性,多线程在并发创建瞬时节点时,得到有序的序列,我们认为,序号最小的线程获取锁。其他的节点监听自己序号的前一个序号。前一个线程执行完成,删除自己的序号节点,由下一个序号的线程得到通知,继续执行。
以此内推,每一个线程都去监听它前一个节点信息,所有在创建节点的时候,就已经确定了线程的执行顺序。
33596-yy7pysbczx.png

  • 下载zk,安装,记住maven的版本要和zk版本最好保持一致
<!-- https://mvnrepository.com/artifact/org.apache.zookeeper/zookeeper -->
<dependency>
   <groupId>org.apache.zookeeper</groupId>
   <artifactId>zookeeper</artifactId>
   <version>3.5.9</version>
</dependency>
  • 编写zk的主要代码
public class ZkLock implements AutoCloseable, Watcher {

    private ZooKeeper zooKeeper;
    private String znode;

    public ZkLock() throws IOException {
        this.zooKeeper = new ZooKeeper("localhost",60000,this);
    }

    public Boolean lock(String businessCode){
        Stat exists = null;
        String nodeString = "/" + businessCode;
        try {
            exists = zooKeeper.exists(nodeString, false);
            //创建持久性的根节点(业务节点)
            if (exists == null){
                zooKeeper.create(nodeString, businessCode.getBytes(),
                        ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            }
            //创建瞬时有序的数据节点 (/user/user_00000001)
            znode = zooKeeper.create(nodeString + "/" + businessCode + "_", businessCode.getBytes(),
                    ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
            //获取业务节点下所有的子节点
            List<String> childrenNodes = zooKeeper.getChildren(nodeString, false);
            //排序
            Collections.sort(childrenNodes);
            //获取序号最小的第一个子节点
            String firstChildrenNode = childrenNodes.get(0);
            //如果创建的节点是第一个子节点,则获取锁
            if (znode.endsWith(firstChildrenNode)){
                return true;
            }
            //如果不是第一个子节点,则监听前一个节点
            String lastNode = firstChildrenNode;
            //循环node节点
            for (String thisNode : childrenNodes) {
                //如果当前节点为创建的节点,则监听上一个节点,然后进入process方法进行监听
                if (znode.endsWith(thisNode)) {
                    zooKeeper.exists(nodeString + "/" + lastNode, true);
                    break;
                } else {
                    lastNode = thisNode;
                }
            }
            //等待监听的节点被删除唤醒
            synchronized (this){
                wait();
            }
            return true;
        } catch (Exception e) {
            e.printStackTrace();
        }
        return false;
    }

    @Override
    public void close() throws Exception {
        zooKeeper.delete(znode, -1);
        zooKeeper.close();
    }

    @Override
    public void process(WatchedEvent watchedEvent) {
        //如果被监听的节点被删除了
        if (watchedEvent.getType() == Event.EventType.NodeDeleted){
            synchronized (this) {
                //唤醒
                notify();
            }
        }
    }
}
  • 测试
@RestController
public class ZkController {

    @RequestMapping("testZkLock")
    public void testZkLock(){
        try (ZkLock zkLock = new ZkLock()) {
            if (zkLock.lock("testZkLock")){
                //TODO 业务代码
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

Curator

Apache Curator是用于Apache ZooKeeper(一种分布式协调服务)的Java / JVM客户端库。 它包含一个高级API框架和实用程序,使使用Apache ZooKeeper更加容易和可靠。 它还包括针对常见用例和扩展的配方,例如服务发现和Java 8异步DSL。

官网地址:http://curator.apache.org/

<dependency>
     <groupId>org.apache.curator</groupId>
     <artifactId>curator-recipes</artifactId>
     <version>2.4.0</version>
</dependency>

官网地址详细的使用方法:http://curator.apache.org/getting-started.html

//修改一下,基于CuratorFramework注入
@Bean(initMethod = "start", destroyMethod = "close")
public CuratorFramework getCuratorFramework(){
   RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
   return CuratorFrameworkFactory.newClient("localhost:2181", retryPolicy);
}

//测试controller
@RestController
public class CuratorController {

    @Autowired
    private CuratorFramework curatorFramework;

   // 官方地址提供的demo
    @RequestMapping("/testCurator")
    public void testCurator(){
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        CuratorFramework client = CuratorFrameworkFactory.newClient("localhost:2181", retryPolicy);
        client.start();
        InterProcessMutex lock = new InterProcessMutex(client, "/testCurator");
        try {
            if (lock.acquire(20, TimeUnit.SECONDS) ) {
                //TODO
                System.out.println("ssssss");
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                lock.release();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        client.close();
    }

    // 修改后的测试demo
    @RequestMapping("/testCurator2")
    public void testCurator2(){
        InterProcessMutex lock = new InterProcessMutex(curatorFramework, "/testCurator");
        try {
            if (lock.acquire(20, TimeUnit.SECONDS) ) {
                //TODO
                System.out.println("ssssss");
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                lock.release();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}

Redission

官网地址:https://github.com/redisson/redisson#quick-start
通过官方文档,可以很简单的配置,这里简单粘贴一下代码,文档都有

@RestController
public class RedissionController {

    @Autowired
    private RedissonClient redisson;

    @RequestMapping("/testRedission")
    public void testRedission(){

        RLock lock = redisson.getLock("myLock");
        try {
            lock.lock(30, TimeUnit.SECONDS);
            // TODO 业务代码
        } finally {
            lock.unlock();
        }
    }
}

至此,几种分布式锁都简单的过了一遍,其实项目中使用的最多的还是最后一个。

0

评论区