博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
ReentrantReadWriteLock使用及源码分析
阅读量:4149 次
发布时间:2019-05-25

本文共 12391 字,大约阅读时间需要 41 分钟。

为什么使用ReentrantReadWriteLock?

    在很多场景下我们可以使用synchronized关键字对一个方法或者一段代码进行加锁操作,或者也可以使用ReentrantLock来对某段代码加锁,但是这些锁,如果想要对一个共有资源进行查询操作,也使用以上两种锁,那性能就会有影响,在实际开发环境当中,我们更多的是读的多,写的少,读取数据之间并不需要加锁,而写数据的时候需要同一时刻只能有一个线程在执行该操作。因此,在这种场景下,我们就需要对读写进行一个分离,ReentrantReadWriteLock就是解决这类问题。允许多个读线程获取readLock,只能有一个线程获取writeLock

什么情况下会进入读锁?

    1、没有其他线程的写锁

    2、没有写请求或者调用写请求的线程和持有锁的线程是同一个线程

什么情况下进入写锁?

    1、没有其他线程的写锁

    2、没有其他线程的读锁

综合来看就是,如果有多个线程去读取某个公共资源是可以的,同一时刻只能有一个线程对公共资源进行修改,如果有线程正在进行资源的修改,那么不能对资源进行读取,

读-读 不互斥

写-写 互斥

读-写 互斥

java doc当中给出的两个使用实例:

examle1: 关于锁的降级

class CachedData {    Object data;    volatile boolean cacheValid;    final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();     void processCachedData() {      rwl.readLock().lock();      if (!cacheValid) {        // 在获取写锁之前必须先释放掉读锁        rwl.readLock().unlock();        rwl.writeLock().lock();        try {          // 再次检查cacheValid 的状态,防止被其他线程所修改          if (!cacheValid) {            data = ...            cacheValid = true;          }          // 在释放掉写锁之前,我们可以把写锁降级成读取锁          rwl.readLock().lock();        } finally {          rwl.writeLock().unlock(); // 最后释放掉写入锁,但是这个时候还是持有读取锁        }      }       try {        use(data);      } finally {        rwl.readLock().unlock(); //最后再释放读取锁      }    }  }

example2:

class RWDictionary {    private final Map
m = new TreeMap
(); private final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock(); private final Lock r = rwl.readLock(); private final Lock w = rwl.writeLock(); public Data get(String key) { r.lock(); try { return m.get(key); } finally { r.unlock(); } } public String[] allKeys() { r.lock(); try { return m.keySet().toArray(); } finally { r.unlock(); } } public Data put(String key, Data value) { w.lock(); try { return m.put(key, value); } finally { w.unlock(); } } public void clear() { w.lock(); try { m.clear(); } finally { w.unlock(); } } }
这个例子中,将对于map的操作进行了细分,如果是需要读取的操作,那么都加上读取锁,如果是修改的操作,都加上写入锁。

读写互斥的例子:

public class ReadWriteLockTest {    public static void main(String[] args) {        final Queue3 queue3 = new Queue3();        for (int i = 0; i < 3; i++) {            new Thread(() -> {                while (true) {                    queue3.get();                }            }).start();        }        for (int i = 0; i < 3; i++) {            new Thread(() -> {                while (true) {                    queue3.put(new Random().nextInt(1000));                }            }).start();        }    }}class Queue3 {    private Object data = null;    private ReentrantReadWriteLock lock = new ReentrantReadWriteLock();    public void get() {        lock.readLock().lock();        System.out.println(Thread.currentThread().getName() + " read");        try {            Thread.sleep((long) (Math.random() * 1000));        } catch (Exception e) {            e.printStackTrace();        }        System.out.println(Thread.currentThread().getName() + " read :" + data);        lock.readLock().unlock();    }    public void put(Object data) {        lock.writeLock().lock();        System.out.println(Thread.currentThread().getName() + "  write");        try {            Thread.sleep((long) (Math.random() * 1000));        } catch (Exception e) {            e.printStackTrace();        }        this.data = data;        System.out.println(Thread.currentThread().getName() + "  write : " + data);        lock.writeLock().unlock();    }}

执行结果:

Thread-4  write : 902Thread-5  writeThread-5  write : 888Thread-3  writeThread-3  write : 296Thread-2 readThread-0 readThread-1 readThread-2 read :296Thread-0 read :296Thread-1 read :296Thread-4  writeThread-4  write : 456Thread-4  writeThread-4  write : 599

这个例子可以看出,某个时刻,只能有一个线程对数据进行修改,在读取时可以有多个线程同时去读取数据

接下来我们进行ReentrantReadWriteLock的源码分析阶段:

首选关于这个类的内部结构我们先了解下:

ReentrantReadWriteLock类内部维护了一个内部类及继承关系:

关于Sync类,其内部维护的字段和方法简单介绍如下,

在介绍ReentrantLock时候,这个锁是独享锁,继承自AQS, 通过state的值来判断是否被线程所持有,如果state的值表示,持有线程的访问次数,那么关于读写锁的实现方式,同样是使用state,不同的是state的高16位,表示读锁持有的线程计数,低16位表示写锁的线程计数

abstract static class Sync{ // 位移单位 static final int SHARED_SHIFT   = 16; // 读锁单位 static final int SHARED_UNIT    = (1 << SHARED_SHIFT); //最大线程数 static final int MAX_COUNT      = (1 << SHARED_SHIFT) - 1; //写锁的最大数量 static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1; // 读取锁的计数量 static int sharedCount(int c)    { return c >>> SHARED_SHIFT; }  // 写锁的计数量 static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; } //可重入的读锁的数量,当前线程持有,当一个线程的读取计数器下降到0时,删除它 private transient ThreadLocalHoldCounter readHolds; // 成功获取最后一个readLock的线程的计数器 private transient HoldCounter cachedHoldCounter; // 第一个持有读取锁的线程,即将state从0变成1 的线程, private transient Thread firstReader = null; //第一个持有读取锁的线程的计数 private transient int firstReaderHoldCount;}

Sync的两个静态内部类

HoldCounter 每个线程重入次数

static final class HoldCounter {            int count = 0;            // 使用线程id,避免相互引用导致垃圾保留            final long tid = getThreadId(Thread.currentThread());        }

ThreadLocal 的实现类,

static final class ThreadLocalHoldCounter            extends ThreadLocal
{ public HoldCounter initialValue() { return new HoldCounter(); } }

关于读写状态的持有计数设计

同步状态表示重入锁被一个线程重复获取的次数,在之前的独享锁中,只是来记录当前线程进入的次数,并没有进行读写锁的区分,在读写锁当中,使用state的高16位保存读取锁的计数,低16位保存写入锁的计数

1、获取写的状态

    将高16位清空,计算低16的数值

2、写状态加 1

    state + 1

3、获取读的状态

    高16位右移16位,之前的高16位用0补足,计算读取计数

4、读状态加1

    state + (1 <<16)

写入锁的获取和释放,写锁的实现是独享锁,因为只需要重写tryAcquire()和tryRelease()方法

protected final boolean tryAcquire(int acquires) {                   Thread current = Thread.currentThread();            int c = getState();  //获取AQS当中状态            int w = exclusiveCount(c); // 获取写入状态            if (c != 0) {  //AQS 状态不等于0,说明已经加锁了// 写入状态等于0,说明当前持有锁是读取锁,返回false, 如果写入状态不等于0,但是持有线程和当前线程不是同一个线程直接返回                if (w == 0 || current != getExclusiveOwnerThread())                    return false;                // 如果写入计数 超过写入锁持有的最大次数,直接抛出异常                if (w + exclusiveCount(acquires) > MAX_COUNT)                    throw new Error("Maximum lock count exceeded");                // Reentrant acquire           // 写入锁直接将状态加1     setState(c + acquires);                return true;            }  // c ==0 说明当前没有被任何线程所持有,判断是否需要阻塞,如果不需要阻塞,并且将state的值更新成功,将当前持有线程设置成当前线程            if (writerShouldBlock() ||                !compareAndSetState(c, c + acquires))                return false;            setExclusiveOwnerThread(current);            return true;        }

关于写操作,获取锁相对来说比较简单,接下来来看释放写操作的锁

protected final boolean tryRelease(int releases) {        //先判断当前线程是否和持有线程一致,如果不一致直接抛出异常,因为写入锁是独享锁            if (!isHeldExclusively())                throw new IllegalMonitorStateException();        // 获取到state,并减一            int nextc = getState() - releases;        // 减一之后,获取到写入的计数器,如果等于0,说明写入锁直接释放掉,并将持有线程清空            boolean free = exclusiveCount(nextc) == 0;            if (free)                setExclusiveOwnerThread(null);            setState(nextc);            return free;        }

接下来关于读取锁的获取和释放,读取锁是共享锁,因此根据之前对AQS的了解,需要实现tryAccquireShared和tryReleaseShared方法

protected final int tryAcquireShared(int unused) {                   Thread current = Thread.currentThread();            int c = getState();    //获取到写入计数,如果写入计数不等于0 ,说明当前是被写入锁持有,并且尝试读取的线程和写入锁持有的线程不是同一个线程直接返回            if (exclusiveCount(c) != 0 &&                getExclusiveOwnerThread() != current)                return -1;    //获取读取锁的计数,即高16位的计数值            int r = sharedCount(c);        // 如果非阻塞读取,并且读取计数小于可以读取的线程最大数,更新读取计数,将其加1            if (!readerShouldBlock() &&                r < MAX_COUNT &&                compareAndSetState(c, c + SHARED_UNIT)) {                if (r == 0) { // 如果读取计数等于0,表示没有读取线程持有锁                                       firstReader = current; //将第一个读取线程设置成当前线程                    firstReaderHoldCount = 1; //将读取计数加1                            } else if (firstReader == current) { //如果当前线程就是第一个获取到读取锁的线程,将读取计数加1                    firstReaderHoldCount++;             } else {                    // 获取线程计数器(包含线程id和计数)                    HoldCounter rh = cachedHoldCounter;                    // 如果计数器为null,或者计数器中的线程id和当前线程id不一致                    if (rh == null || rh.tid != getThreadId(current))                        cachedHoldCounter = rh = readHolds.get(); //获取到当前线程的计数器                    else if (rh.count == 0) //如果计数器中的count值为0                        readHolds.set(rh); //将当前线程计数器添加进去                    rh.count++; //将线程计数器加一                }                return 1;            }            // 如果读取线程需要阻塞,或者读取线程超过最大值,或者更新失败,就执行下面这个方法            return fullTryAcquireShared(current);        }
final int fullTryAcquireShared(Thread current) {                       HoldCounter rh = null;            for (;;) {                int c = getState();                if (exclusiveCount(c) != 0) {                    if (getExclusiveOwnerThread() != current)                        return -1;                } else if (readerShouldBlock()) {                    // 如果是需要阻塞访问的话                    if (firstReader == current) {                        // assert firstReaderHoldCount > 0;                    } else {                        if (rh == null) {                            rh = cachedHoldCounter;                            if (rh == null || rh.tid != getThreadId(current)) {                                rh = readHolds.get();                                if (rh.count == 0) //如果当前线程的计数等于0 ,将当前线程从readHolds中移除                                    readHolds.remove();                            }                        }                        if (rh.count == 0)                            return -1;                    }                }                if (sharedCount(c) == MAX_COUNT)                    throw new Error("Maximum lock count exceeded");                if (compareAndSetState(c, c + SHARED_UNIT)) {                    if (sharedCount(c) == 0) {                        firstReader = current;                        firstReaderHoldCount = 1;                    } else if (firstReader == current) {                        firstReaderHoldCount++;                    } else {                        if (rh == null)                            rh = cachedHoldCounter;                        if (rh == null || rh.tid != getThreadId(current))                            rh = readHolds.get();                        else if (rh.count == 0)                            readHolds.set(rh);                        rh.count++;                        cachedHoldCounter = rh; // cache for release                    }                    return 1;                }            }        }

上面这个逻辑基本上和尝试去获取读取锁的逻辑基本相同

接下来看下读取锁的释放

protected final boolean tryReleaseShared(int unused) {            Thread current = Thread.currentThread();            if (firstReader == current) { //判断当前线程是否为第一个获取到读取锁线程                // assert firstReaderHoldCount > 0;                if (firstReaderHoldCount == 1)  如果读取锁计数等于1 将当前读取锁置为null                    firstReader = null;                else                    firstReaderHoldCount--; //否则将读取锁计数减一            } else {                HoldCounter rh = cachedHoldCounter; //获取缓存计数                if (rh == null || rh.tid != getThreadId(current)) //如果当前缓存计数为null或者缓存计数器线程和当前线程不一致                    rh = readHolds.get(); //获取到当前线程的缓存计数                int count = rh.count;  //当前线程计数值                if (count <= 1) {  //小于等于1 直接从缓存计数中移除                    readHolds.remove();                    if (count <= 0) //如果小于0 直接抛出异常                        throw unmatchedUnlockException();                 }                --rh.count; //最终将当前线程的缓存计数减一            }            for (;;) {                int c = getState(); // 获取到状态结果                int nextc = c - SHARED_UNIT; //将读取计数减一                if (compareAndSetState(c, nextc)) //更新读取计数的值                    // Releasing the read lock has no effect on readers,                    // but it may allow waiting writers to proceed if                    // both read and write locks are now free.                    return nextc == 0; //判断释放读取锁之后,是否还持有锁            }        }

总结:

关于ReentrantReadWriteLock源码分析部分,首先需要弄清楚的就是读写状态位的分离,高16位和低16位分别是读锁持有计数和写锁持有计数,其次关于读取锁的状态计数,因为是共享锁,因此就需要为每个读取线程单独记录一份计数,这里就是需要使用到ThreadLocal ,来保证每个线程拥有一个计数

关于线程读写锁中锁的降级,当持有为写锁时,可以降级到读锁,这个比较好理解,因为写锁只有一个线程持有,进行降低后,读取锁可以被多个线程持有,而读取锁想要升级成为写入锁,是不可以的,因为读取锁是共享锁,写入锁时独享锁,已经被读取锁持有的多个线程,是没有办法升级到写入锁的。

    以上内容如有错误,欢迎指正~

参考:

java doc 文档

你可能感兴趣的文章
数据库
查看>>
nginx反代 499 502 bad gateway 和timeout
查看>>
linux虚拟机安装tar.gz版jdk步骤详解
查看>>
python实现100以内自然数之和,偶数之和
查看>>
去哪儿一面+平安科技二面+hr面+贝贝一面+二面产品面经
查看>>
pytorch
查看>>
pytorch(三)
查看>>
C++ 调用json
查看>>
动态库调动态库
查看>>
Kubernetes集群搭建之CNI-Flanneld部署篇
查看>>
k8s web终端连接工具
查看>>
手绘VS码绘(一):静态图绘制(码绘使用P5.js)
查看>>
手绘VS码绘(二):动态图绘制(码绘使用Processing)
查看>>
基于P5.js的“绘画系统”
查看>>
《达芬奇的人生密码》观后感
查看>>
论文翻译:《一个包容性设计的具体例子:聋人导向可访问性》
查看>>
基于“分形”编写的交互应用
查看>>
《融入动画技术的交互应用》主题博文推荐
查看>>
链睿和家乐福合作推出下一代零售业隐私保护技术
查看>>
Unifrax宣布新建SiFAB™生产线
查看>>