当前位置: 新豪天地登录网址 > www.3559.com > 正文

Redisson使用及源码分析,Redis分布式锁

时间:2019-10-04 22:40来源:www.3559.com
原标题:基于Redis实现分布式锁-Redisson使用及源码分析【面试 工作】 在多线程开发中我们使用锁来避免线程争夺共享资源。在分布式系统中,程序在多个节点上运行无法使用单机锁来避

原标题:基于Redis实现分布式锁-Redisson使用及源码分析【面试 工作】

在多线程开发中我们使用锁来避免线程争夺共享资源。在分布式系统中,程序在多个节点上运行无法使用单机锁来避免资源竞争,因此我们需要一个锁服务来避免多个节点上的进程争夺资源。

常见3种分布式的实现比较

业务背景:存储请求参数token ,token唯一 ,且新的生成旧的失效

基于Redis实现分布式锁-Redisson使用及源码分析【面试 工作】

Redis数据库基于内存,具有高吞吐量、便于执行原子性操作等特点非常适合开发对一致性要求不高的锁服务。

  • 基于数据库实现分布式锁
  • 基于缓存实现分布式锁(redis,mc)
  • 基于Zookeeper实现分布式锁

思路:因为是多台机器,获取token存入redis,保持唯一,考虑使用redis来加锁,其实就是在redis中存一个key,其他机器发现key有值的话就不进行获取token的请求。

在分布式场景下,有很多种情况都需要实现最终一致性。在设计远程上下文的领域事件的时候,为了保证最终一致性,在通过领域事件进行通讯的方式中,可以共享存储(领域模型和消息的持久化数据源),或者做全局XA事务(两阶段提交,数据源可分开),也可以借助消息中间件(消费者处理需要能幂等)。通过Observer模式来发布领域事件可以提供很好的高并发性能,并且事件存储也能追溯更小粒度的事件数据,使各个应用系统拥有更好的自治性。

本文介绍了简单分布式锁、Redisson分布式锁的实现以及解决单点服务的RedLock分布式锁概念。

Redission

SET操作会覆盖原有值,SETEX虽然可设置key过期时间,但也会覆盖原有值,所以考虑可以使用SETNX

本文主要探讨另外一种实现分布式最终一致性的解决方案——采用分布式锁。基于分布式锁的解决方案,比如zookeeper,redis都是相较于持久化(如利用InnoDB行锁,或事务,或version乐观锁)方案提供了高可用性,并且支持丰富化的使用场景。 本文通过Java版本的redis分布式锁开源框架——Redisson来解析一下实现分布式锁的思路。

Redis是一致性较低的数据库,若对锁服务的一致性要求较高建议使用zookeeper等中间件开发锁服务。

1. 简介

redission为redis官方推荐方式翻译,github地址。redisson-quick-start

SETNX Key value

分布式锁的使用场景

基于单点Redis的分布式锁

Redis实现分布式锁的原理非常简单, 节点在访问共享资源前先查询redis中是否有该资源对应的锁记录, 若不存在锁记录则写入一条锁记录(即获取锁)随后访问共享资源. 若节点查询到redis中已经存在了资源对应的锁记录, 则放弃操作共享资源.

下面给出一个非常简单的分布式锁示例:

import redis.clients.jedis.Jedis;

import java.util.Random;
import java.util.UUID;


public class MyRedisLock {

    private Jedis jedis;

    private String lockKey;

    private String value;

    private static final Integer DEFAULT_TIMEOUT = 30;

    private static final String SUFFIX = ":lock";

    public MyRedisLock(Jedis jedis) {
        this.jedis = jedis;
    }

    public boolean acquire(String key, long time) throws InterruptedException {
        Long outdatedTime = System.currentTimeMillis()   time;
        lockKey = key   SUFFIX;
        while (true) {
            if (System.currentTimeMillis() >= outdatedTime) {
                return false;
            }
            value = UUID.randomUUID().toString(); // 1
            return "OK".equals(jedis.set(lockKey, value, "NX", DEFAULT_TIMEOUT)); // 2
        }
    }

    public boolean check() {
        return value != null && value.equals(jedis.get(lockKey)); // 3
    }

    public boolean release() {
        String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
        return 1L.equals(jedis.eval(script, Collections.singletonList(lockKey), Collections.singletonList(value))); // 3
    }
}

加锁后所有对共享资源的操作都应该先检查当前线程是否仍持有锁。

在分布式锁的实现中有几点需要注意:

  1. 加锁过程:
    1. 锁的过期时间应设置到redis中,保证在加锁客户端故障的情况下锁可以被自动释放
    2. 使用set key value EX seconds NX命令进行加锁,不要使用setnx和expire两个命令加锁。
      若setnx执行成功而expire失败(如执行setnx后客户端崩溃),则可能造成死锁。
    3. 锁记录的值不能使用固定值。 使用固定值可能导致严重错误: 线程A的锁因为超时被释放, 随后线程B成功加锁。 B写入的锁记录与A的锁记录没有区别, 因此A在检查时会误判为自己仍持有锁。
  2. 解锁过程:
    1. 解锁操作使用lua脚本执行get和del两个操作,为了保证两个操作的原子性。若两个操作不具有原子性则可能出现错误时序: 线程A执行get操作判断自己仍持有锁 -> 锁超时释放 -> 线程B成功加锁 -> 线程A删除锁记录(线程A认为删除了自己的锁记录,实际上删除了线程B的锁记录)。

上文只是提供了简单示例,还有一些重要功能没有实现:

  1. 阻塞加锁:可以使用redis的发布订阅功能,获取锁失败的线程订阅锁被释放的消息再次尝试加锁
  2. 无限期锁:应写入有TTL的锁记录,设置定时任务在锁失效前刷新锁过期的时间。这种方式可以避免持有锁的线程崩溃导致的死锁
  3. 可重入锁(持有锁的线程可以再次加锁):示例中持有锁的线程无法对同一个资源再次加锁,即不可重入锁。实现可重入锁需要锁记录由(key:资源标记, value:持有者标记)的键值对结构变为(key:资源标记, field:持有者标记, value:计数器)这样的hash结构。持有锁的线程每次重入锁计数器加1,每次释放锁计数器减1,计数器为0时删除锁记录。

总结来看实现Redis分布式锁有几点需要注意:

  1. 加解锁操作应保证原子性,避免多个线程同时操作出现异常
  2. 应考虑进程崩溃、Redis崩溃、操作成功执行但未收到成功响应等异常状况,避免死锁
  3. 解锁操作必须避免 某个线程释放了不属于自己的锁 的异常

2. 实现分析

将 key 的值设为 value ,当且仅当 key 不存在。

如果是不跨限界上下文的情况,跟本地领域服务相关的数据一致性,尽量还是用事务来保证。但也有些无法用事务或者乐观锁来处理的情况,这些情况大多是对于一个共享型的数据源,有并发写操作的场景,但又不是对于单一领域的操作。

Redisson

这里我们以基于Java的Redisson为例讨论一下成熟的Redis分布式锁的实现。

redisson实现了java.util.concurrent.locks.Lock接口,可以像使用普通锁一样使用redisson:

RLock lock = redisson.getLock("key"); 
lock.lock(); 
try {
    // do sth.
} finally {
    lock.unlock(); 
}

分析一下RLock的实现类org.redisson.RedissonLock:

2.1 解决的问题

 

举个例子,还是用租书来比喻,A和B两个人都来租书,在查看图书的时候,发现自己想要看的书《大设计》库存仅剩一本。书店系统中,书作为一种商品,是在商品系统中,以Item表示出租商品的领域模型,同时每一笔交易都会产生一个订单,Order是在订单系统(交易限界上下文)中的领域模型。这里假设先不考虑跨系统通信的问题,也暂时不考虑支付环节,但是我们需要保证A,B两个人不会都对于《大设计》产生订单就可以,也就是其中一个人是可以成功下单,另外一个人只要提示库存已没即可。此时,书的库存就是一种共享的分布式资源,下订单,减库存就是一个需要保证一致性的写操作。但又因为两个操作不能在同一个本地事务,或者说,不共享持久化的数据源的情况,这时候就可以考虑用分布式锁来实现。本例子中,就需要对于共享资源——书的库存进行加锁,至于锁的key可以结合领域模型的唯一标识,如itemId,以及操作类型(如操作类型是RENT的)设计一个待加锁的资源标识。当然,这里还有一个并发性能的问题,如果是个库存很多的秒杀类型的业务,那么就不能单纯在itemId 加类型加锁,还需要设计排队队列以及合理的调度算法,防止超卖等等,那些就是题外话了。本文只是将这个场景作为一个切入点,具体怎么设计锁,什么场景用还要结合业务。

加锁操作

@Override
public void lock() {
    try {
        lockInterruptibly();
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    }
}

@Override
public void lockInterruptibly() throws InterruptedException {
    lockInterruptibly(-1, null);
}

再看等待加锁的方法lockInterruptibly:

@Override
    public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException {
        long threadId = Thread.currentThread().getId();
        Long ttl = tryAcquire(leaseTime, unit, threadId);
        // lock acquired
        if (ttl == null) {
            return;
        }

        RFuture<RedissonLockEntry> future = subscribe(threadId);
        commandExecutor.syncSubscription(future);

        try {
            while (true) {
                ttl = tryAcquire(leaseTime, unit, threadId);
                // lock acquired
                if (ttl == null) {
                    break;
                }

                // waiting for message
                if (ttl >= 0) {
                    getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                } else {
                    getEntry(threadId).getLatch().acquire();
                }
            }
        } finally {
            unsubscribe(future, threadId);
        }
    }

lockInterruptibly 方法会尝试获取锁,若获取失败则会订阅释放锁的消息。收到锁被释放的通知后再次尝试获取锁,直到成功或者超时。

接下来分析tryAcquire:

private Long tryAcquire(long leaseTime, TimeUnit unit, long threadId) {
    return get(tryAcquireAsync(leaseTime, unit, threadId)); // 调用异步获得锁的实现,使用get(future)实现同步
}

private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, final long threadId) {
    // 设置了超时时间
    if (leaseTime != -1) {
        // tryLockInnerAsync 加锁成功返回 null, 加锁失败在 Future 中返回锁记录剩余的有效时间
        return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
    }
    // 未设置超时时间,尝试获得无限期的锁
    RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(LOCK_EXPIRATION_INTERVAL_SECONDS, TimeUnit.SECONDS, threadId, RedisCommands.EVAL_LONG);
    ttlRemainingFuture.addListener(new FutureListener<Long>() {
        @Override
        public void operationComplete(Future<Long> future) throws Exception {
            if (!future.isSuccess()) {
                return;
            }
            Long ttlRemaining = future.getNow();
            // lock acquired
            if (ttlRemaining == null) {
                // 避免对共享资源操作完成前锁就被释放掉,定期刷新锁失效的时间
                // 默认锁失效时间的三分之一即进行刷新
                scheduleExpirationRenewal(threadId);
            }
        }
    });
    return ttlRemainingFuture;
}

tryAcquireAsync中主要逻辑是无限期锁的实现,Redisson并非设置了永久的锁记录,而是定期刷新锁失效的时间。

这种方式避免了持有锁的进程崩溃无法释放锁导致死锁。

真正实现获取锁逻辑的是tryLockInnerAsync方法:

<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
    internalLockLeaseTime = unit.toMillis(leaseTime);
    return commandExecutor.evalWriteAsync(
        getName(),
        LongCodec.INSTANCE, 
        command,
          "if (redis.call('exists', KEYS[1]) == 0) then "   // 资源未被加锁
              "redis.call('hset', KEYS[1], ARGV[2], 1); "   // 写入锁记录, 锁记录是一个hash; key:共享资源名称, field:锁实例名称(Redisson客户端ID:线程ID), value: 1(value是一个计数器,记录当前线程获取该锁的次数,实现可重入锁)
              "redis.call('pexpire', KEYS[1], ARGV[1]); "   // 设置锁记录过期时间
              "return nil; "  
          "end; "  
          "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then "   // 若当前线程已经持有该资源的锁
              "redis.call('hincrby', KEYS[1], ARGV[2], 1); "   // 将锁计数器加1, 
              "redis.call('pexpire', KEYS[1], ARGV[1]); "  
              "return nil; "  
          "end; "  
          "return redis.call('pttl', KEYS[1]);", // 资源已被其它线程加锁,加锁失败。获取锁剩余生存时间后返回
        Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
}

上述操作使用eval命令执行lua脚本保证了操作的原子性。

安全和活跃性保证

  • 安全

任何时候只有一个客户端可以获得锁

  • 活跃属性
  1. 死锁自由(即使一个客户端已经拥用了已损坏或已被分割资源的锁,但它也有可能请求其他的锁)

举例:竞态条件

客户端A在主节点获得了一个锁。
主节点挂了,而到从节点的写同步还没完成。
从节点被提升为主节点。
客户端B获得和A相同的锁。注意,锁安全性被破坏了!
  1. 容错(只要大部分Redis节点可用, 客户端就可以获得和释放锁)

  2. 实现思路


若给定的 key 已经存在,则 SETNX 不做任何动作

领域服务概念

unlock

解锁过程相对简单:

@Override
public void unlock() {
    Boolean opStatus = get(unlockInnerAsync(Thread.currentThread().getId()));
    if (opStatus == null) {
        throw new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: "
                  id   " thread-id: "   Thread.currentThread().getId());
    }
    if (opStatus) {
        cancelExpirationRenewal();
    }
}

unlockInnerAsync方法实现了具体的解锁逻辑:

protected RFuture<Boolean> unlockInnerAsync(long threadId) {
    return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
            "if (redis.call('exists', KEYS[1]) == 0) then "   // 资源未被加锁,可能锁已被超时释放
                "redis.call('publish', KEYS[2], ARGV[1]); "   // 发布锁被释放的消息
                "return 1; "  
            "end;"  
            "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then "   // 锁的持有者不是自己,抛出异常
                "return nil;"  
            "end; "  
            "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); "   // 自己持有锁,因为锁是可重入的将计数器减1
            "if (counter > 0) then "   // 计数器大于0,锁未被完全释放,刷新锁过期时间
                "redis.call('pexpire', KEYS[1], ARGV[2]); "   
                "return 0; "  
            "else "  
                "redis.call('del', KEYS[1]); "   // 锁被完全释放,删除锁记录,发布锁被释放的消息
                "redis.call('publish', KEYS[2], ARGV[1]); "  
                "return 1; " 
            "end; "  
            "return nil;",
            Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.unlockMessage, internalLockLeaseTime, getLockName(threadId));

}

3.1 单例演变

SET resource_name my_random_value NX PX 30000

解释:
设置key的值,仅当其不存在时生效(NX选项), 且设置其生存期为30000毫秒(PX选项)。和key关联的value值是"my_random_value"。这个值在所有客户端和所有加锁请求中是必须是唯一的

  • 随机唯一
    1. 可以确保当前锁是该客户端,而防止其他客户端释放、删掉。
    2. 一般组合unix时间戳和客户端ID 随机数(/dev/urandom初始化RC4算法)
  • 锁拥有时长
    1. 一种是指锁的自动释放时长
    2. 另一种是指在另一个客户端获取锁之前某个客户端占用这个锁的时长,这被限制在从锁获取后开始的一段时间窗口内。
  • 简单实现
设置锁的超时时间
SET resource_name my_random_value NX PX 30000

获取锁 == myKey,释放
否则:return 0
if redis.call("get",KEYS[1]) == ARGV[1] then
    return redis.call("del",KEYS[1])
else
    return 0
end

 

借用《Implementing Domain-driven Design》里面的对于领域服务的定义。领域的某个操作过程或转换过程不是实体或值对象的职责时,应该将操作放在一个单独的接口中,即领域服务,并且要和通用语言保持一致。这里的非实体或值对象操作会有很多种情况,比如某个操作需要对多个领域对象操作,输出一个值对象。在分层的架构中,有点类似于Manager。但是如果过渡抽象manager就会出现贫血,所以还需要确保领域服务是无状态的,并且做好和贫血模型的权衡。可能大多数情况,领域服务的参数都是比实际的领域模型小的,只有些关键属性的值对象。如果服务只操作领域的实体或值对象,则可以考虑下放到domain model中操作。

RedLock

基于单点的分布式锁无法解决redis故障的问题. 为了保证redis的可用性我们通常采用主从备份的方法, 即 使用一个master实例和至少一个slave实例.

当有写入请求时先写入master然后写入到所有slave, 当master实例故障时选择一个slave实例升级为master实例继续提供服务.

其中存在的问题是, 写入master和写入slave存在时间差. 若线程A成功将锁记录写入了master, 随后在同步写入slave之前, master故障转移到slave.

因为slave(新master)中没有锁记录, 因此线程B也可以成功加锁, 因此可能出现A和B同时持有锁的错误.

为了解决redis失效可能造成的问题, redis的作者antirez提出了RedLock实现方案:

  1. 客户端获取当前时间

  2. 客户端尝试获取N个节点的锁, 每个节点使用相同的key和value. 请求超时时间要远小于锁超时时间, 避免在节点或者网络故障时浪费时间.

  3. 客户端计算在加锁时消耗的时间, 只有客户端成功获得超过一半节点的锁且总时间小于锁超时间时才能成功加锁. 客户端持有锁的时间为锁超时时间减去加锁消耗的时间.

  4. 若获取锁失败则访问所有节点, 发起释放锁的请求.

释放锁时需要向所有Redis节点发出释放锁的请求, 原因在于可能某个Redis实例中成功写入了锁记录, 但是没有响应没有到达客户端.

为了保证所有锁记录都被正确释放, 所以需要向所有Redis实例发送释放请求.

实现

成功返回1,失败返回0。

前面提到了Manager,但是很多应用中都会把Manager抽象成接口的形式,但大多数情况其实完全没有必要,可以通过服务Factory的方式解耦,或者用Spring的@Service注解来注入真正的服务实现类。对于一些简单的领域操作,还可以抽象一个迷你层,这个迷你层也可以称作是领域服务,只不过是无状态,无事务,安全的一个抽象层。

关于安全性的讨论

关于RedLock的安全性问题, Martin Kleppmann和作者antirez进行了一些讨论:

  • Martin Kleppmann: How to do distributed locking
  • antirez:[Is Redlock safe?](http://antirez.com/news/101)

关于这场讨论的分析可以参考:

  • 基于Redis的分布式锁到底安全吗?

关键点

  • 锁的时效设置。避免单点故障造成死锁,影响其他客户端获取锁。但是也要保证一旦一个客户端持锁,在客户端可用时不会被其他客户端解锁。(网上很多解决方案都是其他客户端等待队列长度判断是否强制解锁,但其实在偶发情况下就不能保证一致性,也就失去了分布式锁的意义)。
  • 持锁期间的check,尽量在关键节点检查锁的状态,所以要设计成可重入锁,但在客户端使用时要做好吞吐量的权衡。
  • 减少获取锁的操作,尽量减少redis压力。所以需要让客户端的申请锁有一个等待时间,而不是所有申请锁的请求要循环申请锁。
  • 加锁的事务或者操作尽量粒度小,减少其他客户端申请锁的等待时间,提高处理效率和并发性。
  • 持锁的客户端解锁后,要能通知到其他等待锁的节点,否则其他节点只能一直等待一个预计的时间再触发申请锁。类似线程的notifyAll,要能同步锁状态给其他客户端,并且是分布式消息。
  • 考虑任何执行句柄中可能出现的异常,状态的正确流转和处理。比如,不能因为一个节点解锁失败,或者锁查询失败(redis 超时或者其他运行时异常),影响整个等待的任务队列,或者任务池。

看上去SETNX 配合 EXPIRE(过期时间)是个不错的选择,于是就有了加锁错误示例1:

jedis.setnx("lockName","value");
//这里redis挂掉,就是一个死锁
jedis.expire("lockName",10);

因为这两个操作不具备原子性,所以可能出现死锁,之所以有这样的示例,是因为低版本的redis的SET还不支持多参数命令

从 Redis 2.6.12 版本开始, SET 命令的行为可以通过一系列参数来修改
EX second :设置键的过期时间为 second 秒。 SET key value EX second 效果等同于 SETEX key second value 。
PX millisecond :设置键的过期时间为 millisecond 毫秒。 SET key value PX millisecond 效果等同于 PSETEX key millisecond value 。
NX :只在键不存在时,才对键进行设置操作。 SET key value NX 效果等同于 SETNX key value 。
XX :只在键已经存在时,才对键进行设置操作。

这里可以引出 redis正确的加锁示例:

 public static boolean lock(Jedis jedis, String lockKey, String uid, int expireTime) {  

        String result = jedis.set(lockKey, uid,"NX" "PX", expireTime);  

        if ("OK".equals(result)) {  
            return true;  
        }  
        return false;  

    } 

 其实就等于在redis中执行了 :set key value nx px 10000

图片 1

第一个为key,我们使用key来当锁名

第二个为value,我们传的是uid,唯一随机数,也可以使用本机mac地址 uuid

第三个为NX,意思是SET IF NOT EXIST,即当key不存在时,我们进行set操作;若key已经存在,则不做任何操作 第四个为PX,意思是我们要给这个key加一个过期的设置,具体时间由第五个参数决定 第五个为time,代表key的过期时间,对应第四个参数 PX毫秒,EX秒

 

再来看一下分布式锁的要求:

 

分布式锁是用于解决分布式系统中操作共享资源时的数据一致性问题

 

为了确保分布式锁可用,我们至少要确保锁的实现同时满足以下四个条件:

 

领域事件其实也可以归纳为领域服务,不过领域服务的事件是幂等的。因为领域服务是无事务的,所以事件也是无副作用的,这样在处理聚合依赖的时候,需要保证他们的最终一致性。

锁设计

  • 加锁
// 检查是否key已经被占用,如果没有则设置超时时间和唯一标识,初始化value=1
if (redis.call('exists', key) == 0) 
then  
redis.call('hset', key, key-uunid, 1); //hset key field value
redis.call('pexpire', key, timeout);  //设置超时时间为毫秒
return null; 
end; 
// 如果锁重入,需要判断锁的key field 的情况 
if (redis.call('hexists', key, key-unuid) == 1) 
then 
redis.call('hincrby', key, key-unuid, 1);//使字段值增加指定的整数
redis.call('pexpire', key, timeout);//锁重入重新设置超时时间
return null; 
end; 
// 返回剩余的过期时间
return redis.call('pttl', key);
  • 解锁
// 如果key已经不存在,说明已经被解锁,直接发布(publihs)redis消息
if (redis.call('exists', key) == 0) 
then
redis.call('publish', channelName, ARGV[1]);
    return 1;
end;
// key和field不匹配,说明当前客户端线程没有持有锁,不能主动解锁。
if (redis.call('hexists', key, key-uuid) == 0)
then 
    return null;
end; 
// 将value减1
local counter = redis.call('hincrby', key, key-uuid, -1); 
// 如果counter>0说明锁在重入,不能删除key
if (counter > 0)  
then
    redis.call('pexpire', key, timeout);                            
    return 0; 
else 
// 删除key并且publish 解锁消息
redis.call('del', key);                           
 redis.call('publish', channelName, ARGV[1]); 
return 1; 
end; 
return null;

互斥性。在任意时刻,只有一个客户端能持有锁。

{领域事件

Redisson源码解析

不会发生死锁。即使有一个客户端在持有锁的期间崩溃而没有主动解锁,也能保证后续其他客户端能加锁。

将领域中发生的活动建模成一系列的离散事件,每个事件都用领域对象来表示。简而言之,领域事件就是领域中发生的事件。还拿租书为例,一本书被借走了,那么需要产生一个借书订单,并且对于租书者来说,需要能查看自己租书的列表和书籍详情,同时这本书也需要被标记为不能再借出的状态(因为已经被借走了)。这里面bookRent就可以作为一个领域事件来发出。

相关资料

分布式锁的几种实现方式
Redis实现分布式锁全局锁—Redis客户端Redisson中分布式锁RLock实现
基于Redis实现分布式锁,Redisson使用及源码分析

性能。排队等待锁的节点如果不知道锁何时会被释放,则只能隔一段时间尝试获取一次锁,这样无法保证资源的高效利用,因此当锁释放时,要能够通知等待队列,使一个等待节点能够立刻获得锁。

事件的聚合

重入。同一个线程可以重复拿到同一个资源的锁。

NX保证互斥性

PX保证不会死锁

Value传入的唯一标识保证是自己的锁(可以通过随机uuid 线程名称 来保证唯一)

对于上述的事件模型,我们可以创建具有聚合特性的领域事件。这里我们可以把这个事件本身建模成一个聚合(BookRentEvent 对象),并且有自己的持久化方式。唯一标识可以由一组属性决定,在客户方(Client)调用领域服务的时候创建这个领域事件{new bookRentEvent())},并添加到资源库中,然后再通过消息的方式进行发布。发布成功后再回调更新时间状态。但这里需要注意,消息发布最好和事件资源库在相同的上下文,或共享数据源,这样就可以保证事件的成功提交,在不同上下文系统,就需要做全局事务来保证。而唯一标识在这里的作用就是为了防止消息重发或者重复处理。所以订阅方需要检查重复消息,并且忽略。如果是本地上下文的事件,最好提供equals和hashcode 实现。

PS:因为 SET 命令可以通过参数来实现和 SETNX 、 SETEX 和 PSETEX 三个命令的效果,不知道将来的 Redis 版本会不会废弃 SETNX 、 SETEX 和 PSETEX 这三个命令 ?

 

下面看一个释放锁的错误示例

public static void wrongUnLock1(Jedis jedis, String lockKey, String requestId) {  

    // 判断加锁与解锁是不是同一个线程  
    if (requestId.equals(jedis.get(lockKey))) {  
        // lockkey锁失效,下一步删除的就是别人的锁  
        jedis.del(lockKey);  
    }  

}   

根本问题还是保证操作的原子性,因为是两步操作,即便判断到是当前线程的锁,但是也有可能再删除之前刚好过期,这样删除的就是其他线程的锁。

如果业务要求精细,我们可以使用lua脚本来进行完美解锁

/**
     * redis可以保证lua中的键的原子操作 unlock:lock调用完之后需unlock,否则需等待lock自动过期
     *
     * @param lock
     *  uid 只有线程已经获取了该锁才能释放它(uid相同表示已获取)
     */
    public  void unlock( String lock) {

        Jedis jedis = new Jedis("localhost");

        final String uid= tokenMap.get();
        if (StringUtil.isBlank(token))
            return;
        try {
            final String script = "if redis.call("get",""   lock   "") == ""  
             uid   ""then  return redis.call("del",""   lock   "") else return 0 end ";
            jedis.eval(script);
        } catch (Exception e) {
            throw new RedisException("error");
        } finally {
            if (jedis != null)
                jedis.close();
        }
    }

 

关于lua:

Lua 是一种轻量小巧的脚本语言,用标准C语言编写并以源代码形式开放, 其设计目的是为了嵌入应用程序中,从而为应用程序提供灵活的扩展和定制功能。

Lua 提供了交互式编程模式。我们可以在命令行中输入程序并立即查看效果。

 

结合刚才的例子,在书籍管理上下文中,书被借走了,那么书籍唯一表示和书的状态(Rent被借出)就可以标识一个事件。这个事件中需要有借书人的信息(如id,nick等),那么在持久化这个事件后,可以post一个Eventbus的本地消息,由用户书籍领域服务监听,更新用户书籍列表等一系列操作。然后再Callback到事件源,更新事件状态,处理成功。如果需要处理事件都在本地上下文,处理起来并不麻烦。

lua脚本优点

  • 减少网络开销:本来多次网络请求的操作,可以用一个请求完成,原先多次请求的逻辑放在redis服务器上完成。使用脚本,减少了网络往返时延

  • 原子操作:Redis会将整个脚本作为一个整体执行,中间不会被其他命令插入

  • 复用:客户端发送的脚本会永久存储在Redis中,意味着其他客户端可以复用这一脚本而不需要使用代码完成同样的逻辑

 上面这个脚本很简单

if redis.call("get",""   lock   "") // redisGET命令
 == ""  uid   // 判断是否是当前线程
 ""then  return redis.call("del",""   lock   "") // 如果是,执行redis DEL操作,删除锁
 else return 0 end  

 

同理我们可以使用lua给线程加锁

 

local lockkey = KEYS[1]
--唯一随机数
local uid = KEYS[2]
--失效时间,如果是当前线程,也是续期时间
local time = KEYS[3]

if redis.call('set',lockkey,uid,'nx','px',time)=='OK' then
return 'OK'
else
    if redis.call('get',lockkey) == uid then
       if redis.call('EXPIRE',lockkey,time/1000)==1 then
       return 'OOKK'
       end
    end
end

 

lua脚本也可以通过外部文件读取,方便修改

 

  public void luaUnLock() throws Exception{
        Jedis jedis = new Jedis("localhost") ;
        InputStream input = new FileInputStream("unLock.lua");
        byte[] by = new byte[input.available()];
        input.read(by);
        String script = new String(by);
        Object obj = jedis.eval(script, Arrays.asList("key","123"), Arrays.asList(""));
        System.out.println("执行结果 "   obj);
    }

 

PS:跟同事讨论的时候,想到可不可以利用redis的额事物来解锁,并没有实际使用,怕有坑。

发布领域事件

redis事物解锁

public boolean unLock(Jedis jedis, String lockName, String uid) throws Exception{
            jedis.watch(lockName);
            //这里的判断uid和下面的del虽然不是原子性,有了watch可以保证不会误删锁
            if (jedis.get(lockName).equals(uid)) {
                redis.clients.jedis.Transaction transaction = jedis.multi();
                transaction.del(lockName);
                List<Object> exec = transaction.exec();
                if (exec.get(0).equals("OK")) {
                    transaction.close();
                    return true;
                }
            }
            return false;
        }

 

 

领域事件的发布可以用Observer模式。在本地上下文,也要尽量减少对基础设施或者消息中间件暴露领域模型,所以,需要将本地模型(领域模型)封装成事件的聚合。比如我们不能直接发布一个BookRent聚合的事件,而是一个BookRentEvent,这个Event对象,还会持有一些事件特有的属性,比如可能根据需要,会有occurTime(发生时间),isConsumed(是否已经被处理)。事件发布时,所有订阅方都会同步收到通知。领域事件的主要组件就是publisher和subscriber了。

可重入锁

可重入锁,也叫做递归锁,指的是同一线程 外层函数获得锁之后 ,内层递归函数仍然有获取该锁的代码,但不受影响。

 

在Java中用set命令实现可重入锁

 

//保存每个线程独有的token
    private static ThreadLocal<String> tokenMap = new ThreadLocal<>();

/**
     * 这个例子还不太完善。  
     * redis实现分布式可重入锁,并不保证在过期时间内完成锁定内的任务,需根据业务逻辑合理分配seconds
     *
     * @param lock
     *            锁的名称
     * @param mseconds
     *            锁定时间,单位 毫秒
     *  token 对于同一个lock,相同的token可以再次获取该锁,不相同的token线程需等待到unlock之后才能获取
     *
     */
    public  boolean lock(final String lock,  int mseconds ,Jedis jedis) {
        // token 对于同一个lock,相同的token可以再次获取该锁,不相同的token线程需等待到unlock之后才能获取
        String token = tokenMap.get();
        if (StringUtil.isBlank(token)) {
            token = UUID.randomUUID().toString().replaceAll("-","");
            tokenMap.set(token);
        }
        boolean flag = false;
        try {
            String ret = jedis.set(lock, token, "NX", "PX", mseconds);
            if (ret == null) {// 该lock的锁已经存在
                String origToken = jedis.get(lock);// 即使lock已经过期也可以
                if (token.equals(origToken) || origToken==null) {
                // token相同默认为同一线程,所以token应该尽量长且随机,保证不同线程的该值不相同
                    ret = jedis.set(lock, token, "NX", "PX", mseconds);//
                    if ("OK".equalsIgnoreCase(ret))
                        flag = true;
                    System.out.println("当前线程 "   token);
                }
            } else if ("OK".equalsIgnoreCase(ret))
                flag = true;
            System.out.println("当前线程 "   token);
        } catch (Exception e) {

        } finally {
            if (jedis != null)
                jedis.close();
        }
        return flag;
    }

 

继续正题,说到lua脚本 和 可重入锁,就不得不提 redission了

发送者

redission

redisson是redis官网推荐的java语言实现分布式锁的项目

redission中提供了多样化的锁,

发送者本身并不表达一种领域概念,而是作为一种服务的形态。无论用什么技术方式实现,用什么框架,处理事件发送的思路也都可能不尽相同。比如,在web应用中,可以在启动应用的时候处理订阅者向发送者的事件注册(避免注册和处理发送的线程同步问题)。比如可以将关注的事件registe到本地的一个ThreadLocal的publisher List中。应用启动完成后,开始处理领域事件的时候,就可以发送一个事件的聚合。这个事件的聚合是一个事件对象,而不是领域模型中的实体,因为我们要暴露需要暴露的事件给其他上下文,而不是暴露完整的领域对象。如果使用EventBus,我们可以在post的时候,封装一个事件作为参数。

可重入锁(Reentrant Lock)

订阅者

公平锁(Fair Lock)

事件的订阅者可以作为应用服务的一个独立的组件。因为应用服务是在领域逻辑的外层,如果是纯粹的事件驱动,那么订阅者作为一种应用服务,也可以定位成具有单一职责的,负责事件存储的应用服务组件。

联锁(MultiLock)

分布式领域事件

红锁(RedLock)

在处理分布式事件中,最重要也是最难处理的就是一致性。消息的延迟,处理的不幂等就会影响领域模型状态的准确性和事件的处理。但是我们在系统间交互的过程中,可以用一些技术方式来达到最终一致性。这其中可能就需要进行事件模型的持久化。处理方式可以

 读写锁(ReadWriteLock)

1. 领域模型和消息设施共享持久存储的数据源。这种需要事件作为一种本地事件模型存储在和本地领域模型的同一个数据库中。这样保证了本地事务的一致,性能较好,但是不能和其他上下文共享持久化存储。

信号量(Semaphore) 等等

下面分析一下可重入的源码

 /**
     * redission分布式锁-重试时间 秒为单位
     * @param lockName 锁名
     * @param waitTime  重试时间
     * @param leaseTime 锁过期时间
     * @return
     */
    public boolean tryLock(String lockName,long waitTime,long leaseTime){
        try{
            RLock rLock = redissonClient.getLock(lockName);
            return rLock.tryLock(waitTime, leaseTime, TimeUnit.SECONDS);
        }catch (Exception e){
            logger.error("redission lock error with waitTime",e);
        }
        return false;
    }

2. 全局XA事务(两阶段提交)来控制。模型和消息的持久化可以分开,但是全局事务性能差,成本高。

org.redisson.Redisson#getLock()

@Override
public RLock getLock(String name) {
  return new RedissonLock(commandExecutor, name, id);
}

  

  • commandExecutor: 与 Redis 节点通信并发送指令的真正实现。需要说明一下,Redisson 的 CommandExecutor 实现是通过 eval 命令来执行 Lua 脚本,所以要求 Redis 的版本必须为 2.6 或以上

  • name: 锁的全局名称,例如上面代码中的 "foobar",具体业务中通常可能使用共享资源的唯一标识作为该名称。

  • id: Redisson 客户端唯一标识。

 

3. 在领域模型的持久化存储中,单独一块存储区域(单独一张事件表)来存储领域事件。也就是做本地的EventStore。但是需要有一个发布事件的消息机制,消息事件是完全私有的。消息的发送可以交给消息中间件来处理。如果可以的话,还可以将时间存储作为Rest资源。事件就可以以一种存档日志的形式对外发布事件(消息队列,通过消息设施或者中间件发送RabitMQ,MetaQ等)。这样还保证了时间的可追溯性。

org.redisson.RedissonLock#lock()

在直接使用 lock() 方法获取锁时,最后实际执行的是 lockInterruptibly(-1, null)

@Override
public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException {
    // 1.尝试获取锁
    Long ttl = tryAcquire(leaseTime, unit);
    // 2.获得锁成功
    if (ttl == null) {
        return;
    }
    // 3.等待锁释放,并订阅锁
    long threadId = Thread.currentThread().getId();
    Future<RedissonLockEntry> future = subscribe(threadId);
    get(future);

    try {
        while (true) {
            // 4.重试获取锁
            ttl = tryAcquire(leaseTime, unit);
            // 5.成功获得锁
            if (ttl == null) {
                break;
            }
            // 6.等待锁释放
            if (ttl >= 0) {
                getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
            } else {
                getEntry(threadId).getLatch().acquire();
            }
        }
    } finally {
        // 7.取消订阅
        unsubscribe(future, threadId);
    }
}

 

 

  1. 首先尝试获取锁,具体代码下面再看,返回结果是已存在的锁的剩余存活时间,为 null 则说明没有已存在的锁并成功获得锁。

  2. 如果获得锁则结束流程,回去执行业务逻辑。

  3. 如果没有获得锁,则需等待锁被释放,并通过 Redis 的 channel 订阅锁释放的消息

  4. 订阅锁的释放消息成功后,进入一个不断重试获取锁的循环,循环中每次都先试着获取锁,并得到已存在的锁的剩余存活时间。

  5. 如果在重试中拿到了锁,则结束循环,跳过第 6 步。

  6. 如果锁当前是被占用的,那么等待释放锁的消息,具体实现使用了 JDK 并发的信号量工具 Semaphore 来阻塞线程,当锁释放并发布释放锁的消息后,信号量的 release() 方法会被调用,此时被信号量阻塞的等待队列中的一个线程就可以继续尝试获取锁了。

  7. 在成功获得锁后,就没必要继续订阅锁的释放消息了,因此要取消对 Redis 上相应 channel 的订阅。

 

我们使用事件来解耦,是为了考虑尽量避免RPC,简化系统依赖,减少外部服务不可用对系统模型带来的状态影响。所以领域事件强调的是高度自治,但是也需要斟酌,通过事件处理的情况必须是容许延时的,并且消息的接收方需要是一个幂等接收器(可以自幂等,或者对于重复消息的拒绝处理),因为消息是可能重复发送的。}

重点看一下 tryAcquire() 方法的实现

private Long tryAcquire(long leaseTime, TimeUnit unit) {
    return get(tryAcquireAsync(leaseTime, unit, Thread.currentThread().getId()));
}

private <T> Future<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, long threadId) {
    if (leaseTime != -1) {
        return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
    }
    // 2.用默认的锁超时时间去获取锁
    Future<Long> ttlRemainingFuture = tryLockInnerAsync(LOCK_EXPIRATION_INTERVAL_SECONDS,
                TimeUnit.SECONDS, threadId, RedisCommands.EVAL_LONG);
    ttlRemainingFuture.addListener(new FutureListener<Long>() {
        @Override
        public void operationComplete(Future<Long> future) throws Exception {
            if (!future.isSuccess()) {
                return;
            }
            Long ttlRemaining = future.getNow();
            // 成功获得锁
            if (ttlRemaining == null) {
                // 3.锁过期时间刷新任务调度
                scheduleExpirationRenewal();
            }
        }
    });
    return ttlRemainingFuture;
}

<T> Future<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId,
                RedisStrictCommand<T> command) {
    internalLockLeaseTime = unit.toMillis(leaseTime);
    // 3.使用 EVAL 命令执行 Lua 脚本获取锁
    return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
              "if (redis.call('exists', KEYS[1]) == 0) then "  
                  "redis.call('hset', KEYS[1], ARGV[2], 1); "  
                  "redis.call('pexpire', KEYS[1], ARGV[1]); "  
                  "return nil; "  
              "end; "  
              "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then "  
                  "redis.call('hincrby', KEYS[1], ARGV[2], 1); "  
                  "redis.call('pexpire', KEYS[1], ARGV[1]); "  
                  "return nil; "  
              "end; "  
              "return redis.call('pttl', KEYS[1]);",
                Collections.<Object>singletonList(getName()), internalLockLeaseTime,
                        getLockName(threadId));
}
  • 获取锁真正执行的命令,Redisson 使用 EVAL 命令执行上面的 Lua 脚本来完成获取锁的操作

  • 通过 exists 命令发现当前 key 不存在,即锁没被占用,则执行 hset 写入 Hash 类型数据 key:全局锁名称(例如共享资源ID), field:锁实例名称(Redisson客户端ID:线程ID), value:1,并执行 pexpire 对该 key 设置失效时间,返回空值 nil,至此获取锁成功

  • 如果通过 hexists 命令发现 Redis 中已经存在当前 key 和 field 的 Hash 数据,说明当前线程之前已经获取到锁,因为这里的锁是可重入的,则执行 hincrby 对当前 key field 的值加一,并重新设置失效时间,返回空值,至此重入获取锁成功。

  • 最后是锁已被占用的情况,即当前 key 已经存在,但是 Hash 中的 Field 与当前值不同,则执行 pttl 获取锁的剩余存活时间并返回,至此获取锁失败。

 redisson释放锁

public void unlock() {
    // 1.通过 EVAL 和 Lua 脚本执行 Redis 命令释放锁
    Boolean opStatus = commandExecutor.evalWrite(getName(), LongCodec.INSTANCE,
                    RedisCommands.EVAL_BOOLEAN,
                    "if (redis.call('exists', KEYS[1]) == 0) then "  
                        "redis.call('publish', KEYS[2], ARGV[1]); "  
                        "return 1; "  
                    "end;"  
                    "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then "  
                        "return nil;"  
                    "end; "  
                    "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); "  
                    "if (counter > 0) then "  
                        "redis.call('pexpire', KEYS[1], ARGV[2]); "  
                        "return 0; "  
                    "else "  
                        "redis.call('del', KEYS[1]); "  
                        "redis.call('publish', KEYS[2], ARGV[1]); "  
                        "return 1; " 
                    "end; "  
                    "return nil;",
                    Arrays.<Object>asList(getName(), getChannelName()), 
                            LockPubSub.unlockMessage, internalLockLeaseTime, 
                            getLockName(Thread.currentThread().getId()));
    // 2.非锁的持有者释放锁时抛出异常
    if (opStatus == null) {
        throw new IllegalMonitorStateException(
                "attempt to unlock lock, not locked by current thread by node id: "
                  id   " thread-id: "   Thread.currentThread().getId());
    }
    // 3.释放锁后取消刷新锁失效时间的调度任务
    if (opStatus) {
        cancelExpirationRenewal();
    }
  1. 使用 EVAL 命令执行 Lua 脚本来释放锁:

  2. key 不存在,说明锁已释放,直接执行 publish 命令发布释放锁消息并返回 1。

  3. key 存在,但是 field 在 Hash 中不存在,说明自己不是锁持有者,无权释放锁,返回 nil。

  4. 因为锁可重入,所以释放锁时不能把所有已获取的锁全都释放掉,一次只能释放一把锁,因此执行 hincrby 对锁的值减一。

  5. 释放一把锁后,如果还有剩余的锁,则刷新锁的失效时间并返回 0;如果刚才释放的已经是最后一把锁,则执行 del 命令删除锁的 key,并发布锁释放消息,返回 1。

  6. 上面执行结果返回 nil 的情况(即第2中情况),因为自己不是锁的持有者,不允许释放别人的锁,故抛出异常。

  7. 执行结果返回 1 的情况,该锁的所有实例都已全部释放,所以不需要再刷新锁的失效时间。

可以看到redission最终还是使用了lua脚本来加解锁 :

加锁脚本

 if (redis.call('exists' KEYS[1]) == 0) then     --  exists 判断key是否存在
                  redis.call('hset' KEYS[1] ARGV[2] 1);      --如果不存在,hset存哈希表
                  redis.call('pexpire' KEYS[1] ARGV[1]);    --设置过期时间
                  return nil;                               -- 返回null 就是加锁成功
              end;   
              if (redis.call('hexists' KEYS[1] ARGV[2]) == 1) then    -- 如果key存在,查看哈希表中是否存在
                  redis.call('hincrby' KEYS[1] ARGV[2] 1);    -- 给哈希中的key加1,代表重入1次,以此类推
                  redis.call('pexpire' KEYS[1] ARGV[1]);    -- 重设过期时间
                  return nil;   
              end;   
              return redis.call('pttl' KEYS[1]); --如果前面的if都没进去,说明ARGV2 的值不同,也就是不是同                  一线程的锁,这时候直接返回该锁的过期时间

推荐使用sciTE来编辑lua

图片 2

解锁的脚本就不分析了,还是操作的redis命令,主要是lua脚本执行的时候能保证原子性。

需要解决的问题

lua脚本的缺点

Redis的脚本执行是原子的,即脚本执行期间Redis不会执行其他命令。所有的命令都必须等待脚本执行完成后才能执行。为了防止某个脚本执行时间过长导致Redis无法提供服务(比如陷入死循环),Redis提供了lua-time-limit参数限制脚本的最长运行时间,默认为5秒钟。当脚本运行时间超过这一限制后,Redis将开始接受其他命令但不会执行(以确保脚本的原子性,因为此时脚本并没有被终止),而是会返回“BUSY”错误

 

一个lua死循环脚本

a = 0
while(a < 3) do
print("x = " .. '我是循环')
end

  

 

几个lua脚本示例

分布式的思路和线程同步锁ReentrantLock的思路是一样的。我们也要考虑如以下几个问题:

示例1——实现访问频率限制: 实现访问者 $ip 在一定的时间 time 内只能访问 limit 次

local key = "rate.limit:" .. KEYS[1]
local limit = tonumber(ARGV[1])
local expire_time = ARGV[2]



local is_exists = redis.call("EXISTS", key)
if is_exists == 1 then
    if redis.call("INCR", key) > limit then
        return '拒绝访问'
    else
        return '可以访问'
    end
else
    return redis.call("SET", key, "1","NX","PX",expire_time)
end
  • 死锁的情况。复杂的网络环境下,当加锁成功,后续操作正在处理时,获得锁的节点忽然宕机,无法释放锁的情况。如A在Node1 节点申请到了锁资源,但是Node1宕机,锁一直无法释放,订单没有生成,但是其他用户将无法申请到锁资源。
  • 锁的性能效率。分布式锁不能成为性能瓶颈或者单点故障不能导致业务异常。
  • 如果关键业务,可能需要重入场景,是否设计成可重入锁。这个可以参考下在多线程的情况下,比如ReentrantLock就是一种可重入锁,其内部又提供了公平锁和非公平锁两种实现和应用,本文不继续探讨。带着以上问题,和场景,沿着下文,来一一找到解决方案。

示例2 —— 抢红包

 

-- 脚本:尝试获得红包,如果成功,则返回json字符串,如果不成功,则返回空
 -- 参数:红包队列名, 已消费的队列名,去重的Map名,用户ID
 -- 返回值:nil 或者 json字符串,包含用户ID:userId,红包ID:id,红包金额:money
 --  jedis.eval(getScript(), 4, hongBaoList, hongBaoConsumedList, hongBaoConsumedMap, ""   j)
        if redis.call('hexists', KEYS[3], KEYS[4]) ~= 0 then
              return nil
              else
              -- 先取出一个小红包
              local hongBao = redis.call('rpop', KEYS[1])
              -- hongbao : {"Money":9,"Id":8}
                  if hongBao then
                      local x = cjson.decode(hongBao)
                      -- 加入用户ID信息
                      x['userId'] = KEYS[4]
                      local re = cjson.encode(x)
                      -- 把用户ID放到去重的set里
                      redis.call('hset', KEYS[3], KEYS[4], KEYS[4])
                      -- 把红包放到已消费队列里
                      redis.call('lpush', KEYS[2], re)
                  return re;
                  end
              end
          return nil

红包队列:hongBaoList -- keys1

已消费hash表 :hongBaoConsumedMap -- key2

已消费队列:hongBaoConsumedList -- key3

 

基于Redis实现 Redis 命令

SCRIPT LOAD 命令

redis 127.0.0.1:6379> SCRIPT LOAD "return 'hello moto'"
"232fd51614574cf0867b83d384a5e898cfd24e5a"

redis 127.0.0.1:6379> EVALSHA "232fd51614574cf0867b83d384a5e898cfd24e5a" 0
"hello moto"

  

目的:在脚本比较长的情况下,如果每次调用脚本都需要将整个脚本传给Redis会占用较多的带宽。为了解决这个问题,Redis提 供 了EVALSHA命令,允许开发者通过脚本内容的SHA1摘要来执行脚本

 

 

  • SCRIPT LOAD "lua-script"     获得 脚本的SHA1摘要 lua-script-sha1
  • SCRIPT EXISTS lua-script-sha1   可以判断脚本是否存在 存在-1, 不存在-0
  • SCRIPT FLUSH 清空 SHA1摘要    redis将脚本的SHA1摘要加入到脚本缓存后会永久保留,不会删除,但可以手动使用SCRIPT FLUSH命令情况脚本缓存
  • SCRIPT KILL 强制终止当前脚本

在Redisson介绍前,回顾下Redis的命令,以及不通过任何开源框架,可以基于redis怎么设计一个分布式锁。基于不同应用系统实现的语言,也可以通过其他一些如Jedis,或者Spring的RedisOperations 等,来执行Reids命令Redis command list

Java中使用 SCRIPT LOAD

public void scriptLoad()throws Exception{
        Jedis jedis = new Jedis("localhost");
        //从文件读取lua脚本
        InputStream input = new FileInputStream("return.lua");
        byte[] by = new byte[input.available()];
        input.read(by);
        byte[] scriptBy = jedis.scriptLoad(by);
        String sha1 = new String(scriptBy);
        //直接解析
        String sha2 = jedis.scriptLoad("local key1 = KEYS[1]n"
              "local key2 = KEYS[2]n"
              "local argv1 = ARGV[1]n"
              "return "key1:"..key1 .." key2:"..key2.. " argv1:"..argv1");
        System.out.println("sha1 : "   sha1);
        System.out.println("sha2 : "   sha2);
        Object obj = jedis.evalsha(sha1, Arrays.asList("value1","value2"), Arrays.asList("value3"));
        System.out.println("执行结果: "  obj);
    }

 

 

~~~~~~~~~~~~~~~

主要内容是以上这些,小弟不才,写的不好,如果大家发现由bug,一定@我 

20:49:54

最后是自己学习lua的一些笔记,含金量不高

 

分布式锁主要需要以下redis命令,这里列举一下。在实现部分可以继续参照命令的操作含义。

lua简介

Lua 是一种轻量小巧的脚本语言,用标准C语言编写并以源代码形式开放, 其设计目的是为了嵌入应用程序中,从而为应用程序提供灵活的扩展和定制功能。

Lua 提供了交互式编程模式。我们可以在命令行中输入程序并立即查看效果。

 

print("Hello lua!")

 

Hello lua!

 

单行注释 -- 多行注释 --[[ 注释 注释 --]]

 

  1. SETNX key value (SET if Not eXists):当且仅当 key 不存在,将 key 的值设为 value ,并返回1;若给定的 key 已经存在,则 SETNX 不做任何动作,并返回0。详见:SETNX commond
  2. GETSET key value:将给定 key 的值设为 value ,并返回 key 的旧值 (old value),当 key 存在但不是字符串类型时,返回一个错误,当key不存在时,返回nil。详见:GETSET commond
  3. GET key:返回 key 所关联的字符串值,如果 key 不存在那么返回 nil 。详见:GET Commond
  4. DEL key [KEY …]:删除给定的一个或多个 key ,不存在的 key 会被忽略,返回实际删除的key的个数(integer)。详见:DEL Commond
  5. HSET key field value:给一个key 设置一个{field=value}的组合值,如果key没有就直接赋值并返回1,如果field已有,那么就更新value的值,并返回0.详见:HSET Commond
  6. HEXISTS key field:当key 中存储着field的时候返回1,如果key或者field至少有一个不存在返回0。详见HEXISTS Commond
  7. HINCRBY key field increment:将存储在 key 中的哈希(Hash)对象中的指定字段 field 的值加上增量 increment。如果键 key 不存在,一个保存了哈希对象的新建将被创建。如果字段 field 不存在,在进行当前操作前,其将被创建,且对应的值被置为 0。返回值是增量之后的值。详见:HINCRBY Commond
  8. PEXPIRE key milliseconds:设置存活时间,单位是毫秒。expire操作单位是秒。详见:PEXPIRE Commond
  9. PUBLISH channel message:向channel post一个message内容的消息,返回接收消息的客户端数。详见PUBLISH Commond

标示符

Lua 标示符用于定义一个变量,函数获取其他用户定义的项。标示符以一个字母 A 到 Z 或 a 到 z 或下划线 _ 开头后加上0个或多个字母,下划线,数字

Lua 不允许使用特殊字符如 @, $, 和 % 来定义标示符。 Lua 是一个区分大小写的编程语言。

Redis 实现分布式锁

关键词

以下列出了 Lua 的保留关键字。保留关键字不能作为常量或变量或其他用户自定义标示符:

 

and break do else elseif end false for function if in local nil not or repeat return then true until while 一般约定,以下划线开头连接一串大写字母的名字(比如 _VERSION)被保留用于 Lua 内部全局变量。

 

 

假设我们现在要给itemId 1234 和下单操作 OP_ORDER 加锁,key是OP_ORDER_1234,结合上面的redis命令,似乎加锁的时候只要一个SETNX OP_ORDER_1234 currentTimestamp,如果返回1代表加锁成功,返回0 表示锁被占用着。然后再用DEL OP_ORDER_1234解锁,返回1表示解锁成功,0表示已经被解锁过。然而却还存在着很多问题:SETNX会存在锁竞争,如果在执行过程中客户端宕机,也会引起死锁问题,即锁资源无法释放。并且当一个资源解锁的时候,释放锁之后,其他之前等待的锁没有办法再次自动重试申请锁(除非重新申请锁)。解决死锁的问题其实可以可以向Mysql的死锁检测学习,设置一个失效时间,通过key的时间戳来判断是否需要强制解锁。但是强制解锁也存在问题,一个就是时间差问题,不同的机器的本地时间可能也存在时间差,在很小事务粒度的高并发场景下还是会存在问题,比如删除锁的时候,在判断时间戳已经超过时效,有可能删除了其他已经获取锁的客户端的锁。另外,如果设置了一个超时时间,但是确实执行时间超过了超时时间,那么锁会被自动释放,原来持锁的客户端再次解锁的时候会出现问题,而且最为严重的还是一致性没有得到保障。

lua数据类型

 

Lua是动态类型语言,变量不要类型定义,只需要为变量赋值,8个基本类型

nil 这个最简单,只有值nil属于该类,表示一个无效值 boolean 包含两个值:false和true。 number 表示双精度类型的实浮点数 string 字符串由一对双引号或单引号来表示 function 由 C 或 Lua 编写的函数 userdata 表示任意存储在变量中的C数据结构 thread 表示执行的独立线路,用于执行协同程序 table Lua 中的表(table)其实是一个"关联数组"(associative arrays),数组的索引可以是数字或者是字符串。在 Lua 里,table 的创建是通过"构造表达式"来完成,最简单构造表达式是{},用来创建一个空表。

 

可以使用tpye() 获得类型。

 

变量不需要声明

print(b)
nil

b=10
print(b)
10

 

Lua 中的变量全是全局变量,那怕是语句块或是函数里,除非用 local 显式声明为局部变量。

a = 5               -- 全局变量

local b = 5         -- 局部变量

 

所以设计的时候需要考虑以下几点:

lua数据类型的自动转换

运行时,Lua会自动在string和numbers之间自动进行类型转换,当一个字符串使用算术操作符时, string 就会被转成数字。

print("10" 1)     --> 11

print("a" 1)    --> error

 

##function
 function fun1(n)
    if n == 0 then
        return 1
    else
        return n * fun1(n - 1)
    end
 end
 print(fun1(3))
 fun2 = fun1
 print(fun2(4))

 

 

  1. 锁的时效设置。避免单点故障造成死锁,影响其他客户端获取锁。但是也要保证一旦一个客户端持锁,在客户端可用时不会被其他客户端解锁。(网上很多解决方案都是其他客户端等待队列长度判断是否强制解锁,但其实在偶发情况下就不能保证一致性,也就失去了分布式锁的意义)。
  2. 持锁期间的check,尽量在关键节点检查锁的状态,所以要设计成可重入锁,但在客户端使用时要做好吞吐量的权衡。
  3. 减少获取锁的操作,尽量减少redis压力。所以需要让客户端的申请锁有一个等待时间,而不是所有申请锁的请求要循环申请锁。
  4. 加锁的事务或者操作尽量粒度小,减少其他客户端申请锁的等待时间,提高处理效率和并发性。
  5. 持锁的客户端解锁后,要能通知到其他等待锁的节点,否则其他节点只能一直等待一个预计的时间再触发申请锁。类似线程的notifyAll,要能同步锁状态给其他客户端,并且是分布式消息。
  6. 考虑任何执行句柄中可能出现的异常,状态的正确流转和处理。比如,不能因为一个节点解锁失败,或者锁查询失败(redis 超时或者其他运行时异常),影响整个等待的任务队列,或者任务池。

table 与Java不同,Lua是从1开始排序

 

tab = {"Hello","World","hello","lua"}
for k,v in pairs(tab) do
print(k.." "..v)
end

 

##for循环
for i=1,10
do print(i)
end

 

锁设计

if判断

Lua中 0 为 true , nil

 

--[ 0 为 true ]
if(0)
then
print("0 为 true")
else
print("0 为 false")
end


if(null)
then
print("nil 为 true")
else
print("nil 为 false")
end

由于时间戳的设计有很多问题,以及上述几个问题,所以再换一种思路。先回顾几个关于锁的概念和经典java API。通过一些java.util.concurrent的API来处理一些本地队列的同步以及等待信号量的处理。

  • Semaphore :Semaphore可以控制某个资源可被同时访问的个数,通过 acquire() 获取一个许可,如果没有就等待,而 release() 释放一个许可。其内部维护了一个int 类型的permits。有一个关于厕所的比喻很贴切,10个人在厕所外面排队,厕所有5个坑,只能最多进去五个人,那么就是初始化一个 permits=5的Semaphore。当一个人出来,会release一个坑位,其他等坑的人会被唤醒然后开始要有人进坑。Semaphore同ReentrantLock一样都是基于AbstractQueuedSynchronizer提供了公平锁和非公平锁两种实现。如果等待的人有秩序的排队等着,就说明选择了Semaphore的公平锁实现,如果外面的人没有秩序,谁抢到是谁的(活跃线程就会一直有机会,存在线程饥饿可能),那就是Semaphore的非公平锁实现。无论外面人怎么个等法Semaphore对于出坑的控制是一致的,每次只能是从一个坑里出来一个人。理解起来,其实就是厕所的5个坑位是一个共享资源,也就是permits的值=5,每次acquire一下就是外面来了个人排队,每次release一下就是里面出来个人。厕所聊多有点不雅观,再回归到分布式锁的话题。在刚才讲述的redis实现分布式锁的“第三点”,减少redis申请锁调用频率上就可以通过Semaphore来控制请求。虽然Semaphore只是虚拟机内部的锁粒度的实现(不能跨进程),但是也可以一定程度减轻最后请求redis节点的压力。当然,也有种方法是,随机sleep一段时间再去tryLock之类的,也可以达到减轻最后redis节点压力,但是毕竟使用信号量能更好得控制。而且我们可以再简单点,对于同一个锁对象的申请锁操作,可以设计一个初始化permits = 0的LockEntry,permits = 0也就顾名思义,谁都进不来,厕所维修中。当有一个持锁对象unlock的时候,通过分布式消息机制通知所有等待节点,这时候,再release,这时候permits=1,也就是本虚拟机中只能有一个线程能在acquire()的阻塞中脱颖而出(当然只是进了坑,但不一定能获取得到分布式锁)。
  • ConcurrentHashMap:这个应该不必多说,之谈谈在设计分布式锁中的用途。在上述的“第一点”,对于锁的时效性的设置里提到了,要在持锁线程正常运行(持锁节点没有宕机或内部异常)的时候,保证其一直占用锁。只要占着茅坑的人还在用着,只要他还没有暴毙或者无聊占着茅坑不XX,那就应该让外面的人都等着,不能强行开门托人。再收回来。。。这里ConcurrentHashMap的key无疑是锁对象的标识(我们需要设计的redis的key),value就是一个时间任务对象,比如可以netty的TimerTask或其他定时API,定时得触发给我的锁重新设置延时。这就是好比(好吧,再次用厕所比喻),蹲在里面的人的一种主动行为,隔1分钟敲两下厕所门,让外面的等的人知道,里面的人正在使用中,如果里面的人1分钟超过还没有敲门,可能是里面人挂掉了,那么再采取强制措施,直接开门拽人,释放坑位。

并发API以及一些框架的使用主要是控制锁的进入和调度,加锁的流程以及锁的逻辑也是非常重要。因为redis支持hash结构,除了key作为锁的标识,还可以利用value的结构

加锁

下面参数的含义先说明下 :

  • KEYS[1] :需要加锁的key,这里需要是字符串类型。
  • ARGV[1] :锁的超时时间,防止死锁
  • ARGV[2] :锁的唯一标识,也就是刚才介绍的 id(UUID.randomUUID()) “:” threadId

图片 3

以上的方法,当返回空是,说明获取到锁,如果返回一个long数值(pttl 命令的返回值),说明锁已被占用,通过返回剩余时间,外部可以做一些等待时间的判断和调整。

解锁

也还是先说明一下参数信息:

- KEYS[1] :需要加锁的key,这里需要是字符串类型。

- KEYS[2] :redis消息的ChannelName,一个分布式锁对应唯一的一个channelName:“redisson_lock__channel__{”

  • getName() “}”

- ARGV[1] :reids消息体,这里只需要一个字节的标记就可以,主要标记redis的key已经解锁,再结合redis的Subscribe,能唤醒其他订阅解锁消息的客户端线程申请锁。

- ARGV[2] :锁的超时时间,防止死锁

- ARGV[3] :锁的唯一标识,也就是刚才介绍的 id(UUID.randomUUID()) “:” threadId

图片 4

这就是解锁过程,当然建议提供强制解锁的接口,直接删除key,以防一些紧急故障出现的时候,关键业务节点受到影响。这里还有一个关键点,就是publish命令,通过在锁的唯一通道发布解锁消息,可以减少其他分布式节点的等待或者空转,整体上能提高加锁效率。至于redis的消息订阅可以有多种方式,基于Jedis的订阅API或者Spring的MessageListener都可以实现订阅,这里就可以结合刚才说的Semaphore,在第一次申请锁失败后acquire,接收到分布式消息后release就可以控制申请锁流程的再次进入。下面结合Redisson源码,相信会有更清晰的认识。

使用Redisson示例

Redisson使用起来很方便,但是需要redis环境支持eval命令,否则一切都是悲剧,比如me.结果还是要用RedisCommands去写一套。例子就如下,获得一个RLock锁对象,然后tryLock 和unlock。trylock方法提供了锁重入的实现,并且客户端一旦持有锁,就会在能正常运行期间一直持有锁,直到主动unlock或者节点故障,主动失效(超过默认的过期时间)释放锁。

图片 5

Redisson还提供了设置最长等待时间以及设置释放锁时间的含参tryLock接口 boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException; 。Redisson的lock 扩展了java.util.concurrent.locks.Lock的实现,也基本按照了Lock接口的实现方案。lock()方法会一直阻塞申请锁资源,直到有可用的锁释放。下面一部分会详细解析一部分关键实现的代码。

Redisson源码解析

Redisson 的异步任务(Future,Promise,FutureListener API),任务计时器(Timeout,TimerTask),以及通过AbstractChannel连接redis以及写入执行批处理命令等很多都是基于netty框架的。po主因为不能使用eval,所以用Spring提供的redisApi ,RedisOperations来处理redis指令,异步调度等用了Spring的AsyncResult,MessageListener以及一些concurrent api。这里还是先看一下Redisson的实现。

trylock

这里以带参数的trylock解析一下,无参的trylock是一种默认参数的实现。先源码走读一下。

图片 6

图片 7

上述方法,调用加锁的逻辑就是在tryAcquire(long leaseTime, TimeUnit unit)中

图片 8

tryAcquire(long leaseTime, TimeUnit unit)只是针对leaseTime的不同参数进行不同的转发处理,再提一下,trylock的无参方法就是直接调用了get(tryLockInnerAsync(Thread.currentThread().getId()));

所以下面再看核心的tryLockInnerAsync基本命令已经在之前解析过,相信这里看起来应该比较轻松,返回的是一个future对象,是为了异步处理IO,提高系统吞吐量。

图片 9

再说明一下,tryLock(long waitTime, long leaseTime, TimeUnit unit)有leaseTime参数的申请锁方法是会按照leaseTime时间来自动释放锁的。但是没有leaseTime参数的,比如tryLock()或者tryLock(long waitTime, TimeUnit unit)以及lock()是会一直持有锁的。再来看一下没有leaseTime参数的tryLockInnerAsync(Thread.currentThread().getId())

图片 10

这里比有leaseTime参数的trylock就多了异步scheduleExpirationRenewal调度。可以继续看一下,这里的expirationRenewalMap就是之前降到的一个ConcurrentMap结构。下面的这个调度方式很精妙。除非被unlock的cancleTask方法触发,否则会一直循环重置过期时间。

图片 11

这个任务,其实还有一个问题,个人觉得在expirationRenewalMap.containsKey判断时也加上isLocked判断会比较好,以防止unlock时出现redis节点异常的时候,任务没有办法自动停止,或者设置一个最大执行次数的限制也可以,否则极端情况下也会耗尽本地节点的CPU资源。

unlock

解锁的逻辑相对简单,如下,redis 命令相信看起来也会比较轻松了。

图片 12

这里的 cancelExpirationRenewal对应着取消 scheduleExpirationRenewal的重置expire时间任务。

图片 13

再看一下Redisson是如何处理unlock的redis消息的。这里的消息内容就是unlockMessage = 0L和unlock方法中publish的内容是对应的。

图片 14

Redisson还支持Redis的多种集群配置,一主一备,一主多备,单机等等。也是通过netty的EventExecutorGroup,Promise,Future等API实现调度的。

结语

在思考是否采用分布式锁以及采用哪种实现方案的时候,还是要基于业务,技术方案一定是基于业务基础,服务于业务,并且衡量过投入产出比的。所以如果有成熟的解决方案,在业务可承受规模肯定是不要重复造轮子,当然还要经过严谨的测试。在po主用Spring的redis api实现时,也遇到了一些问题。

比如hIncrBy 的字符集问题,在使用命令的时候,当然可以直接set a 1然后incr a 1,这个问题可以参考ERR value is not an integer or out of range 问题,但在使用RedisConnection的时候,需要通过转码,byte[] value =SafeEncoder.encode(String.valueOf(“1”)) 再 connection.hSet(key, field, value)这样才可以,或者自己通过String转成正确的编码也可以。

还有刚才说的调度pexpire任务,在unlock异常的时候,任务池中的任务无法自动结束。另外就是Spring的MessageListener的onMessage(Message message, byte[] pattern)回调方法message.getBody()是byte数组,消息内容转化的时候要处理一下。

资源 返回搜狐,查看更多

  • ERR value is not an integer or out of range 问题
  • Redis 命令查询
  • Redisson github

责任编辑:

编辑:www.3559.com 本文来源:Redisson使用及源码分析,Redis分布式锁

关键词: www.3559.com