01_*AQS原理

gong_yz2023年3月1日大约 24 分钟并发编程

一、黑马版本

1.1 概述

全称是AbstractQueuedSynchronizer,抽象的队列同步器。是阻塞式锁和相关的同步器工具的框架。AQS中有一个核心属性state,其次还有一个双向链表以及一个单项链表。AQS是JUC下大量工具的基础类,很多工具都基于AQS实现的,比如lock锁,ReentrantLock、CountDownLatch、Semaphore、线程池等等都用到了AQS。可以这么说,只要搞懂了AQS,那么J.U.C中绝大部分的api都能轻松掌握。

特点

从使用层面来说,AQS的功能分为两种:独占和共享

子类主要实现这样一些方法(默认抛出 UnsupportedOperationException)

获取锁的姿势:

//如果获取锁失败
if (!tryAcquire(arg)){
	//入队,可以选择阻塞当前线程  park unpark
}

释放锁的姿势:

//如果释放锁成功
if (tryRelease(arg)){
	//让阻塞线程恢复运行
}

1.2 AQS的基本属性和方法

//头节点
private transient volatile Node head;
//尾节点
private transient volatile Node tail;
//状态值
private volatile int state;

AQS的实现依赖内部的同步队列,也就是FIFO的双向队列,如果当前线程竞争锁失败,那么AQS会把当前线程以及等待状态信息构造成一个Node加入到同步队列中,同时再阻塞该线程。当获取锁的线程释放锁以后,会从队列中唤醒一个阻塞的节点(线程)。

AQS队列内部维护的是一个FIFO的双向链表,这种结构的特点是每个数据结构都有两个指针,分别指向直接的后继节点和直接前驱节点,所以双向链表可以从任意一个节点开始很方便的访问前驱和后继。每个Node其实都是由线程封装的,当线程争抢锁失败后会封装成Node加入到ASQ队列中去。

1.3 释放锁以及添加线程对于队列的变化

1.3.1 添加节点

当出现锁竞争以及释放锁的时候,AQS同步队列中的节点会发生变化,首先看一下添加节点的场景。

这里会涉及到两个变化:

  1. 新的线程封装成Node节点追加到同步队列中,设置prev节点以及修改当前节点的前置节点的next节点指向自己
  2. 通过CAS讲tail重新指向新的尾部节点

1.3.2 释放锁移除节点

head节点表示获取锁成功的节点,当头结点在释放同步状态时,会唤醒后继节点,如果后继节点获得锁成功,会把自己设置为头结点,节点的变化过程如下:

这个过程也是涉及到两个变化:

  1. 修改head节点指向下一个获得锁的节点
  2. 新的获得锁的节点,将prev的指针指向null

这里有一个小的变化,就是设置head节点不需要用CAS,原因是设置head节点是由获得锁的线程来完成的,而同步锁只能由一个线程获得,所以不需要CAS保证,只需要把head节点设置为原首节点的后继节点,并且断开原head节点的next引用即可。

1.4 实现不可重入锁

package com.gyz.juc;

import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;

/**
 * @Description 自定义不可重入锁
 * @Author GongYuZhuo
 * @Date 2021/7/4 15:20
 * @Version 1.0.0
 */
@Slf4j(topic = "c.UnRepeatLockTest")
public class UnRepeatLockTest {

    public static void main(String[] args) {
        MyLock myLock = new MyLock();
        new Thread(() -> {
            myLock.lock();
            log.debug("locking...");
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                myLock.unlock();
                log.debug("unlocking...");
            }
        }, "t1").start();

        new Thread(() -> {
            myLock.lock();
            try {
                log.debug("locinkg...");
            } finally {
                myLock.unlock();
                log.debug("unlocking...");
            }
        }, "t2").start();
    }

}

/**
 *  自定义锁
 */
class MyLock implements Lock {

    /**
     *  自定义同步器
     */
    class MySync extends AbstractQueuedSynchronizer {

        @Override
        protected boolean tryAcquire(int arg) {
            if (compareAndSetState(0, 1)) {
                //加上了锁,并设置 owner为当前线程
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        @Override
        protected boolean tryRelease(int arg) {
            if (compareAndSetState(1, 0)) {
                setExclusiveOwnerThread(null);
                setState(0);
                return true;
            }
            return false;
        }

        //是否持有独占锁
        @Override
        protected boolean isHeldExclusively() {
            return getState() == 1;
        }

        public Condition newConditon() {
            return new ConditionObject();
        }
    }

    private MySync mySync = new MySync();

    //尝试,不成功进入阻塞队列
    @Override
    public void lock() {
        mySync.acquire(1);
    }

    //尝试,不成功进入等待队列,可打断
    @Override
    public void lockInterruptibly() throws InterruptedException {
        mySync.acquireInterruptibly(1);
    }

    //尝试,不成功,则进入等待队列,可打断
    @Override
    public boolean tryLock() {
        return mySync.tryAcquire(1);
    }

    //尝试一次,不成功,进入等待队列,有时限
    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        return mySync.tryAcquireNanos(1, unit.toNanos(time));
    }

    //释放锁
    @Override
    public void unlock() {
        mySync.release(1);
    }

    //生成条件变量
    @Override
    public Condition newCondition() {
        return mySync.newConditon();
    }
}

不可重入测试。

/**
  * 测试一
  */
MyLock myLock = new MyLock();
new Thread(() -> {
    myLock.lock();
    System.out.println("locking");
    try {
        Thread.sleep(1000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    } finally {
        System.out.println("unloking");
        myLock.unlock();
    }
}, "t1").start();

new Thread(() -> {
    myLock.lock();
    System.out.println("locking");
    try {
        Thread.sleep(1000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    } finally {
        myLock.unlock();
        System.out.println("unlocking");

    }
}, "t2").start();

输出:

locking
unloking
locking
unlocking

如果改为下面测试代码,会发现自己也会被挡住(只会打印一次 locking) :

MyLock myLock = new MyLock();
myLock.lock();
log.debug("locking...");
myLock.lock();
log.debug("locking...");

1.5 AQS 要实现的功能目标

1.5.1 目标

1.5.2 设计

AQS 的基本思想其实很简单 。获取锁的逻辑:

while (state 状态不允许获取){
	if(队列中还没有此线程){
		入队并阻塞
	}
}
当前线程出队

释放锁的逻辑:

if (state 状态允许了){
	恢复阻塞的线程(s)
}

要点:

1)state 设计

2)阻塞恢复设计

  1. 队列设计

1.6 主要用到 AQS 的并发工具类


二、尚硅谷版本

2.1 前置知识

AbstractQueuedSynchronizer(AQS):抽象的队列同步器

  1. 公平锁和非公平锁
  2. 可重入锁
  3. LockSupport
  4. 自旋锁
  5. 数据结构之链表
  6. 设计模式之模板设计模式

一般我们说的 AQS 指的是 java.util.concurrent.locks 包下的 AbstractQueuedSynchronizer,但其实还有另外三种抽象队列同步器:AbstractOwnableSynchronizerAbstractQueuedLongSynchronizer AbstractQueuedSynchronizer

AQS 是用来构建锁或者其它同步器组件的重量级基础框架及整个JUC体系的基石, 通过内置的FIFO队列来完成资源获取线程的排队工作,并通过一个int类变量(state)表示持有锁的状态;

CLHCraig、Landin and Hagersten 队列,是一个双向链表,AQS中的队列是CLH变体的虚拟双向队列FIFO:

AQS 能干嘛

AQS 初步认识

官方解释

有阻塞就需要排队,实现排队必然需要队列

AQS 内部体系框架

  1. AQS的int变量

    AQS的同步状态State成员变量,类似于银行办理业务的受理窗口状态:零就是没人,自由状态可以办理;大于等于1,有人占用窗口,等着去

    /**
     * The synchronization state.
     */
    private volatile int state;
    
    
  2. AQS的CLH队列

    CLH队列(三个人的名字组成),为一个双向队列,类似于银行侯客区的等待顾客

  3. 内部类Node(Node类在AQS类内部)

    Node的等待状态waitState成员变量,类似于等候区其它顾客(其它线程)的等待状态,队列中每个排队的个体就是一个Node

    /**
    * ...
    */
    volatile int waitStatus;
    

    Node类的内部结构

    static final class Node{
        //共享
        static final Node SHARED = new Node();
        
        //独占
        static final Node EXCLUSIVE = null;
        
        //线程被取消了
        static final int CANCELLED = 1;
        
        //后继线程需要唤醒
        static final int SIGNAL = -1;
        
        //等待condition唤醒
        static final int CONDITION = -2;
        
        //共享式同步状态获取将会无条件地传播下去
        static final int PROPAGATE = -3;
        
        // 初始为e,状态是上面的几种
        volatile int waitStatus;
        
        // 前置节点
        volatile Node prev;
        
        // 后继节点
        volatile Node next;
    
        // ...
        
    
  4. AQS同步队列的基本结构

2.2 和AQS有关的并发编程类

image-20201227165833625
image-20201227165833625

2.3 从ReentrantLock开始解读AQS

2.3.1 前置知识

2.3.2 lock()方法开始

通过 ReentrantLock 的源码来讲解公平锁和非公平锁

ReentrantLock 内定义了静态内部类,分别为 NoFairSync(非公平锁)和 FairSync(公平锁)

ReentrantLock 的构造函数:不传参数表示创建非公平锁;参数为 true 表示创建公平锁;参数为 false 表示创建非公平锁

lock() 方法的执行流程:以 NonfairSync 为例

ReentrantLock 中,NoFairSyncFairSynctryAcquire() 方法的区别,可以明显看出公平锁与非公平锁的lock()方法唯一的区别就在于公平锁在获取同步状态时多了一个限制条件:hasQueuedPredecessors()

hasQueuedPredecessors() 方法是公平锁加锁时判断等待队列中是否存在有效节点的方法:

公平锁与非公平锁的总结

对比公平锁和非公平锁的tryAcqure()方法的实现代码,其实差别就在于非公平锁获取锁时比公平锁中少了一个判断!hasQueuedPredecessors()hasQueuedPredecessors()中判断了是否需要排队,导致公平锁和非公平锁的差异如下:

  1. 公平锁:公平锁讲究先来先到,线程在获取锁时,如果这个锁的等待队列中已经有线程在等待,那么当前线程就会进入等待队列中;

  2. 非公平锁:不管是否有等待队列,如果可以获取锁,则立刻占有锁对象。也就是说队列的第一 个排队线程在unpark(),之后还是需要竞争锁(存在线程竞争的情况下)

acquire() 方法最终都会调用 tryAcquire() 方法:

NonfairSyncFairSync 中均重写了其父类 AbstractQueuedSynchronizer 中的 tryAcquire() 方法

以案例代码解析

源码解读比较困难,我们这里举个例子,假设 A、B、C 三个人都要去银行窗口办理业务,但是银行窗口只有一个,我们使用 lock.lock() 模拟这种情况:

public class AQSDemo {
    public static void main(String[] args) {
        ReentrantLock lock = new ReentrantLock();
        //带入一个银行办理业务的案例来模拟我们的AQS如何进行线程的管理和通知唤醒机制
        //3个线程模拟3个来银行网点,受理窗口办理业务的顾客
        //A顾客就是第一个顾客,此时受理窗口没有任何人,A可以直接去办理
        new Thread(() -> {
                lock.lock();
                try{
                    System.out.println("-----A thread come in");

                    try { TimeUnit.MINUTES.sleep(20); }catch (Exception e) {e.printStackTrace();}
                }finally {
                    lock.unlock();
                }
        },"A").start();

        //第二个顾客,第二个线程---》由于受理业务的窗口只有一个(只能一个线程持有锁),此时B只能等待,
        //进入候客区
        new Thread(() -> {
            lock.lock();
            try{
                System.out.println("-----B thread come in");
            }finally {
                lock.unlock();
            }
        },"B").start();

        //第三个顾客,第三个线程---》由于受理业务的窗口只有一个(只能一个线程持有锁),此时C只能等待,
        //进入候客区
        new Thread(() -> {
            lock.lock();
            try{
                System.out.println("-----C thread come in");
            }finally {
                lock.unlock();
            }
        },"C").start();
    }
}

先来看看线程 A(客户 A)的执行流程

再来看看线程 B(客户 B)的执行流程

最后来看看线程 C(客户 C)的执行流程

线程 C 和线程 B 的执行流程很类似,都是执行 acquire() 中的方法;

但是在 addWaiter() 方法中,执行流程有些区别。此时 tail != null,因此在 addWaiter() 方法中就已经将 nodeC 添加至队尾了

执行完 addWaiter() 方法后,就已经将 nodeC 挂在了双端同步队列的队尾,不需要再执行 enq(node) 方法

再看acquireQueued() 方法的执行逻辑

先来看看 acquireQueued() 方法的源代码,其实这样直接看代码有点懵逼,我们接下来举例来理解。注意看:两个 if 判断中的代码都放在 for( ; ; ) 中执行,这样可以实现自旋的操作

final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

继续看线程 B 的执行流程

线程 B 执行 addWaiter() 方法之后,就进入了 acquireQueued() 方法中,此时传入的参数为封装了线程 B 的 nodeB 节点,nodeB 的前驱结点为哨兵节点,因此 final Node p = node.predecessor() 执行完后,p 将指向哨兵节点。哨兵节点满足 p == head,但是线程 B 执行 tryAcquire(arg) 方法尝试抢占 lock 锁时还是会失败,因此会执行下面 if 判断中的 shouldParkAfterFailedAcquire(p, node) 方法,该方法的代码如下:

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL)
            /*
             * This node has already set status asking a release
             * to signal it, so it can safely park.
             */
            return true;
        if (ws > 0) {
            /*
             * Predecessor was cancelled. Skip over predecessors and
             * indicate retry.
             */
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            /*
             * waitStatus must be 0 or PROPAGATE.  Indicate that we
             * need a signal, but don't park yet.  Caller will need to
             * retry to make sure it cannot acquire before parking.
             */
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }

哨兵节点的 waitStatus == 0,因此执行 CAS 操作将哨兵节点的 waitStatus 改为 Node.SIGNAL(-1)

 compareAndSetWaitStatus(pred, ws, Node.SIGNAL);

注意:compareAndSetWaitStatus(pred, ws, Node.SIGNAL) 调用 unsafe.compareAndSwapInt(node, waitStatusOffset, expect, update); 实现,虽然 compareAndSwapInt() 方法内无自旋,但是在 acquireQueued() 方法中的 for( ; ; ) 能保证此自选操作成功(另一种情况就是线程 B 抢占到 lock 锁)

执行完上述操作,将哨兵节点的 waitStatus 设置为了 -1;

执行完毕将退出 if 判断,又会重新进入 for( ; ; ) 循环,此时执行 shouldParkAfterFailedAcquire(p, node) 方法时会返回 true,因此此时会接着执行 parkAndCheckInterrupt() 方法

线程 B 调用 park() 方法后被挂起,程序不会然续向下执行,程序就在这儿排队等待

线程 C 的执行流程

线程 C 最终也会执行到 LockSupport.park(this); 处,然后被挂起,进入等待区。

总结

2.3.3 unlock() 开始

线程 A 执行 unlock() 方法

继续来看 B 线程被唤醒之后的执行逻辑

再次回到 lock() 方法的执行流程中来,线程 B 被 unpark() 之后将不再阻塞,继续执行下面的程序,线程 B 正常被唤醒,因此 Thread.interrupted() 的值为 false,表示线程 B 未被中断。

回到上一层方法中,此时 lock 锁未被占用,线程 B 执行 tryAcquire(arg) 方法能够抢到 lock 锁,并且将 state 变量的值设置为 1,表示该 lock 锁已经被占用

接着来研究下 setHead(node) 方法:传入的节点为 nodeB,头指针指向 nodeB 节点;将 nodeB 中封装的线程置为 null(因为已经获得锁了);nodeB 不再指向其前驱节点(哨兵节点)。这一切都是为了将 nodeB 作为新的哨兵节点

执行完 setHead(node) 方法的状态如下图所示:

p.next 设置为 null,这是原来的哨兵节点就是完全孤立的一个节点,此时 nodeB 作为新的哨兵节点

线程 C 也是类似的执行流程!!!