本文共 12391 字,大约阅读时间需要 41 分钟。
为什么使用ReentrantReadWriteLock?
在很多场景下我们可以使用synchronized关键字对一个方法或者一段代码进行加锁操作,或者也可以使用ReentrantLock来对某段代码加锁,但是这些锁,如果想要对一个共有资源进行查询操作,也使用以上两种锁,那性能就会有影响,在实际开发环境当中,我们更多的是读的多,写的少,读取数据之间并不需要加锁,而写数据的时候需要同一时刻只能有一个线程在执行该操作。因此,在这种场景下,我们就需要对读写进行一个分离,ReentrantReadWriteLock就是解决这类问题。允许多个读线程获取readLock,只能有一个线程获取writeLock
什么情况下会进入读锁?
1、没有其他线程的写锁
2、没有写请求或者调用写请求的线程和持有锁的线程是同一个线程
什么情况下进入写锁?
1、没有其他线程的写锁
2、没有其他线程的读锁
综合来看就是,如果有多个线程去读取某个公共资源是可以的,同一时刻只能有一个线程对公共资源进行修改,如果有线程正在进行资源的修改,那么不能对资源进行读取,
读-读 不互斥
写-写 互斥
读-写 互斥
java doc当中给出的两个使用实例:
examle1: 关于锁的降级
class CachedData { Object data; volatile boolean cacheValid; final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock(); void processCachedData() { rwl.readLock().lock(); if (!cacheValid) { // 在获取写锁之前必须先释放掉读锁 rwl.readLock().unlock(); rwl.writeLock().lock(); try { // 再次检查cacheValid 的状态,防止被其他线程所修改 if (!cacheValid) { data = ... cacheValid = true; } // 在释放掉写锁之前,我们可以把写锁降级成读取锁 rwl.readLock().lock(); } finally { rwl.writeLock().unlock(); // 最后释放掉写入锁,但是这个时候还是持有读取锁 } } try { use(data); } finally { rwl.readLock().unlock(); //最后再释放读取锁 } } }
example2:
class RWDictionary { private final Map这个例子中,将对于map的操作进行了细分,如果是需要读取的操作,那么都加上读取锁,如果是修改的操作,都加上写入锁。m = new TreeMap (); private final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock(); private final Lock r = rwl.readLock(); private final Lock w = rwl.writeLock(); public Data get(String key) { r.lock(); try { return m.get(key); } finally { r.unlock(); } } public String[] allKeys() { r.lock(); try { return m.keySet().toArray(); } finally { r.unlock(); } } public Data put(String key, Data value) { w.lock(); try { return m.put(key, value); } finally { w.unlock(); } } public void clear() { w.lock(); try { m.clear(); } finally { w.unlock(); } } }
读写互斥的例子:
public class ReadWriteLockTest { public static void main(String[] args) { final Queue3 queue3 = new Queue3(); for (int i = 0; i < 3; i++) { new Thread(() -> { while (true) { queue3.get(); } }).start(); } for (int i = 0; i < 3; i++) { new Thread(() -> { while (true) { queue3.put(new Random().nextInt(1000)); } }).start(); } }}class Queue3 { private Object data = null; private ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); public void get() { lock.readLock().lock(); System.out.println(Thread.currentThread().getName() + " read"); try { Thread.sleep((long) (Math.random() * 1000)); } catch (Exception e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName() + " read :" + data); lock.readLock().unlock(); } public void put(Object data) { lock.writeLock().lock(); System.out.println(Thread.currentThread().getName() + " write"); try { Thread.sleep((long) (Math.random() * 1000)); } catch (Exception e) { e.printStackTrace(); } this.data = data; System.out.println(Thread.currentThread().getName() + " write : " + data); lock.writeLock().unlock(); }}
执行结果:
Thread-4 write : 902Thread-5 writeThread-5 write : 888Thread-3 writeThread-3 write : 296Thread-2 readThread-0 readThread-1 readThread-2 read :296Thread-0 read :296Thread-1 read :296Thread-4 writeThread-4 write : 456Thread-4 writeThread-4 write : 599
这个例子可以看出,某个时刻,只能有一个线程对数据进行修改,在读取时可以有多个线程同时去读取数据
接下来我们进行ReentrantReadWriteLock的源码分析阶段:
首选关于这个类的内部结构我们先了解下:
ReentrantReadWriteLock类内部维护了一个内部类及继承关系:
关于Sync类,其内部维护的字段和方法简单介绍如下,
在介绍ReentrantLock时候,这个锁是独享锁,继承自AQS, 通过state的值来判断是否被线程所持有,如果state的值表示,持有线程的访问次数,那么关于读写锁的实现方式,同样是使用state,不同的是state的高16位,表示读锁持有的线程计数,低16位表示写锁的线程计数
abstract static class Sync{ // 位移单位 static final int SHARED_SHIFT = 16; // 读锁单位 static final int SHARED_UNIT = (1 << SHARED_SHIFT); //最大线程数 static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1; //写锁的最大数量 static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1; // 读取锁的计数量 static int sharedCount(int c) { return c >>> SHARED_SHIFT; } // 写锁的计数量 static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; } //可重入的读锁的数量,当前线程持有,当一个线程的读取计数器下降到0时,删除它 private transient ThreadLocalHoldCounter readHolds; // 成功获取最后一个readLock的线程的计数器 private transient HoldCounter cachedHoldCounter; // 第一个持有读取锁的线程,即将state从0变成1 的线程, private transient Thread firstReader = null; //第一个持有读取锁的线程的计数 private transient int firstReaderHoldCount;}
Sync的两个静态内部类
HoldCounter 每个线程重入次数
static final class HoldCounter { int count = 0; // 使用线程id,避免相互引用导致垃圾保留 final long tid = getThreadId(Thread.currentThread()); }
ThreadLocal 的实现类,
static final class ThreadLocalHoldCounter extends ThreadLocal{ public HoldCounter initialValue() { return new HoldCounter(); } }
关于读写状态的持有计数设计
同步状态表示重入锁被一个线程重复获取的次数,在之前的独享锁中,只是来记录当前线程进入的次数,并没有进行读写锁的区分,在读写锁当中,使用state的高16位保存读取锁的计数,低16位保存写入锁的计数
1、获取写的状态
将高16位清空,计算低16的数值
2、写状态加 1
state + 1
3、获取读的状态
高16位右移16位,之前的高16位用0补足,计算读取计数
4、读状态加1
state + (1 <<16)
写入锁的获取和释放,写锁的实现是独享锁,因为只需要重写tryAcquire()和tryRelease()方法
protected final boolean tryAcquire(int acquires) { Thread current = Thread.currentThread(); int c = getState(); //获取AQS当中状态 int w = exclusiveCount(c); // 获取写入状态 if (c != 0) { //AQS 状态不等于0,说明已经加锁了// 写入状态等于0,说明当前持有锁是读取锁,返回false, 如果写入状态不等于0,但是持有线程和当前线程不是同一个线程直接返回 if (w == 0 || current != getExclusiveOwnerThread()) return false; // 如果写入计数 超过写入锁持有的最大次数,直接抛出异常 if (w + exclusiveCount(acquires) > MAX_COUNT) throw new Error("Maximum lock count exceeded"); // Reentrant acquire // 写入锁直接将状态加1 setState(c + acquires); return true; } // c ==0 说明当前没有被任何线程所持有,判断是否需要阻塞,如果不需要阻塞,并且将state的值更新成功,将当前持有线程设置成当前线程 if (writerShouldBlock() || !compareAndSetState(c, c + acquires)) return false; setExclusiveOwnerThread(current); return true; }
关于写操作,获取锁相对来说比较简单,接下来来看释放写操作的锁
protected final boolean tryRelease(int releases) { //先判断当前线程是否和持有线程一致,如果不一致直接抛出异常,因为写入锁是独享锁 if (!isHeldExclusively()) throw new IllegalMonitorStateException(); // 获取到state,并减一 int nextc = getState() - releases; // 减一之后,获取到写入的计数器,如果等于0,说明写入锁直接释放掉,并将持有线程清空 boolean free = exclusiveCount(nextc) == 0; if (free) setExclusiveOwnerThread(null); setState(nextc); return free; }
接下来关于读取锁的获取和释放,读取锁是共享锁,因此根据之前对AQS的了解,需要实现tryAccquireShared和tryReleaseShared方法
protected final int tryAcquireShared(int unused) { Thread current = Thread.currentThread(); int c = getState(); //获取到写入计数,如果写入计数不等于0 ,说明当前是被写入锁持有,并且尝试读取的线程和写入锁持有的线程不是同一个线程直接返回 if (exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current) return -1; //获取读取锁的计数,即高16位的计数值 int r = sharedCount(c); // 如果非阻塞读取,并且读取计数小于可以读取的线程最大数,更新读取计数,将其加1 if (!readerShouldBlock() && r < MAX_COUNT && compareAndSetState(c, c + SHARED_UNIT)) { if (r == 0) { // 如果读取计数等于0,表示没有读取线程持有锁 firstReader = current; //将第一个读取线程设置成当前线程 firstReaderHoldCount = 1; //将读取计数加1 } else if (firstReader == current) { //如果当前线程就是第一个获取到读取锁的线程,将读取计数加1 firstReaderHoldCount++; } else { // 获取线程计数器(包含线程id和计数) HoldCounter rh = cachedHoldCounter; // 如果计数器为null,或者计数器中的线程id和当前线程id不一致 if (rh == null || rh.tid != getThreadId(current)) cachedHoldCounter = rh = readHolds.get(); //获取到当前线程的计数器 else if (rh.count == 0) //如果计数器中的count值为0 readHolds.set(rh); //将当前线程计数器添加进去 rh.count++; //将线程计数器加一 } return 1; } // 如果读取线程需要阻塞,或者读取线程超过最大值,或者更新失败,就执行下面这个方法 return fullTryAcquireShared(current); }
final int fullTryAcquireShared(Thread current) { HoldCounter rh = null; for (;;) { int c = getState(); if (exclusiveCount(c) != 0) { if (getExclusiveOwnerThread() != current) return -1; } else if (readerShouldBlock()) { // 如果是需要阻塞访问的话 if (firstReader == current) { // assert firstReaderHoldCount > 0; } else { if (rh == null) { rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) { rh = readHolds.get(); if (rh.count == 0) //如果当前线程的计数等于0 ,将当前线程从readHolds中移除 readHolds.remove(); } } if (rh.count == 0) return -1; } } if (sharedCount(c) == MAX_COUNT) throw new Error("Maximum lock count exceeded"); if (compareAndSetState(c, c + SHARED_UNIT)) { if (sharedCount(c) == 0) { firstReader = current; firstReaderHoldCount = 1; } else if (firstReader == current) { firstReaderHoldCount++; } else { if (rh == null) rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) rh = readHolds.get(); else if (rh.count == 0) readHolds.set(rh); rh.count++; cachedHoldCounter = rh; // cache for release } return 1; } } }
上面这个逻辑基本上和尝试去获取读取锁的逻辑基本相同
接下来看下读取锁的释放
protected final boolean tryReleaseShared(int unused) { Thread current = Thread.currentThread(); if (firstReader == current) { //判断当前线程是否为第一个获取到读取锁线程 // assert firstReaderHoldCount > 0; if (firstReaderHoldCount == 1) 如果读取锁计数等于1 将当前读取锁置为null firstReader = null; else firstReaderHoldCount--; //否则将读取锁计数减一 } else { HoldCounter rh = cachedHoldCounter; //获取缓存计数 if (rh == null || rh.tid != getThreadId(current)) //如果当前缓存计数为null或者缓存计数器线程和当前线程不一致 rh = readHolds.get(); //获取到当前线程的缓存计数 int count = rh.count; //当前线程计数值 if (count <= 1) { //小于等于1 直接从缓存计数中移除 readHolds.remove(); if (count <= 0) //如果小于0 直接抛出异常 throw unmatchedUnlockException(); } --rh.count; //最终将当前线程的缓存计数减一 } for (;;) { int c = getState(); // 获取到状态结果 int nextc = c - SHARED_UNIT; //将读取计数减一 if (compareAndSetState(c, nextc)) //更新读取计数的值 // Releasing the read lock has no effect on readers, // but it may allow waiting writers to proceed if // both read and write locks are now free. return nextc == 0; //判断释放读取锁之后,是否还持有锁 } }
总结:
关于ReentrantReadWriteLock源码分析部分,首先需要弄清楚的就是读写状态位的分离,高16位和低16位分别是读锁持有计数和写锁持有计数,其次关于读取锁的状态计数,因为是共享锁,因此就需要为每个读取线程单独记录一份计数,这里就是需要使用到ThreadLocal ,来保证每个线程拥有一个计数
关于线程读写锁中锁的降级,当持有为写锁时,可以降级到读锁,这个比较好理解,因为写锁只有一个线程持有,进行降低后,读取锁可以被多个线程持有,而读取锁想要升级成为写入锁,是不可以的,因为读取锁是共享锁,写入锁时独享锁,已经被读取锁持有的多个线程,是没有办法升级到写入锁的。
以上内容如有错误,欢迎指正~
参考:
java doc 文档