SIGN IN SIGN UP
Snailclimb / JavaGuide UNCLAIMED

Java 面试 & 后端通用面试指南,覆盖计算机基础、数据库、分布式、高并发与系统设计。准备后端技术面试,首选 JavaGuide!

0 0 0 Java
---
title: AQS 详解
2026-01-16 21:04:11 +08:00
description: AQS抽象队列同步器深度解析详解AQS核心原理、CLH队列结构、独占锁与共享锁实现、ReentrantLock/Semaphore等同步器应用、线程阻塞唤醒机制。
category: Java
tag:
- Java并发
head:
- - meta
- name: keywords
content: AQS,AbstractQueuedSynchronizer,队列同步器,独占锁,共享锁,CLH队列,ReentrantLock实现原理
---
2024-09-24 00:09:56 +08:00
<!-- markdownlint-disable MD024 -->
## AQS 介绍
AQS 的全称为 `AbstractQueuedSynchronizer` ,翻译过来的意思就是抽象队列同步器。这个类在 `java.util.concurrent.locks` 包下面。
2023-03-10 18:40:00 +08:00
![](https://oss.javaguide.cn/github/javaguide/AQS.png)
AQS 就是一个抽象类,主要用来构建锁和同步器。
```java
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {
}
```
2024-12-19 23:01:04 +08:00
AQS 为构建锁和同步器提供了一些通用功能的实现。因此,使用 AQS 能简单且高效地构造出应用广泛的大量的同步器,比如我们提到的 `ReentrantLock``Semaphore`,其他的诸如 `ReentrantReadWriteLock``SynchronousQueue`等等皆是基于 AQS 的。
## AQS 原理
在面试中被问到并发知识的时候,大多都会被问到“请你说一下自己对于 AQS 原理的理解”。下面给大家一个示例供大家参考,面试不是背题,大家一定要加入自己的思想,即使加入不了自己的思想也要保证自己能够通俗的讲出来而不是背出来。
2024-12-19 23:01:04 +08:00
### AQS 快速了解
2024-12-19 23:06:39 +08:00
在真正讲解 AQS 源码之前,需要对 AQS 有一个整体层面的认识。这里会先通过几个问题,从整体层面上认识 AQS了解 AQS 在整个 Java 并发中所位于的层面,之后在学习 AQS 源码的过程中,才能更加了解同步器和 AQS 之间的关系。
2024-12-19 23:01:04 +08:00
2024-12-20 15:45:13 +08:00
#### AQS 的作用是什么?
2024-12-19 23:01:04 +08:00
2024-12-20 15:45:13 +08:00
AQS 解决了开发者在实现同步器时的复杂性问题。它提供了一个通用框架,用于实现各种同步器,例如 **可重入锁**`ReentrantLock`)、**信号量**`Semaphore`)和 **倒计时器**`CountDownLatch`。通过封装底层的线程同步机制AQS 将复杂的线程管理逻辑隐藏起来,使开发者只需专注于具体的同步逻辑。
2024-12-19 23:01:04 +08:00
2024-12-20 15:45:13 +08:00
简单来说AQS 是一个抽象类,为同步器提供了通用的 **执行框架**。它定义了 **资源获取和释放的通用流程**,而具体的资源获取逻辑则由具体同步器通过重写模板方法来实现。 因此,可以将 AQS 看作是同步器的 **基础“底座”**,而同步器则是基于 AQS 实现的 **具体“应用”**
2024-12-19 23:01:04 +08:00
2024-12-20 15:45:13 +08:00
#### AQS 为什么使用 CLH 锁队列的变体?
2024-12-19 23:01:04 +08:00
2024-12-20 15:45:13 +08:00
CLH 锁是一种基于 **自旋锁** 的优化实现。
2024-12-19 23:01:04 +08:00
2024-12-20 15:45:13 +08:00
先说一下自旋锁存在的问题:自旋锁通过线程不断对一个原子变量执行 `compareAndSet`(简称 `CAS`)操作来尝试获取锁。在高并发场景下,多个线程会同时竞争同一个原子变量,容易造成某个线程的 `CAS` 操作长时间失败,从而导致 **“饥饿”问题**(某些线程可能永远无法获取锁)。
2024-12-19 23:01:04 +08:00
2024-12-20 15:45:13 +08:00
CLH 锁通过引入一个队列来组织并发竞争的线程,对自旋锁进行了改进:
2024-12-19 23:01:04 +08:00
2024-12-20 15:45:13 +08:00
- 每个线程会作为一个节点加入到队列中,并通过自旋监控前一个线程节点的状态,而不是直接竞争共享变量。
- 线程按顺序排队,确保公平性,从而避免了 “饥饿” 问题。
2024-12-19 23:01:04 +08:00
2024-12-20 15:45:13 +08:00
AQSAbstractQueuedSynchronizer在 CLH 锁的基础上进一步优化,形成了其内部的 **CLH 队列变体**。主要改进点有以下两方面:
2024-12-19 23:01:04 +08:00
2024-12-20 15:45:13 +08:00
1. **自旋 + 阻塞** CLH 锁使用纯自旋方式等待锁的释放,但大量的自旋操作会占用过多的 CPU 资源。AQS 引入了 **自旋 + 阻塞** 的混合机制:
- 如果线程获取锁失败,会先短暂自旋尝试获取锁;
- 如果仍然失败,则线程会进入阻塞状态,等待被唤醒,从而减少 CPU 的浪费。
2. **单向队列改为双向队列**CLH 锁使用单向队列节点只知道前驱节点的状态而当某个节点释放锁时需要通过队列唤醒后续节点。AQS 将队列改为 **双向队列**,新增了 `next` 指针,使得节点不仅知道前驱节点,也可以直接唤醒后继节点,从而简化了队列操作,提高了唤醒效率。
2024-12-19 23:01:04 +08:00
2024-12-20 15:45:13 +08:00
#### AQS 的性能比较好,原因是什么?
2024-12-19 23:01:04 +08:00
因为 AQS 内部大量使用了 `CAS` 操作。
2024-12-19 23:01:04 +08:00
AQS 内部通过队列来存储等待的线程节点。由于队列是共享资源,在多线程场景下,需要保证队列的同步访问。
2024-12-19 23:01:04 +08:00
AQS 内部通过 `CAS` 操作来控制队列的同步访问,`CAS` 操作主要用于控制 `队列初始化``线程节点入队` 两个操作的并发安全。虽然利用 `CAS` 控制并发安全可以保证比较好的性能,但同时会带来比较高的 **编码复杂度**
2024-12-19 23:01:04 +08:00
2024-12-20 15:45:13 +08:00
#### AQS 中为什么 Node 节点需要不同的状态?
2024-12-19 23:01:04 +08:00
AQS 中的 `waitStatus` 状态类似于 **状态机** ,通过不同状态来表明 Node 节点的不同含义,并且根据不同操作,来控制状态之间的流转。
- 状态 `0` :新节点加入队列之后,初始状态为 `0`
2024-12-19 23:01:04 +08:00
- 状态 `SIGNAL` :当有新的节点加入队列,此时新节点的前继节点状态就会由 `0` 更新为 `SIGNAL` ,表示前继节点释放锁之后,需要对新节点进行唤醒操作。如果唤醒 `SIGNAL` 状态节点的后续节点,就会将 `SIGNAL` 状态更新为 `0` 。即通过清除 `SIGNAL` 状态,表示已经执行了唤醒操作。
2024-12-19 23:01:04 +08:00
- 状态 `CANCELLED` :如果一个节点在队列中等待获取锁锁时,因为某种原因失败了,该节点的状态就会变为 `CANCELLED` ,表明取消获取锁,这种状态的节点是异常的,无法被唤醒,也无法唤醒后继节点。
2024-12-19 23:01:04 +08:00
### AQS 核心思想
2024-12-19 23:01:04 +08:00
AQS 核心思想是,如果被请求的共享资源空闲,则将当前请求资源的线程设置为有效的工作线程,并且将共享资源设置为锁定状态。如果被请求的共享资源被占用,那么就需要一套线程阻塞等待以及被唤醒时锁分配的机制,这个机制 AQS 是基于 **CLH 锁** Craig, Landin, and Hagersten locks 进一步优化实现的。
2024-12-19 23:01:04 +08:00
**CLH 锁** 对自旋锁进行了改进,是基于单链表的自旋锁。在多线程场景下,会将请求获取锁的线程组织成一个单向队列,每个等待的线程会通过自旋访问前一个线程节点的状态,前一个节点释放锁之后,当前节点才可以获取锁。**CLH 锁** 的队列结构如下图所示。
2024-12-20 15:45:13 +08:00
![CLH 锁的队列结构](https://oss.javaguide.cn/github/javaguide/open-source-project/clh-lock-queue-structure.png)
2024-12-19 23:01:04 +08:00
AQS 中使用的 **等待队列** 是 CLH 锁队列的变体(接下来简称为 CLH 变体队列)。
AQS 的 CLH 变体队列是一个双向队列会暂时获取不到锁的线程将被加入到该队列中CLH 变体队列和原本的 CLH 锁队列的区别主要有两点:
-**自旋** 优化为 **自旋 + 阻塞** :自旋操作的性能很高,但大量的自旋操作比较占用 CPU 资源,因此在 CLH 变体队列中会先通过自旋尝试获取锁,如果失败再进行阻塞等待。
-**单向队列** 优化为 **双向队列** :在 CLH 变体队列中,会对等待的线程进行阻塞操作,当队列前边的线程释放锁之后,需要对后边的线程进行唤醒,因此增加了 `next` 指针,成为了双向队列。
AQS 将每条请求共享资源的线程封装成一个 CLH 变体队列的一个结点Node来实现锁的分配。在 CLH 变体队列中一个节点表示一个线程它保存着线程的引用thread、 当前节点在队列中的状态waitStatus、前驱节点prev、后继节点next
AQS 中的 CLH 变体队列结构如下图所示:
2024-12-20 15:45:13 +08:00
![CLH 变体队列结构](https://oss.javaguide.cn/github/javaguide/java/concurrent/clh-queue-structure-bianti.png)
2023-04-28 17:31:44 +08:00
关于 AQS 核心数据结构-CLH 锁的详细解读,强烈推荐阅读 [Java AQS 核心数据结构-CLH 锁 - Qunar 技术沙龙](https://mp.weixin.qq.com/s/jEx-4XhNGOFdCo4Nou5tqg) 这篇文章。
2023-07-05 16:21:23 +08:00
AQS(`AbstractQueuedSynchronizer`)的核心原理图:
2024-12-20 15:45:13 +08:00
![CLH 变体队列](https://oss.javaguide.cn/github/javaguide/java/concurrent/clh-queue-state.png)
2023-07-13 09:08:59 +08:00
AQS 使用 **int 成员变量 `state` 表示同步状态**,通过内置的 **FIFO 线程等待/等待队列** 来完成获取资源线程的排队工作。
2024-12-19 23:01:04 +08:00
`state` 变量由 `volatile` 修饰,用于展示当前临界资源的获取情况。
```java
// 共享变量使用volatile修饰保证线程可见性
private volatile int state;
```
另外,状态信息 `state` 可以通过 `protected` 类型的`getState()``setState()``compareAndSetState()` 进行操作。并且,这几个方法都是 `final` 修饰的,在子类中无法被重写。
```java
//返回同步状态的当前值
2019-12-16 17:27:56 +08:00
protected final int getState() {
return state;
}
// 设置同步状态的值
2019-12-16 17:27:56 +08:00
protected final void setState(int newState) {
state = newState;
}
//原子地CAS操作将同步状态值设置为给定值update如果当前同步状态的值等于expect期望值
protected final boolean compareAndSetState(int expect, int update) {
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
```
2024-12-19 23:01:04 +08:00
以可重入的互斥锁 `ReentrantLock` 为例,它的内部维护了一个 `state` 变量,用来表示锁的占用状态。`state` 的初始值为 0表示锁处于未锁定状态。当线程 A 调用 `lock()` 方法时,会尝试通过 `tryAcquire()` 方法独占该锁,并让 `state` 的值加 1。如果成功了那么线程 A 就获取到了锁。如果失败了,那么线程 A 就会被加入到一个等待队列CLH 变体队列)中,直到其他线程释放该锁。假设线程 A 获取锁成功了释放锁之前A 线程自己是可以重复获取此锁的(`state` 会累加)。这就是可重入性的体现:一个线程可以多次获取同一个锁而不会被阻塞。但是,这也意味着,一个线程必须释放与获取的次数相同的锁,才能让 `state` 的值回到 0也就是让锁恢复到未锁定状态。只有这样其他等待的线程才能有机会获取该锁。
2023-07-05 16:21:23 +08:00
线程 A 尝试获取锁的过程如下图所示(图源[从 ReentrantLock 的实现看 AQS 的原理及应用 - 美团技术团队](./reentrantlock.md)
![AQS 独占模式获取锁](https://oss.javaguide.cn/github/javaguide/java/concurrent/aqs-exclusive-mode-acquire-lock.png)
再以倒计时器 `CountDownLatch` 以例,任务分为 N 个子线程去执行,`state` 也初始化为 N注意 N 要与线程个数一致)。这 N 个子线程开始执行任务,每执行完一个子线程,就调用一次 `countDown()` 方法。该方法会尝试使用 CAS(Compare and Swap) 操作,让 `state` 的值减少 1。当所有的子线程都执行完毕后`state` 的值变为 0`CountDownLatch` 会调用 `unpark()` 方法,唤醒主线程。这时,主线程就可以从 `await()` 方法(`CountDownLatch` 中的`await()` 方法而非 AQS 中的)返回,继续执行后续的操作。
2019-11-27 08:27:31 +08:00
2024-12-19 23:01:04 +08:00
### Node 节点 waitStatus 状态含义
2019-11-27 08:27:31 +08:00
AQS 中的 `waitStatus` 状态类似于 **状态机** ,通过不同状态来表明 Node 节点的不同含义,并且根据不同操作,来控制状态之间的流转。
2024-12-19 23:01:04 +08:00
| Node 节点状态 | 值 | 含义 |
| ------------- | --- | ------------------------------------------------------------------------------------------------------------------------- |
| `CANCELLED` | 1 | 表示线程已经取消获取锁。线程在等待获取资源时被中断、等待资源超时会更新为该状态。 |
| `SIGNAL` | -1 | 表示后继节点需要当前节点唤醒。在当前线程节点释放锁之后,需要对后继节点进行唤醒。 |
| `CONDITION` | -2 | 表示节点在等待 Condition。当其他线程调用了 Condition 的 `signal()` 方法后,节点会从等待队列转移到同步队列中等待获取资源。 |
| `PROPAGATE` | -3 | 用于共享模式。在共享模式下,可能会出现线程在队列中无法被唤醒的情况,因此引入了 `PROPAGATE` 状态来解决这个问题。 |
2024-12-19 23:01:04 +08:00
| | 0 | 加入队列的新节点的初始状态。 |
2019-11-27 08:27:31 +08:00
在 AQS 的源码中,经常使用 `> 0``< 0` 来对 `waitStatus` 进行判断。
2024-12-19 23:01:04 +08:00
如果 `waitStatus > 0` ,表明节点的状态已经取消等待获取资源。
2019-11-27 08:27:31 +08:00
如果 `waitStatus < 0` ,表明节点的状态处于正常的状态,即没有取消等待。
2019-11-27 08:27:31 +08:00
其中 `SIGNAL` 状态是最重要的,节点状态流转以及对应操作如下:
| 状态流转 | 对应操作 |
| ---------------- | --------------------------------------------------------------------------------------------------------------------------------------------------------- |
| `0` | 新节点入队时,初始状态为 `0` 。 |
| `0 -> SIGNAL` | 新节点入队时,它的前继节点状态会由 `0` 更新为 `SIGNAL``SIGNAL` 状态表明该节点的后续节点需要被唤醒。 |
| `SIGNAL -> 0` | 在唤醒后继节点时,需要清除当前节点的状态。通常发生在 `head` 节点,比如 `head` 节点的状态由 `SIGNAL` 更新为 `0` ,表示已经对 `head` 节点的后继节点唤醒了。 |
| `0 -> PROPAGATE` | AQS 内部引入了 `PROPAGATE` 状态,为了解决并发场景下,可能造成的线程节点无法唤醒的情况。(在 AQS 共享模式获取资源的源码分析会讲到) |
2019-11-27 08:27:31 +08:00
2024-12-19 23:01:04 +08:00
### 自定义同步器
基于 AQS 可以实现自定义的同步器, AQS 提供了 5 个模板方法(模板方法模式)。如果需要自定义同步器一般的方式是这样(模板方法模式很经典的一个应用):
2019-11-27 08:27:31 +08:00
2024-12-19 23:01:04 +08:00
1. 自定义的同步器继承 `AbstractQueuedSynchronizer`
2. 重写 AQS 暴露的模板方法。
2019-11-27 08:27:31 +08:00
**AQS 使用了模板方法模式,自定义同步器时需要重写下面几个 AQS 提供的钩子方法:**
2019-11-27 08:27:31 +08:00
```java
//独占方式。尝试获取资源成功则返回true失败则返回false。
protected boolean tryAcquire(int)
//独占方式。尝试释放资源成功则返回true失败则返回false。
protected boolean tryRelease(int)
//共享方式。尝试获取资源。负数表示失败0表示成功但没有剩余可用资源正数表示成功且有剩余资源。
protected int tryAcquireShared(int)
//共享方式。尝试释放资源成功则返回true失败则返回false。
protected boolean tryReleaseShared(int)
//该线程是否正在独占资源。只有用到condition才需要去实现它。
protected boolean isHeldExclusively()
2019-11-27 08:27:31 +08:00
```
**什么是钩子方法呢?** 钩子方法是一种被声明在抽象类中的方法,一般使用 `protected` 关键字修饰,它可以是空方法(由子类实现),也可以是默认实现的方法。模板设计模式通过钩子方法控制固定步骤的实现。
2019-11-27 08:27:31 +08:00
篇幅问题,这里就不详细介绍模板方法模式了,不太了解的小伙伴可以看看这篇文章:[用 Java8 改造后的模板方法模式真的是 yyds!](https://mp.weixin.qq.com/s/zpScSCktFpnSWHWIQem2jg)。
2019-11-27 08:27:31 +08:00
除了上面提到的钩子方法之外AQS 类中的其他方法都是 `final` ,所以无法被其他类重写。
2019-11-27 08:27:31 +08:00
2024-12-19 23:01:04 +08:00
### AQS 资源共享方式
AQS 定义两种资源共享方式:`Exclusive`(独占,只有一个线程能执行,如`ReentrantLock`)和`Share`(共享,多个线程可同时执行,如`Semaphore`/`CountDownLatch`)。
一般来说,自定义同步器的共享方式要么是独占,要么是共享,他们也只需实现`tryAcquire-tryRelease``tryAcquireShared-tryReleaseShared`中的一种即可。但 AQS 也支持自定义同步器同时实现独占和共享两种方式,如`ReentrantReadWriteLock`
### 独占模式与共享模式的深入对比
上面简要介绍了 AQS 的两种资源共享方式,下面从多个维度对独占模式和共享模式进行系统对比,帮助更深入地理解二者的差异。
#### 特性对比
| 对比维度 | 独占模式Exclusive | 共享模式Share |
| --- | --- | --- |
| **并发度** | 同一时刻只有一个线程能获取到资源 | 同一时刻可以有多个线程同时获取到资源 |
| **获取资源入口** | `acquire(int arg)` | `acquireShared(int arg)` |
| **释放资源入口** | `release(int arg)` | `releaseShared(int arg)` |
| **需要重写的模板方法** | `tryAcquire(int)` / `tryRelease(int)` | `tryAcquireShared(int)` / `tryReleaseShared(int)` |
| **tryXxx 返回值** | `boolean``true` 表示获取/释放成功 | `int`获取时负数表示失败0 表示成功但无剩余资源,正数表示成功且有剩余资源;`boolean`(释放时) |
| **唤醒后继节点** | 释放资源时唤醒一个后继节点 | 获取资源成功后,如果还有剩余资源,会继续唤醒后续节点(传播唤醒) |
| **Node 类型标识** | `Node.EXCLUSIVE``null` | `Node.SHARED`(一个静态的 `Node` 实例) |
| **典型实现** | `ReentrantLock``ReentrantReadWriteLock` 的写锁 | `Semaphore``CountDownLatch``ReentrantReadWriteLock` 的读锁 |
#### `state` 在不同同步器中的语义
AQS 中的 `state` 是一个通用的同步状态变量,不同的同步器赋予它不同的含义:
| 同步器 | 模式 | `state` 的语义 |
| --- | --- | --- |
| `ReentrantLock` | 独占 | 表示锁的重入次数。`state == 0` 表示锁空闲;`state > 0` 表示锁被持有,值为重入次数 |
| `ReentrantReadWriteLock` | 独占 + 共享 | 高 16 位表示读锁的持有数量(共享),低 16 位表示写锁的重入次数(独占) |
| `Semaphore` | 共享 | 表示可用许可证的数量。每次 `acquire()` 减少,`release()` 增加 |
| `CountDownLatch` | 共享 | 表示需要等待的计数。每次 `countDown()` 减 1到 0 时唤醒所有等待线程 |
下面通过一个代码示例来直观感受独占模式和共享模式在使用上的区别:
```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.locks.ReentrantLock;
public class ExclusiveVsSharedDemo {
public static void main(String[] args) {
// 独占模式:同一时刻只有 1 个线程能进入临界区
ReentrantLock lock = new ReentrantLock();
// 共享模式:同一时刻最多 3 个线程能进入临界区
Semaphore semaphore = new Semaphore(3);
// 独占模式示例
Runnable exclusiveTask = () -> {
lock.lock();
try {
System.out.println(Thread.currentThread().getName()
+ " 获取到独占锁,正在执行...");
Thread.sleep(500);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
lock.unlock();
}
};
// 共享模式示例
Runnable sharedTask = () -> {
try {
semaphore.acquire();
System.out.println(Thread.currentThread().getName()
+ " 获取到许可证,正在执行...");
Thread.sleep(500);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
semaphore.release();
}
};
System.out.println("=== 独占模式ReentrantLock===");
for (int i = 0; i < 5; i++) {
new Thread(exclusiveTask, "独占线程-" + i).start();
}
try { Thread.sleep(3000); } catch (InterruptedException e) { }
System.out.println("\n=== 共享模式Semaphore===");
for (int i = 0; i < 5; i++) {
new Thread(sharedTask, "共享线程-" + i).start();
}
}
}
```
运行上面的代码可以观察到:独占模式下 5 个线程严格按顺序一个一个执行,而共享模式下最多有 3 个线程同时执行。
2024-12-19 23:01:04 +08:00
### AQS 资源获取源码分析(独占模式)
AQS 中以独占模式获取资源的入口方法是 `acquire()` ,如下:
```JAVA
// AQS
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
```
`acquire()` 中,线程会先尝试获取共享资源;如果获取失败,会将线程封装为 Node 节点加入到 AQS 的等待队列中;加入队列之后,会让等待队列中的线程尝试获取资源,并且会对线程进行阻塞操作。分别对应以下三个方法:
- `tryAcquire()` :尝试获取锁(模板方法),`AQS` 不提供具体实现,由子类实现。
- `addWaiter()` :如果获取锁失败,会将当前线程封装为 Node 节点加入到 AQS 的 CLH 变体队列中等待获取锁。
- `acquireQueued()` :对线程进行阻塞,并调用 `tryAcquire()` 方法让队列中的线程尝试获取锁。
2024-12-19 23:01:04 +08:00
#### `tryAcquire()` 分析
AQS 中对应的 `tryAcquire()` 模板方法如下:
```JAVA
// AQS
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
```
`tryAcquire()` 方法是 AQS 提供的模板方法,不提供默认实现。
因此,这里分析 `tryAcquire()` 方法时,以 `ReentrantLock` 的非公平锁(独占锁)为例进行分析,`ReentrantLock` 内部实现的 `tryAcquire()` 会调用到下边的 `nonfairTryAcquire()`
```JAVA
// ReentrantLock
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
// 1、获取 AQS 中的 state 状态
int c = getState();
// 2、如果 state 为 0证明锁没有被其他线程占用
if (c == 0) {
// 2.1、通过 CAS 对 state 进行更新
if (compareAndSetState(0, acquires)) {
// 2.2、如果 CAS 更新成功,就将锁的持有者设置为当前线程
setExclusiveOwnerThread(current);
return true;
}
}
// 3、如果当前线程和锁的持有线程相同说明发生了「锁的重入」
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
// 3.1、将锁的重入次数加 1
setState(nextc);
return true;
}
// 4、如果锁被其他线程占用就返回 false表示获取锁失败
return false;
}
```
`nonfairTryAcquire()` 方法内部,主要通过两个核心操作去完成资源的获取:
- 通过 `CAS` 更新 `state` 变量。`state == 0` 表示资源没有被占用。`state > 0` 表示资源被占用,此时 `state` 表示重入次数。
- 通过 `setExclusiveOwnerThread()` 设置持有资源的线程。
如果线程更新 `state` 变量成功,就表明获取到了资源, 因此将持有资源的线程设置为当前线程即可。
#### `addWaiter()` 分析
在通过 `tryAcquire()` 方法尝试获取资源失败之后,会调用 `addWaiter()` 方法将当前线程封装为 Node 节点加入 `AQS` 内部的队列中。`addWaite()` 代码如下:
```JAVA
// AQS
private Node addWaiter(Node mode) {
// 1、将当前线程封装为 Node 节点。
Node node = new Node(Thread.currentThread(), mode);
Node pred = tail;
// 2、如果 pred = null则证明 tail 节点已经被初始化,直接将 Node 节点加入队列即可。
if (pred != null) {
node.prev = pred;
// 2.1、通过 CAS 控制并发安全。
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
// 3、初始化队列并将新创建的 Node 节点加入队列。
enq(node);
return node;
}
```
**节点入队的并发安全:**
`addWaiter()` 方法中,需要执行 Node 节点 **入队** 的操作。由于是在多线程环境下,因此需要通过 `CAS` 操作保证并发安全。
通过 `CAS` 操作去更新 `tail` 指针指向新入队的 Node 节点,`CAS` 可以保证只有一个线程会成功修改 `tail` 指针,以此来保证 Node 节点入队时的并发安全。
**AQS 内部队列的初始化:**
在执行 `addWaiter()` 时,如果发现 `pred == null` ,即 `tail` 指针为 null则证明队列没有初始化需要调用 `enq()` 方法初始化队列,并将 `Node` 节点加入到初始化后的队列中,代码如下:
```JAVA
// AQS
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) {
// 1、通过 CAS 操作保证队列初始化的并发安全
if (compareAndSetHead(new Node()))
tail = head;
} else {
// 2、与 addWaiter() 方法中节点入队的操作相同
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
```
`enq()` 方法中初始化队列,在初始化过程中,也需要通过 `CAS` 来保证并发安全。
初始化队列总共包含两个步骤:初始化 `head` 节点、`tail` 指向 `head` 节点。
**初始化后的队列如下图所示:**
2024-12-20 15:45:13 +08:00
![](https://oss.javaguide.cn/github/javaguide/java/concurrent/clh-queue-structure-init.png)
2024-12-19 23:01:04 +08:00
#### `acquireQueued()` 分析
为了方便阅读,这里再贴一下 `AQS``acquire()` 获取资源的代码:
```JAVA
// AQS
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
```
`acquire()` 方法中,通过 `addWaiter()` 方法将 `Node` 节点加入队列之后,就会调用 `acquireQueued()` 方法。代码如下:
```JAVA
// AQS令队列中的节点尝试获取锁并且对线程进行阻塞。
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
// 1、尝试获取锁。
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
// 2、判断线程是否可以阻塞如果可以则阻塞当前线程。
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
// 3、如果获取锁失败就会取消获取锁将节点状态更新为 CANCELLED。
if (failed)
cancelAcquire(node);
}
}
```
`acquireQueued()` 方法中,主要做两件事情:
- **尝试获取资源:** 当前线程加入队列之后,如果发现前继节点是 `head` 节点,说明当前线程是队列中第一个等待的节点,于是调用 `tryAcquire()` 尝试获取资源。
- **阻塞当前线程** :如果尝试获取资源失败,就需要阻塞当前线程,等待被唤醒之后获取资源。
**1、尝试获取资源**
`acquireQueued()` 方法中,尝试获取资源总共有 2 个步骤:
- `p == head` :表明当前节点的前继节点为 `head` 节点。此时当前节点为 AQS 队列中的第一个等待节点。
- `tryAcquire(arg) == true` :表明当前线程尝试获取资源成功。
在成功获取资源之后,就需要将当前线程的节点 **从等待队列中移除** 。移除操作为:将当前等待的线程节点设置为 `head` 节点(`head` 节点是虚拟节点,并不参与排队获取资源)。
**2、阻塞当前线程**
`AQS` 中,当前节点的唤醒需要依赖于上一个节点。如果上一个节点取消获取锁,它的状态就会变为 `CANCELLED` `CANCELLED` 状态的节点没有获取到锁,也就无法执行解锁操作对当前节点进行唤醒。因此在阻塞当前线程之前,需要跳过 `CANCELLED` 状态的节点。
通过 `shouldParkAfterFailedAcquire()` 方法来判断当前线程节点是否可以阻塞,如下:
```JAVA
// AQS判断当前线程节点是否可以阻塞。
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
// 1、前继节点状态正常直接返回 true 即可。
if (ws == Node.SIGNAL)
return true;
// 2、ws > 0 表示前继节点的状态异常,即为 CANCELLED 状态,需要跳过异常状态的节点。
if (ws > 0) {
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
// 3、如果前继节点的状态不是 SIGNAL也不是 CANCELLED就将状态设置为 SIGNAL。
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
```
`shouldParkAfterFailedAcquire()` 方法中的判断逻辑:
- 如果发现前继节点的状态是 `SIGNAL` ,则可以阻塞当前线程。
- 如果发现前继节点的状态是 `CANCELLED` ,则需要跳过 `CANCELLED` 状态的节点。
- 如果发现前继节点的状态不是 `SIGNAL``CANCELLED` ,表明前继节点的状态处于正常等待资源的状态,因此将前继节点的状态设置为 `SIGNAL` ,表明该前继节点需要对后续节点进行唤醒。
当判断当前线程可以阻塞之后,通过调用 `parkAndCheckInterrupt()` 方法来阻塞当前线程。内部使用了 `LockSupport` 来实现阻塞。`LockSupoprt` 底层是基于 `Unsafe` 类来阻塞线程,代码如下:
```JAVA
// AQS
private final boolean parkAndCheckInterrupt() {
// 1、线程阻塞到这里
LockSupport.park(this);
// 2、线程被唤醒之后返回线程中断状态
return Thread.interrupted();
}
```
**为什么在线程被唤醒之后,要返回线程的中断状态呢?**
`parkAndCheckInterrupt()` 方法中,当执行完 `LockSupport.park(this)` ,线程会被阻塞,代码如下:
```JAVA
// AQS
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
// 线程被唤醒之后,需要返回线程中断状态
return Thread.interrupted();
}
```
当线程被唤醒之后,需要执行 `Thread.interrupted()` 来返回线程的中断状态,这是为什么呢?
这个和线程的中断协作机制有关系,线程被唤醒之后,并不确定是被中断唤醒,还是被 `LockSupport.unpark()` 唤醒,因此需要通过线程的中断状态来判断。
**在 `acquire()` 方法中,为什么需要调用 `selfInterrupt()` **
`acquire()` 方法代码如下:
```JAVA
// AQS
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
```
`acquire()` 方法中,当 `if` 语句的条件返回 `true` 后,就会调用 `selfInterrupt()` ,该方法会中断当前线程,为什么需要中断当前线程呢?
`if` 判断为 `true` 时,需要 `tryAcquire()` 返回 `false` ,并且 `acquireQueued()` 返回 `true`
其中 `acquireQueued()` 方法返回的是线程被唤醒之后的 **中断状态** ,通过执行 `Thread.interrupted()` 来返回。该方法在返回中断状态的同时,会清除线程的中断状态。
因此如果 `if` 判断为 `true` ,表明线程的中断状态为 `true` ,但是调用 `Thread.interrupted()` 之后,线程的中断状态被清除为 `false` ,因此需要重新执行 `selfInterrupt()` 来重新设置线程的中断状态。
### AQS 资源释放源码分析(独占模式)
AQS 中以独占模式释放资源的入口方法是 `release()` ,代码如下:
```JAVA
// AQS
public final boolean release(int arg) {
// 1、尝试释放锁
if (tryRelease(arg)) {
Node h = head;
// 2、唤醒后继节点
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
```
`release()` 方法中,主要做两件事:尝试释放锁和唤醒后继节点。对应方法如下:
**1、尝试释放锁**
通过 `tryRelease()` 方法尝试释放锁,该方法为模板方法,由自定义同步器实现,因此这里仍然以 `ReentrantLock` 为例来讲解。
`ReentrantLock` 中实现的 `tryRelease()` 方法如下:
```JAVA
// ReentrantLock
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
// 1、判断持有锁的线程是否为当前线程
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
// 2、如果 state 为 0则表明当前线程已经没有重入次数。因此将 free 更新为 true表明该线程会释放锁。
if (c == 0) {
free = true;
// 3、更新持有资源的线程为 null
setExclusiveOwnerThread(null);
}
// 4、更新 state 值
setState(c);
return free;
}
```
`tryRelease()` 方法中,会先计算释放锁之后的 `state` 值,判断 `state` 值是否为 0。
- 如果 `state == 0` ,表明该线程没有重入次数了,更新 `free = true` ,并修改持有资源的线程为 null表明该线程完全释放这把锁。
- 如果 `state != 0` ,表明该线程还存在重入次数,因此不更新 `free` 值,`free` 值为 `false` 表明该线程没有完全释放这把锁。
之后更新 `state` 值,并返回 `free` 值,`free` 值表明线程是否完全释放锁。
**2、唤醒后继节点**
如果 `tryRelease()` 返回 `true` ,表明线程已经没有重入次数了,锁已经被完全释放,因此需要唤醒后继节点。
在唤醒后继节点之前,需要判断是否可以唤醒后继节点,判断条件为: `h != null && h.waitStatus != 0` 。这里解释一下为什么要这样判断:
- `h == null` :表明 `head` 节点还没有被初始化,也就是 AQS 中的队列没有被初始化,因此无法唤醒队列中的线程节点。
- `h != null && h.waitStatus == 0` :表明头节点刚刚初始化完毕(节点的初始化状态为 0后继节点线程还没有成功入队因此不需要对后续节点进行唤醒。当后继节点入队之后会将前继节点的状态修改为 `SIGNAL` ,表明需要对后继节点进行唤醒)
- `h != null && h.waitStatus != 0` :其中 `waitStatus` 有可能大于 0也有可能小于 0。其中 `> 0` 表明节点已经取消等待获取资源,`< 0` 表明节点处于正常等待状态。
接下来进入 `unparkSuccessor()` 方法查看如何唤醒后继节点:
```JAVA
// AQS这里的入参 node 为队列的头节点(虚拟头节点)
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
// 1、将头节点的状态进行清除为后续的唤醒做准备。
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
Node s = node.next;
// 2、如果后继节点异常则需要从 tail 向前遍历,找到正常状态的节点进行唤醒。
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
// 3、唤醒后继节点
LockSupport.unpark(s.thread);
}
```
`unparkSuccessor()` 中,如果头节点的状态 `< 0` (在正常情况下,只要有后继节点,头节点的状态应该为 `SIGNAL` ,即 -1表示需要对后继节点进行唤醒因此这里提前清除头节点的状态标识将状态修改为 0表示已经执行了对后续节点唤醒的操作。
如果 `s == null` 或者 `s.waitStatus > 0` ,表明后继节点异常,此时不能唤醒异常节点,而是要找到正常状态的节点进行唤醒。
因此需要从 `tail` 指针向前遍历,来找到第一个状态正常(`waitStatus <= 0`)的节点进行唤醒。
**为什么要从 `tail` 指针向前遍历,而不是从 `head` 指针向后遍历,寻找正常状态的节点呢?**
遍历的方向和 **节点的入队操作** 有关。入队方法如下:
```JAVA
// AQS节点入队方法
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
Node pred = tail;
if (pred != null) {
// 1、先修改 prev 指针。
node.prev = pred;
if (compareAndSetTail(pred, node)) {
// 2、再修改 next 指针。
pred.next = node;
return node;
}
}
enq(node);
return node;
}
```
`addWaiter()` 方法中,`node` 节点入队需要修改 `node.prev``pred.next` 两个指针,但是这两个操作并不是 **原子操作** ,先修改了 `node.prev` 指针,之后才修改 `pred.next` 指针。
在极端情况下,可能会出现 `head` 节点的下一个节点状态为 `CANCELLED` ,此时新入队的节点仅更新了 `node.prev` 指针,还未更新 `pred.next` 指针,如下图:
2024-12-20 15:45:13 +08:00
![](https://oss.javaguide.cn/github/javaguide/java/concurrent/aqs-addWaiter.png)
2024-12-19 23:01:04 +08:00
这样如果从 `head` 指针向后遍历,无法找到新入队的节点,因此需要从 `tail` 指针向前遍历找到新入队的节点。
### 图解 AQS 工作原理(独占模式)
2024-12-19 23:01:04 +08:00
至此AQS 中以独占模式获取资源、释放资源的源码就讲完了。为了对 AQS 的工作原理、节点状态变化有一个更加清晰的认识,接下来会通过画图的方式来了解整个 AQS 的工作原理。
由于 AQS 是底层同步工具,获取和释放资源的方法并没有提供具体实现,因此这里基于 `ReentrantLock` 来画图进行讲解。
2024-12-19 23:06:39 +08:00
假设总共有 3 个线程尝试获取锁,线程分别为 `T1``T2``T3`
2024-12-19 23:01:04 +08:00
2024-12-19 23:06:39 +08:00
此时,假设线程 `T1` 先获取到锁,线程 `T2` 排队等待获取锁。在线程 `T2` 进入队列之前,需要对 AQS 内部队列进行初始化。`head` 节点在初始化后状态为 `0` 。AQS 内部初始化后的队列如下图:
2024-12-19 23:01:04 +08:00
2024-12-20 15:45:13 +08:00
![](https://oss.javaguide.cn/github/javaguide/java/concurrent/aqs-acquire-and-release-process.png)
2024-12-19 23:01:04 +08:00
2024-12-19 23:06:39 +08:00
此时,线程 `T2` 尝试获取锁。由于线程 `T1` 持有锁,因此线程 `T2` 会进入队列中等待获取锁。同时会将前继节点( `head` 节点)的状态由 `0` 更新为 `SIGNAL` ,表示需要对 `head` 节点的后继节点进行唤醒。此时AQS 内部队列如下图所示:
2024-12-19 23:01:04 +08:00
2024-12-20 15:45:13 +08:00
![](https://oss.javaguide.cn/github/javaguide/java/concurrent/aqs-acquire-and-release-process-2.png)
2024-12-19 23:01:04 +08:00
2024-12-19 23:06:39 +08:00
此时,线程 `T3` 尝试获取锁。由于线程 `T1` 持有锁,因此线程 `T3` 会进入队列中等待获取锁。同时会将前继节点(线程 `T2` 节点)的状态由 `0` 更新为 `SIGNAL` ,表示线程 `T2` 节点需要对后继节点进行唤醒。此时AQS 内部队列如下图所示:
2024-12-19 23:01:04 +08:00
2024-12-20 15:45:13 +08:00
![](https://oss.javaguide.cn/github/javaguide/java/concurrent/aqs-acquire-and-release-process-3.png)
2024-12-19 23:01:04 +08:00
此时,假设线程 `T1` 释放锁,会唤醒后继节点 `T2` 。线程 `T2` 被唤醒后获取到锁,并且会从等待队列中退出。
这里线程 `T2` 节点退出等待队列并不是直接从队列移除,而是令线程 `T2` 节点成为新的 `head` 节点,以此来退出资源获取的等待。此时 AQS 内部队列如下所示:
2024-12-20 15:45:13 +08:00
![](https://oss.javaguide.cn/github/javaguide/java/concurrent/aqs-acquire-and-release-process-4.png)
2024-12-19 23:01:04 +08:00
2024-12-19 23:06:39 +08:00
此时,假设线程 `T2` 释放锁,会唤醒后继节点 `T3` 。线程 `T3` 获取到锁之后,同样也退出等待队列,即将线程 `T3` 节点变为 `head` 节点来退出资源获取的等待。此时 AQS 内部队列如下所示:
2024-12-19 23:01:04 +08:00
2024-12-20 15:45:13 +08:00
![](https://oss.javaguide.cn/github/javaguide/java/concurrent/aqs-acquire-and-release-process-5.png)
2024-12-19 23:01:04 +08:00
### AQS 资源获取源码分析(共享模式)
AQS 中以共享模式获取资源的入口方法是 `acquireShared()` ,如下:
```JAVA
// AQS
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
```
`acquireShared()` 方法中,会先尝试获取共享锁,如果获取失败,则将当前线程加入到队列中阻塞,等待唤醒后尝试获取共享锁,分别对应一下两个方法:`tryAcquireShared()``doAcquireShared()`
其中 `tryAcquireShared()` 方法是 AQS 提供的模板方法,由同步器来实现具体逻辑。因此这里以 `Semaphore` 为例,来分析共享模式下,如何获取资源。
#### `tryAcquireShared()` 分析
`Semaphore` 中实现了公平锁和非公平锁,接下来以非公平锁为例来分析 `tryAcquireShared()` 源码。
`Semaphore` 中重写的 `tryAcquireShared()` 方法会调用下边的 `nonfairTryAcquireShared()` 方法:
```JAVA
// Semaphore 重写 AQS 的模板方法
protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);
}
// Semaphore
final int nonfairTryAcquireShared(int acquires) {
for (;;) {
// 1、获取可用资源数量。
int available = getState();
// 2、计算剩余资源数量。
int remaining = available - acquires;
// 3、如果剩余资源数量 < 0则说明资源不足直接返回如果 CAS 更新 state 成功,则说明当前线程获取到了共享资源,直接返回。
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
```
在共享模式下AQS 中的 `state` 值表示共享资源的数量。
`nonfairTryAcquireShared()` 方法中,会在死循环中不断尝试获取资源,如果 「剩余资源数不足」 或者 「当前线程成功获取资源」 ,就退出死循环。方法返回 **剩余的资源数量** ,根据返回值的不同,分为 3 种情况:
- **剩余资源数量 > 0** :表示成功获取资源,并且后续的线程也可以成功获取资源。
- **剩余资源数量 = 0** :表示成功获取资源,但是后续的线程无法成功获取资源。
- **剩余资源数量 < 0** :表示获取资源失败。
#### `doAcquireShared()` 分析
为了方便阅读,这里再贴一下获取资源的入口方法 `acquireShared()`
```JAVA
// AQS
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
```
`acquireShared()` 方法中,会先通过 `tryAcquireShared()` 尝试获取资源。
如果发现方法的返回值 `< 0` ,即剩余的资源数小于 0则表明当前线程获取资源失败。因此会进入 `doAcquireShared()` 方法,将当前线程加入到 AQS 队列进行等待。如下:
```JAVA
// AQS
private void doAcquireShared(int arg) {
// 1、将当前线程加入到队列中等待。
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head) {
// 2、如果当前线程是等待队列的第一个节点则尝试获取资源。
int r = tryAcquireShared(arg);
if (r >= 0) {
// 3、将当前线程节点移出等待队列并唤醒后续线程节点。
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
// 3、如果获取资源失败就会取消获取资源将节点状态更新为 CANCELLED。
if (failed)
cancelAcquire(node);
}
}
```
由于当前线程已经尝试获取资源失败了,因此在 `doAcquireShared()` 方法中,需要将当前线程封装为 Node 节点,加入到队列中进行等待。
**共享模式** 获取资源和 **独占模式** 获取资源最大的不同之处在于:共享模式下,资源的数量可能会大于 1即可以多个线程同时持有资源。
因此在共享模式下,当线程线程被唤醒之后,获取到了资源,如果发现还存在剩余资源,就会尝试唤醒后边的线程去尝试获取资源。对应的 `setHeadAndPropagate()` 方法如下:
```JAVA
// AQS
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head;
// 1、将当前线程节点移出等待队列。
setHead(node);
// 2、唤醒后续等待节点。
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared())
doReleaseShared();
}
}
```
`setHeadAndPropagate()` 方法中,唤醒后续节点需要满足一定的条件,主要需要满足 2 个条件:
- `propagate > 0` `propagate` 代表获取资源之后剩余的资源数量,如果 `> 0` ,则可以唤醒后续线程去获取资源。
- `h.waitStatus < 0` :这里的 `h` 节点是执行 `setHead()` 之前的 `head` 节点。判断 `head.waitStatus` 时使用 `< 0` ,主要为了确定 `head` 节点的状态为 `SIGNAL``PROPAGATE` 。如果 `head` 节点为 `SIGNAL` ,则可以唤醒后续节点;如果 `head` 节点状态为 `PROPAGATE` ,也可以唤醒后续节点(这是为了解决并发场景下出现的问题,后续会细讲)。
代码中关于 **唤醒后续等待节点**`if` 判断稍微复杂一些,这里来讲一下为什么这样写:
```JAVA
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0)
```
- `h == null || h.waitStatus < 0` `h == null` 用于防止空指针异常。正常情况下 h 不会为 `null` ,因为执行到这里之前,当前节点已经加入到队列中了,队列不可能还没有初始化。
`h.waitStatus < 0` 主要判断 `head` 节点的状态是否为 `SIGNAL` 或者 `PROPAGATE` ,直接使用 `< 0` 来判断比较方便。
- `(h = head) == null || h.waitStatus < 0` :如果到这里说明之前判断的 `h.waitStatus < 0` ,说明存在并发。
同时存在其他线程在唤醒后续节点,已经将 `head` 节点的值由 `SIGNAL` 修改为 `0` 了。因此,这里重新获取新的 `head` 节点,这次获取的 `head` 节点为通过 `setHead()` 设置的当前线程节点,之后再次判断 `waitStatus` 状态。
如果 `if` 条件判断通过,就会走到 `doReleaseShared()` 方法唤醒后续等待节点,如下:
```JAVA
private void doReleaseShared() {
for (;;) {
Node h = head;
// 1、队列中至少需要一个等待的线程节点。
if (h != null && h != tail) {
int ws = h.waitStatus;
// 2、如果 head 节点的状态为 SIGNAL则可以唤醒后继节点。
if (ws == Node.SIGNAL) {
// 2.1 清除 head 节点的 SIGNAL 状态,更新为 0。表示已经唤醒该节点的后继节点了。
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue;
// 2.2 唤醒后继节点
unparkSuccessor(h);
}
// 3、如果 head 节点的状态为 0则更新为 PROPAGATE。这是为了解决并发场景下存在的问题接下来会细讲。
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue;
}
if (h == head)
break;
}
}
```
`doReleaseShared()` 方法中,会判断 `head` 节点的 `waitStatus` 状态来决定接下来的操作,有两种情况:
- `head` 节点的状态为 `SIGNAL` :表明 `head` 节点存在后继节点需要唤醒,因此通过 `CAS` 操作将 `head` 节点的 `SIGNAL` 状态更新为 `0` 。通过清除 `SIGNAL` 状态来表示已经对 `head` 节点的后继节点进行唤醒操作了。
- `head` 节点的状态为 `0` :表明存在并发情况,需要将 `0` 修改为 `PROPAGATE` 来保证在并发场景下可以正常唤醒线程。
#### 为什么需要 `PROPAGATE` 状态?
`doReleaseShared()` 释放资源时,第 3 步不太容易理解,即如果发现 `head` 节点的状态是 `0` ,就将 `head` 节点的状态由 `0` 更新为 `PROPAGATE`
AQS 中Node 节点的 `PROPAGATE` 就是为了处理并发场景下可能出现的无法唤醒线程节点的问题。`PROPAGATE` 只在 `doReleaseShared()` 方法中用到一次。
**接下来通过案例分析,为什么需要 `PROPAGATE` 状态?**
在共享模式下,线程获取和释放资源的方法调用链如下:
- 线程获取资源的方法调用链为: `acquireShared() -> tryAcquireShared() -> 线程阻塞等待唤醒 -> tryAcquireShared() -> setHeadAndPropagate() -> if (剩余资源数 > 0) || (head.waitStatus < 0) 则唤醒后续节点`
- 线程释放资源的方法调用链为: `releaseShared() -> tryReleaseShared() -> doReleaseShared()`
**如果在释放资源时,没有将 `head` 节点的状态由 `0` 改为 `PROPAGATE` **
假设总共有 4 个线程尝试以共享模式获取资源,总共有 2 个资源。初始 `T3``T4` 线程获取到了资源,`T1``T2` 线程没有获取到,因此在队列中排队等候。
- 在时刻 1 时,线程 `T1``T2` 在等待队列中,`T3``T4` 持有资源。此时等待队列内节点以及对应状态为(括号内为节点的 `waitStatus` 状态):
`head(-1) -> T1(-1) -> T2(0)`
- 在时刻 2 时,线程 `T3` 释放资源,通过 `doReleaseShared()` 方法将 `head` 节点的状态由 `SIGNAL` 更新为 `0` ,并唤醒线程 `T1` ,之后线程 `T3` 退出。
线程 `T1` 被唤醒之后,通过 `tryAcquireShared()` 获取到资源,但是此时还未来得及执行 `setHeadAndPropagate()` 将自己设置为 `head` 节点。此时等待队列内节点状态为:
`head(0) -> T1(-1) -> T2(0)`
- 在时刻 3 时,线程 `T4` 释放资源, 由于此时 `head` 节点的状态为 `0` ,因此在 `doReleaseShared()` 方法中无法唤醒 `head` 的后继节点, 之后线程 `T4` 退出。
- 在时刻 4 时,线程 `T1` 继续执行 `setHeadAndPropagate()` 方法将自己设置为 `head` 节点。
但是此时由于线程 `T1` 执行 `tryAcquireShared()` 方法返回的剩余资源数为 `0` ,并且 `head` 节点的状态为 `0` ,因此线程 `T1` 并不会在 `setHeadAndPropagate()` 方法中唤醒后续节点。此时等待队列内节点状态为:
`head(-1线程 T1 节点) -> T2(0)`
此时,就导致线程 `T2` 节点在等待队列中,无法被唤醒。对应时刻表如下:
| 时刻 | 线程 T1 | 线程 T2 | 线程 T3 | 线程 T4 | 等待队列 |
| ------ | -------------------------------------------------------------- | -------- | ---------------- | ------------------------------------------------------------- | --------------------------------- |
| 时刻 1 | 等待队列 | 等待队列 | 持有资源 | 持有资源 | `head(-1) -> T1(-1) -> T2(0)` |
| 时刻 2 | (执行)被唤醒后,获取资源,但未来得及将自己设置为 `head` 节点 | 等待队列 | (执行)释放资源 | 持有资源 | `head(0) -> T1(-1) -> T2(0)` |
| 时刻 3 | | 等待队列 | 已退出 | (执行)释放资源。但 `head` 节点状态为 `0` ,无法唤醒后继节点 | `head(0) -> T1(-1) -> T2(0)` |
| 时刻 4 | (执行)将自己设置为 `head` 节点 | 等待队列 | 已退出 | 已退出 | `head(-1线程 T1 节点) -> T2(0)` |
**如果在线程释放资源时,将 `head` 节点的状态由 `0` 改为 `PROPAGATE` ,则可以解决上边出现的并发问题,如下:**
- 在时刻 1 时,线程 `T1``T2` 在等待队列中,`T3``T4` 持有资源。此时等待队列内节点以及对应状态为:
`head(-1) -> T1(-1) -> T2(0)`
- 在时刻 2 时,线程 `T3` 释放资源,通过 `doReleaseShared()` 方法将 `head` 节点的状态由 `SIGNAL` 更新为 `0` ,并唤醒线程 `T1` ,之后线程 `T3` 退出。
线程 `T1` 被唤醒之后,通过 `tryAcquireShared()` 获取到资源,但是此时还未来得及执行 `setHeadAndPropagate()` 将自己设置为 `head` 节点。此时等待队列内节点状态为:
`head(0) -> T1(-1) -> T2(0)`
- 在时刻 3 时,线程 `T4` 释放资源, 由于此时 `head` 节点的状态为 `0` ,因此在 `doReleaseShared()` 方法中会将 `head` 节点的状态由 `0` 更新为 `PROPAGATE` 之后线程 `T4` 退出。此时等待队列内节点状态为:
`head(PROPAGATE) -> T1(-1) -> T2(0)`
- 在时刻 4 时,线程 `T1` 继续执行 `setHeadAndPropagate()` 方法将自己设置为 `head` 节点。此时等待队列内节点状态为:
`head(-1线程 T1 节点) -> T2(0)`
- 在时刻 5 时,虽然此时由于线程 `T1` 执行 `tryAcquireShared()` 方法返回的剩余资源数为 `0` ,但是 `head` 节点状态为 `PROPAGATE < 0` (这里的 `head` 节点是老的 `head` 节点,而不是刚成为 `head` 节点的线程 `T1` 节点)。
因此线程 `T1` 会在 `setHeadAndPropagate()` 方法中唤醒后续 `T2` 节点,并将 `head` 节点的状态由 `SIGNAL` 更新为 `0`。此时等待队列内节点状态为:
`head(0线程 T1 节点) -> T2(0)`
- 在时刻 6 时,线程 `T2` 被唤醒后,获取到资源,并将自己设置为 `head` 节点。此时等待队列内节点状态为:
`head(0线程 T2 节点)`
有了 `PROPAGATE` 状态,就可以避免线程 `T2` 无法被唤醒的情况。对应时刻表如下:
2024-12-22 00:19:07 +08:00
| 时刻 | 线程 T1 | 线程 T2 | 线程 T3 | 线程 T4 | 等待队列 |
| ------ | -------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | ------------------------------------------------------------------ | ---------------- | ------------------------------------------------------------------- | ------------------------------------ |
| 时刻 1 | 等待队列 | 等待队列 | 持有资源 | 持有资源 | `head(-1) -> T1(-1) -> T2(0)` |
| 时刻 2 | (执行)被唤醒后,获取资源,但未来得及将自己设置为 `head` 节点 | 等待队列 | (执行)释放资源 | 持有资源 | `head(0) -> T1(-1) -> T2(0)` |
| 时刻 3 | 未继续向下执行 | 等待队列 | 已退出 | (执行)释放资源。此时会将 `head` 节点状态由 `0` 更新为 `PROPAGATE` | `head(PROPAGATE) -> T1(-1) -> T2(0)` |
| 时刻 4 | (执行)将自己设置为 `head` 节点 | 等待队列 | 已退出 | 已退出 | `head(-1线程 T1 节点) -> T2(0)` |
| 时刻 5 | (执行)由于 `head` 节点状态为 `PROPAGATE < 0` ,因此会在 `setHeadAndPropagate()` 方法中唤醒后续节点,此时将新的 `head` 节点的状态由 `SIGNAL` 更新为 `0` ,并唤醒线程 `T2` | 等待队列 | 已退出 | 已退出 | `head(0线程 T1 节点) -> T2(0)` |
| 时刻 6 | 已退出 | (执行)线程 `T2` 被唤醒后,获取到资源,并将自己设置为 `head` 节点 | 已退出 | 已退出 | `head(0线程 T2 节点)` |
### AQS 资源释放源码分析(共享模式)
AQS 中以共享模式释放资源的入口方法是 `releaseShared()` ,代码如下:
```JAVA
// AQS
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
```
其中 `tryReleaseShared()` 方法是 AQS 提供的模板方法,这里同样以 `Semaphore` 来讲解,如下:
```JAVA
// Semaphore
protected final boolean tryReleaseShared(int releases) {
for (;;) {
int current = getState();
int next = current + releases;
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
if (compareAndSetState(current, next))
return true;
}
}
```
`Semaphore` 实现的 `tryReleaseShared()` 方法中,会在死循环内不断尝试释放资源,即通过 `CAS` 操作来更新 `state` 值。
如果更新成功,则证明资源释放成功,会进入到 `doReleaseShared()` 方法。
2024-12-22 00:19:07 +08:00
`doReleaseShared()` 方法在前文获取资源(共享模式)的部分已进行了详细的源码分析,此处不再重复。
### Condition 条件队列的工作机制
前面在 `waitStatus` 状态表格中提到过 `CONDITION`(值为 -2状态表示节点在 Condition 条件队列中等待。这里系统讲解 Condition 条件队列的工作机制。
#### 什么是 Condition
`Condition``java.util.concurrent.locks` 包中定义的接口,它提供了类似于 `Object.wait()` / `Object.notify()` 的线程等待/通知机制,但功能更加强大和灵活。`Condition` 必须与 `Lock` 配合使用,就像 `wait/notify` 必须与 `synchronized` 配合使用一样。
`Object``wait/notify` 相比,`Condition` 的主要优势在于:
- **支持多个等待队列**:一个 `Lock` 可以创建多个 `Condition` 实例,不同的线程可以在不同的条件上等待,实现更精细的线程协作。而 `synchronized` 只有一个等待队列。
- **支持不响应中断的等待**`Condition` 提供了 `awaitUninterruptibly()` 方法。
- **支持超时等待**`Condition` 提供了 `awaitNanos(long)``await(long, TimeUnit)` 方法,可以设定等待的截止时间。
#### AQS 中的两种队列
在 AQS 内部实际上维护了 **两种队列**
1. **同步队列CLH 变体队列)**:就是前面详细分析过的双向队列,用于存放获取资源失败而等待的线程节点。
2. **条件队列Condition Queue**:是一个单向链表,用于存放调用了 `Condition.await()` 方法而等待的线程节点。每个 `Condition` 实例维护一个独立的条件队列。
条件队列中的节点使用 `Node``nextWaiter` 指针来链接下一个节点,形成单向链表。条件队列的头节点为 `firstWaiter`,尾节点为 `lastWaiter`
#### Condition 的核心工作流程
AQS 的内部类 `ConditionObject` 实现了 `Condition` 接口,其核心方法为 `await()``signal()`
**`await()` 方法的工作流程:**
1. 将当前线程封装为 `Node` 节点(`waitStatus` 设置为 `CONDITION`),加入到条件队列的尾部。
2. 完全释放当前线程持有的锁(即将 `state` 值置为 0并保存释放前的 `state` 值。
3. 阻塞当前线程,等待被 `signal()` 唤醒或被中断。
4. 被唤醒后,重新通过 `acquireQueued()` 进入同步队列竞争锁,并恢复之前保存的 `state` 值(重入次数)。
**`signal()` 方法的工作流程:**
1. 检查调用 `signal()` 的线程是否持有锁(不持有则抛出 `IllegalMonitorStateException`)。
2. 将条件队列中第一个等待的节点从条件队列移除。
3. 将该节点的 `waitStatus``CONDITION` 修改为 `0`,并通过 `enq()` 方法将其加入到同步队列的尾部。
4. 如果同步队列中前驱节点的状态异常(`CANCELLED`)或者 CAS 设置前驱节点状态为 `SIGNAL` 失败,则直接唤醒该线程。
`signalAll()` 方法与 `signal()` 类似,区别在于它会将条件队列中的 **所有** 节点都转移到同步队列中。
下面的代码示例展示了 `Condition` 的典型用法——实现一个简单的有界阻塞队列:
```java
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
public class SimpleBlockingQueue<T> {
private final Queue<T> queue = new LinkedList<>();
private final int capacity;
private final ReentrantLock lock = new ReentrantLock();
// 两个不同的条件队列:分别用于"队列不满"和"队列不空"
private final Condition notFull = lock.newCondition();
private final Condition notEmpty = lock.newCondition();
public SimpleBlockingQueue(int capacity) {
this.capacity = capacity;
}
/**
* 向队列中添加元素,如果队列已满则等待。
*/
public void put(T item) throws InterruptedException {
lock.lock();
try {
// 队列满时,在 notFull 条件上等待
while (queue.size() == capacity) {
notFull.await();
}
queue.offer(item);
// 添加元素后,通知在 notEmpty 条件上等待的消费者线程
notEmpty.signal();
} finally {
lock.unlock();
}
}
/**
* 从队列中取出元素,如果队列为空则等待。
*/
public T take() throws InterruptedException {
lock.lock();
try {
// 队列空时,在 notEmpty 条件上等待
while (queue.isEmpty()) {
notEmpty.await();
}
T item = queue.poll();
// 取出元素后,通知在 notFull 条件上等待的生产者线程
notFull.signal();
return item;
} finally {
lock.unlock();
}
}
public static void main(String[] args) {
SimpleBlockingQueue<Integer> blockingQueue = new SimpleBlockingQueue<>(5);
// 生产者线程
Thread producer = new Thread(() -> {
try {
for (int i = 0; i < 10; i++) {
blockingQueue.put(i);
System.out.println("生产: " + i);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}, "Producer");
// 消费者线程
Thread consumer = new Thread(() -> {
try {
for (int i = 0; i < 10; i++) {
int item = blockingQueue.take();
System.out.println("消费: " + item);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}, "Consumer");
producer.start();
consumer.start();
}
}
```
在上面的例子中,`notFull``notEmpty` 是两个独立的 `Condition` 实例,分别维护各自的条件队列。生产者在队列满时在 `notFull` 上等待,消费者在队列空时在 `notEmpty` 上等待。这种分离等待条件的设计,避免了不必要的线程唤醒,比 `synchronized` + `wait/notifyAll` 更加高效。
#### `await()` 核心源码分析
```java
// AQS 内部类 ConditionObject
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
// 1、将当前线程封装为 Node 节点,加入条件队列
Node node = addConditionWaiter();
// 2、完全释放锁并保存释放前的 state 值
int savedState = fullyRelease(node);
int interruptMode = 0;
// 3、如果节点不在同步队列中则阻塞当前线程
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
// 4、被唤醒后重新进入同步队列竞争锁
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null)
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
```
`await()` 方法中有两个关键操作:
- `fullyRelease(node)`:完全释放锁(而不是只释放一次),这样即使线程重入了多次锁,也能在等待期间让其他线程获取到锁。被唤醒后会通过 `acquireQueued(node, savedState)` 恢复之前的重入次数。
- `isOnSyncQueue(node)`:判断节点是否已经被转移到同步队列。当其他线程调用 `signal()` 时,节点会从条件队列转移到同步队列,此时 `isOnSyncQueue()` 返回 `true`,线程退出 `while` 循环,开始竞争锁。
### 公平锁与非公平锁的性能差异分析
前面的源码分析中,以 `ReentrantLock` 的非公平锁为例讲解了 `tryAcquire()` 的实现。实际上 `ReentrantLock` 同时支持公平锁和非公平锁两种模式。这里深入分析二者的实现差异及其对性能的影响。
#### 源码层面的差异
`ReentrantLock` 默认使用非公平锁,通过构造参数可以切换为公平锁:
```java
// 非公平锁(默认)
ReentrantLock unfairLock = new ReentrantLock();
// 公平锁
ReentrantLock fairLock = new ReentrantLock(true);
```
二者的核心差异在于 `tryAcquire()` 方法的实现。非公平锁的 `nonfairTryAcquire()` 前面已经分析过,下面看公平锁的实现:
```java
// ReentrantLock.FairSync
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
// 关键差异:先调用 hasQueuedPredecessors() 判断同步队列中是否有等待更久的线程
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
```
**唯一的区别** 就是公平锁在 CAS 修改 `state` 之前多了一个 `hasQueuedPredecessors()` 判断:
```java
// AQS
public final boolean hasQueuedPredecessors() {
Node t = tail;
Node h = head;
Node s;
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}
```
这个方法用于判断当前线程之前是否有其他线程在排队。如果有,则当前线程不能直接获取锁,必须排队等待,从而保证了 **FIFO** 的公平性。
而非公平锁没有这个判断,当锁刚好释放时,新来的线程可以直接通过 CAS 抢到锁,即使同步队列中已经有其他线程在等待。
#### 性能差异对比
| 对比维度 | 非公平锁(默认) | 公平锁 |
| --- | --- | --- |
| **吞吐量** | 更高。新线程有机会直接获取锁,减少了线程上下文切换 | 较低。所有线程都必须排队,增加了上下文切换的开销 |
| **线程饥饿** | 可能发生。极端情况下某些线程长时间无法获取锁 | 不会发生。严格按照请求顺序分配锁 |
| **上下文切换** | 较少。持有锁的线程释放锁后,新到达的线程可能直接获取锁,不需要唤醒队列中的线程 | 较多。每次释放锁都需要唤醒队列中的下一个线程 |
| **适用场景** | 大多数场景(对响应时间和吞吐量要求较高) | 对公平性有严格要求的场景(如资源分配、任务调度) |
**为什么非公平锁性能通常更好?**
关键原因在于 **减少了线程上下文切换的次数**。当持有锁的线程 A 释放锁后:
- **非公平锁**:此时如果恰好有线程 B 正在尝试获取锁(还没有进入同步队列),线程 B 可以直接通过 CAS 获取到锁并立即执行,省去了唤醒队列中线程的开销。而队列中等待的线程被唤醒后发现锁被占用,会重新阻塞,虽然看起来"浪费"了一次唤醒,但总体上减少了线程切换次数。
- **公平锁**:线程 B 必须排到队列尾部,然后唤醒队列头部的线程。从线程被唤醒到真正开始执行之间,存在一段 **调度延迟**(线程状态从阻塞切换到运行),在这段延迟期间锁处于空闲状态,降低了锁的利用率。
Doug Lea 在 `ReentrantLock` 的文档中指出:使用公平锁的程序在多线程环境下的总体吞吐量通常低于使用非公平锁的程序(即更慢),因此 `ReentrantLock` 默认使用非公平模式。但在需要保证请求处理顺序或避免线程饥饿的场景中(如连接池分配),公平锁是更好的选择。
下面通过代码示例来演示公平锁与非公平锁在行为上的差异:
```java
import java.util.concurrent.locks.ReentrantLock;
public class FairVsUnfairLockDemo {
// 分别测试公平锁和非公平锁
private static void testLock(ReentrantLock lock, String lockType) {
System.out.println("=== " + lockType + " ===");
Runnable task = () -> {
for (int i = 0; i < 2; i++) {
lock.lock();
try {
System.out.println(Thread.currentThread().getName() + " 获取到锁");
} finally {
lock.unlock();
}
}
};
Thread[] threads = new Thread[5];
for (int i = 0; i < 5; i++) {
threads[i] = new Thread(task, lockType + "-线程-" + i);
}
for (Thread t : threads) {
t.start();
}
for (Thread t : threads) {
try { t.join(); } catch (InterruptedException e) { }
}
System.out.println();
}
public static void main(String[] args) {
// 非公平锁:同一个线程可能连续多次获取到锁
testLock(new ReentrantLock(false), "非公平锁");
// 公平锁:线程按请求顺序交替获取锁
testLock(new ReentrantLock(true), "公平锁");
}
}
```
运行上面的代码可以观察到:非公平锁模式下,同一个线程可能连续多次获取到锁(因为它释放锁后立即又去竞争,有很大概率在队列中的线程被唤醒之前就抢到了锁);而公平锁模式下,线程获取锁的顺序更加均匀,不会出现某个线程连续霸占锁的情况。
## 常见同步工具类
### Semaphore(信号量)
#### 介绍
`synchronized``ReentrantLock` 都是一次只允许一个线程访问某个资源,而`Semaphore`(信号量)可以用来控制同时访问特定资源的线程数量。
2023-07-13 09:08:59 +08:00
`Semaphore` 的使用简单,我们这里假设有 `N(N>5)` 个线程来获取 `Semaphore` 中的共享资源,下面的代码表示同一时刻 N 个线程中只有 5 个线程能获取到共享资源,其他线程都会阻塞,只有获取到共享资源的线程才能执行。等到有线程释放了共享资源,其他阻塞的线程才能获取到。
2023-07-13 09:08:59 +08:00
```java
// 初始共享资源数量
final Semaphore semaphore = new Semaphore(5);
// 获取1个许可
semaphore.acquire();
// 释放1个许可
semaphore.release();
```
2023-04-28 17:31:44 +08:00
当初始的资源个数为 1 的时候,`Semaphore` 退化为排他锁。
`Semaphore` 有两种模式:。
- **公平模式:** 调用 `acquire()` 方法的顺序就是获取许可证的顺序,遵循 FIFO
- **非公平模式:** 抢占式的。
`Semaphore` 对应的两个构造方法如下:
```java
public Semaphore(int permits) {
2023-12-30 17:14:13 +08:00
sync = new NonfairSync(permits);
}
public Semaphore(int permits, boolean fair) {
2023-12-30 17:14:13 +08:00
sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}
```
2022-02-07 19:18:50 +08:00
**这两个构造方法,都必须提供许可的数量,第二个构造方法可以指定是公平模式还是非公平模式,默认非公平模式。**
`Semaphore` 通常用于那些资源有明确访问数量限制的场景比如限流(仅限于单机模式,实际项目中推荐使用 Redis +Lua 来做限流)。
#### 原理
2021-07-26 15:58:01 +08:00
`Semaphore` 是共享锁的一种实现,它默认构造 AQS 的 `state` 值为 `permits`,你可以将 `permits` 的值理解为许可证的数量,只有拿到许可证的线程才能执行。
2023-07-13 09:08:59 +08:00
以无参 `acquire` 方法为例,调用`semaphore.acquire()` ,线程尝试获取许可证,如果 `state > 0` 的话,则表示可以获取成功,如果 `state <= 0` 的话,则表示许可证数量不足,获取失败。
2023-07-13 09:08:59 +08:00
如果可以获取成功的话(`state > 0` ),会尝试使用 CAS 操作去修改 `state` 的值 `state=state-1`。如果获取失败则会创建一个 Node 节点加入等待队列,挂起当前线程。
```java
2023-07-13 09:08:59 +08:00
// 获取1个许可证
public void acquire() throws InterruptedException {
2023-12-30 17:14:13 +08:00
sync.acquireSharedInterruptibly(1);
}
2023-07-13 09:08:59 +08:00
// 获取一个或者多个许可证
public void acquire(int permits) throws InterruptedException {
if (permits < 0) throw new IllegalArgumentException();
sync.acquireSharedInterruptibly(permits);
}
```
`acquireSharedInterruptibly`方法是 `AbstractQueuedSynchronizer` 中的默认实现。
```java
// 共享模式下获取许可证,获取成功则返回,失败则加入等待队列,挂起线程
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
2023-07-13 09:08:59 +08:00
// 尝试获取许可证arg为获取许可证个数当获取失败时,则创建一个节点加入等待队列,挂起当前线程。
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
2023-07-13 09:08:59 +08:00
```
这里再以非公平模式(`NonfairSync`)的为例,看看 `tryAcquireShared` 方法的实现。
```java
// 共享模式下尝试获取资源(在Semaphore中的资源即许可证):
protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);
}
2023-07-13 09:08:59 +08:00
// 非公平的共享模式获取许可证
final int nonfairTryAcquireShared(int acquires) {
for (;;) {
// 当前可用许可证数量
int available = getState();
/*
* 尝试获取许可证当前可用许可证数量小于等于0时返回负值表示获取失败
* 当前可用许可证大于0时才可能获取成功CAS失败了会循环重新获取最新的值尝试获取
*/
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
```
2023-07-13 09:08:59 +08:00
以无参 `release` 方法为例,调用`semaphore.release();` ,线程尝试释放许可证,并使用 CAS 操作去修改 `state` 的值 `state=state+1`。释放许可证成功之后,同时会唤醒等待队列中的一个线程。被唤醒的线程会重新尝试去修改 `state` 的值 `state=state-1` ,如果 `state > 0` 则获取令牌成功,否则重新进入等待队列,挂起线程。
```java
// 释放一个许可证
public void release() {
2023-12-30 17:14:13 +08:00
sync.releaseShared(1);
}
2023-07-13 09:08:59 +08:00
// 释放一个或者多个许可证
public void release(int permits) {
if (permits < 0) throw new IllegalArgumentException();
sync.releaseShared(permits);
}
```
`releaseShared`方法是 `AbstractQueuedSynchronizer` 中的默认实现。
```java
// 释放共享锁
// 如果 tryReleaseShared 返回 true就唤醒等待队列中的一个或多个线程。
public final boolean releaseShared(int arg) {
//释放共享锁
if (tryReleaseShared(arg)) {
2023-07-13 09:08:59 +08:00
//释放当前节点的后置等待节点
doReleaseShared();
return true;
}
return false;
}
2023-07-13 09:08:59 +08:00
```
`tryReleaseShared` 方法是`Semaphore` 的内部类 `Sync` 重写的一个方法, `AbstractQueuedSynchronizer`中的默认实现仅仅抛出 `UnsupportedOperationException` 异常。
```java
// 内部类 Sync 中重写的一个方法
// 尝试释放资源
protected final boolean tryReleaseShared(int releases) {
for (;;) {
int current = getState();
2023-07-13 09:08:59 +08:00
// 可用许可证+1
int next = current + releases;
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
2023-07-13 09:08:59 +08:00
// CAS修改state的值
if (compareAndSetState(current, next))
return true;
}
}
```
2020-03-13 18:23:29 +08:00
2023-07-13 09:08:59 +08:00
可以看到,上面提到的几个方法底层基本都是通过同步器 `sync` 实现的。`Sync``CountDownLatch` 的内部类 , 继承了 `AbstractQueuedSynchronizer` 重写了其中的某些方法。并且Sync 对应的还有两个子类 `NonfairSync`(对应非公平模式) 和 `FairSync`(对应公平模式)。
```java
private static final class Sync extends AbstractQueuedSynchronizer {
// ...
}
static final class NonfairSync extends Sync {
// ...
}
static final class FairSync extends Sync {
// ...
}
```
#### 实战
```java
2023-07-13 09:08:59 +08:00
public class SemaphoreExample {
2018-12-12 21:45:56 +08:00
// 请求的数量
private static final int threadCount = 550;
public static void main(String[] args) throws InterruptedException {
// 创建一个具有固定线程数量的线程池对象(如果这里线程池的线程数量给太少的话你会发现执行的很慢)
ExecutorService threadPool = Executors.newFixedThreadPool(300);
// 初始许可证数量
2018-12-12 21:45:56 +08:00
final Semaphore semaphore = new Semaphore(20);
for (int i = 0; i < threadCount; i++) {
final int threadnum = i;
threadPool.execute(() -> {// Lambda 表达式的运用
try {
semaphore.acquire();// 获取一个许可所以可运行线程数量为20/1=20
test(threadnum);
semaphore.release();// 释放一个许可
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
});
}
threadPool.shutdown();
System.out.println("finish");
}
public static void test(int threadnum) throws InterruptedException {
Thread.sleep(1000);// 模拟请求的耗时操作
System.out.println("threadnum:" + threadnum);
Thread.sleep(1000);// 模拟请求的耗时操作
}
}
```
执行 `acquire()` 方法阻塞,直到有一个许可证可以获得然后拿走一个许可证;每个 `release` 方法增加一个许可证,这可能会释放一个阻塞的 `acquire()` 方法。然而,其实并没有实际的许可证这个对象,`Semaphore` 只是维持了一个可获得许可证的数量。 `Semaphore` 经常用于限制获取某种资源的线程数量。
当然一次也可以一次拿取和释放多个许可,不过一般没有必要这样做:
```java
2020-03-13 18:23:29 +08:00
semaphore.acquire(5);// 获取5个许可所以可运行线程数量为20/5=4
test(threadnum);
semaphore.release(5);// 释放5个许可
```
除了 `acquire()` 方法之外,另一个比较常用的与之对应的方法是 `tryAcquire()` 方法,该方法如果获取不到许可就立即返回 false。
2023-05-05 12:39:01 +08:00
[issue645 补充内容](https://github.com/Snailclimb/JavaGuide/issues/645)
2019-12-16 17:27:56 +08:00
2024-11-18 16:52:47 +08:00
> `Semaphore` 基于 AQS 实现,用于控制并发访问的线程数量,但它与共享锁的概念有所不同。`Semaphore` 的构造函数使用 `permits` 参数初始化 AQS 的 `state` 变量,该变量表示可用的许可数量。当线程调用 `acquire()` 方法尝试获取许可时,`state` 会原子性地减 1。如果 `state` 减 1 后大于等于 0则 `acquire()` 成功返回,线程可以继续执行。如果 `state` 减 1 后小于 0表示当前并发访问的线程数量已达到 `permits` 的限制,该线程会被放入 AQS 的等待队列并阻塞,**而不是自旋等待**。当其他线程完成任务并调用 `release()` 方法时,`state` 会原子性地加 1。`release()` 操作会唤醒 AQS 等待队列中的一个或多个阻塞线程。这些被唤醒的线程将再次尝试 `acquire()` 操作,竞争获取可用的许可。因此,`Semaphore` 通过控制许可数量来限制并发访问的线程数量,而不是通过自旋和共享锁机制。
### CountDownLatch (倒计时器)
#### 介绍
`CountDownLatch` 允许 `count` 个线程阻塞在一个地方,直至所有线程的任务都执行完毕。
`CountDownLatch` 是一次性的,计数器的值只能在构造方法中初始化一次,之后没有任何机制再次对其设置值,当 `CountDownLatch` 使用完毕后,它不能再次被使用。
#### 原理
2023-07-13 09:08:59 +08:00
`CountDownLatch` 是共享锁的一种实现,它默认构造 AQS 的 `state` 值为 `count`。这个我们通过 `CountDownLatch` 的构造方法即可看出。
```java
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}
private static final class Sync extends AbstractQueuedSynchronizer {
Sync(int count) {
setState(count);
}
//...
}
```
当线程调用 `countDown()` 时,其实使用了`tryReleaseShared`方法以 CAS 的操作来减少 `state`,直至 `state` 为 0 。当 `state` 为 0 时,表示所有的线程都调用了 `countDown` 方法,那么在 `CountDownLatch` 上等待的线程就会被唤醒并继续执行。
```java
public void countDown() {
// Sync 是 CountDownLatch 的内部类 , 继承了 AbstractQueuedSynchronizer
sync.releaseShared(1);
}
```
`releaseShared`方法是 `AbstractQueuedSynchronizer` 中的默认实现。
```java
// 释放共享锁
// 如果 tryReleaseShared 返回 true就唤醒等待队列中的一个或多个线程。
public final boolean releaseShared(int arg) {
//释放共享锁
if (tryReleaseShared(arg)) {
//释放当前节点的后置等待节点
doReleaseShared();
return true;
}
return false;
}
```
`tryReleaseShared` 方法是`CountDownLatch` 的内部类 `Sync` 重写的一个方法, `AbstractQueuedSynchronizer`中的默认实现仅仅抛出 `UnsupportedOperationException` 异常。
```java
// 对 state 进行递减,直到 state 变成 0
// 只有 count 递减到 0 时countDown 才会返回 true
protected boolean tryReleaseShared(int releases) {
// 自选检查 state 是否为 0
for (;;) {
int c = getState();
// 如果 state 已经是 0 了,直接返回 false
if (c == 0)
return false;
// 对 state 进行递减
int nextc = c-1;
// CAS 操作更新 state 的值
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
```
2024-12-20 15:45:13 +08:00
以无参 `await`方法为例,当调用 `await()` 的时候,如果 `state` 不为 0那就证明任务还没有执行完毕`await()` 就会一直阻塞,也就是说 `await()` 之后的语句不会被执行(`main` 线程被加入到等待队列也就是 变体 CLH 队列中了)。然后,`CountDownLatch` 会自旋 CAS 判断 `state == 0`,如果 `state == 0` 的话,就会释放所有等待的线程,`await()` 方法之后的语句得到执行。
2023-07-13 09:08:59 +08:00
```java
// 等待(也可以叫做加锁)
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
// 带有超时时间的等待
public boolean await(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}
```
`acquireSharedInterruptibly`方法是 `AbstractQueuedSynchronizer` 中的默认实现。
```java
// 尝试获取锁,获取成功则返回,失败则加入等待队列,挂起线程
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
// 尝试获得锁,获取成功则返回
if (tryAcquireShared(arg) < 0)
// 获取失败加入等待队列,挂起线程
doAcquireSharedInterruptibly(arg);
}
```
`tryAcquireShared` 方法是`CountDownLatch` 的内部类 `Sync` 重写的一个方法,其作用就是判断 `state` 的值是否为 0是的话就返回 1否则返回 -1。
```java
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
```
#### 实战
2018-12-11 20:55:22 +08:00
2023-05-05 12:39:01 +08:00
**CountDownLatch 的两种典型用法**
1. 某一线程在开始运行前等待 n 个线程执行完毕 : 将 `CountDownLatch` 的计数器初始化为 n `new CountDownLatch(n)`),每当一个任务线程执行完毕,就将计数器减 1 `countdownlatch.countDown()`),当计数器的值变为 0 时,在 `CountDownLatch 上 await()` 的线程就会被唤醒。一个典型应用场景就是启动一个服务时,主线程需要等待多个组件加载完毕,之后再继续执行。
2023-05-05 12:39:01 +08:00
2. 实现多个线程开始执行任务的最大并行性:注意是并行性,不是并发,强调的是多个线程在某一时刻同时开始执行。类似于赛跑,将多个线程放到起点,等待发令枪响,然后同时开跑。做法是初始化一个共享的 `CountDownLatch` 对象,将其计数器初始化为 1 `new CountDownLatch(1)`),多个线程在开始执行任务前首先 `coundownlatch.await()`,当主线程调用 `countDown()` 时,计数器变为 0多个线程同时被唤醒。
2023-05-05 12:39:01 +08:00
**CountDownLatch 代码示例**
```java
2023-07-13 09:08:59 +08:00
public class CountDownLatchExample {
2018-12-12 21:45:56 +08:00
// 请求的数量
2023-07-13 09:08:59 +08:00
private static final int THREAD_COUNT = 550;
2018-12-12 21:45:56 +08:00
public static void main(String[] args) throws InterruptedException {
// 创建一个具有固定线程数量的线程池对象(如果这里线程池的线程数量给太少的话你会发现执行的很慢)
2023-07-13 09:08:59 +08:00
// 只是测试使用,实际场景请手动赋值线程池参数
2018-12-12 21:45:56 +08:00
ExecutorService threadPool = Executors.newFixedThreadPool(300);
2023-07-13 09:08:59 +08:00
final CountDownLatch countDownLatch = new CountDownLatch(THREAD_COUNT);
for (int i = 0; i < THREAD_COUNT; i++) {
final int threadNum = i;
threadPool.execute(() -> {
2018-12-12 21:45:56 +08:00
try {
2023-07-13 09:08:59 +08:00
test(threadNum);
2018-12-12 21:45:56 +08:00
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
2023-07-13 09:08:59 +08:00
// 表示一个请求已经被完成
countDownLatch.countDown();
2018-12-12 21:45:56 +08:00
}
});
}
countDownLatch.await();
threadPool.shutdown();
System.out.println("finish");
}
public static void test(int threadnum) throws InterruptedException {
2023-07-13 09:08:59 +08:00
Thread.sleep(1000);
System.out.println("threadNum:" + threadnum);
Thread.sleep(1000);
2018-12-12 21:45:56 +08:00
}
}
```
2019-12-16 17:27:56 +08:00
上面的代码中,我们定义了请求的数量为 550当这 550 个请求被处理完成之后,才会执行`System.out.println("finish");`
2018-12-11 20:55:22 +08:00
`CountDownLatch` 的第一次交互是主线程等待其他线程。主线程必须在启动其他线程后立即调用 `CountDownLatch.await()` 方法。这样主线程的操作就会在这个方法上阻塞,直到其他线程完成各自的任务。
2019-12-16 17:27:56 +08:00
2020-03-21 20:04:33 +08:00
其他 N 个线程必须引用闭锁对象,因为他们需要通知 `CountDownLatch` 对象,他们已经完成了各自的任务。这种通知机制是通过 `CountDownLatch.countDown()`方法来完成的;每调用一次这个方法,在构造函数中初始化的 count 值就减 1。所以当 N 个线程都调 用了这个方法count 的值等于 0然后主线程就能通过 `await()`方法,恢复执行自己的任务。
再插一嘴:`CountDownLatch``await()` 方法使用不当很容易产生死锁,比如我们上面代码中的 for 循环改为:
2020-03-21 20:04:33 +08:00
```java
for (int i = 0; i < threadCount-1; i++) {
.......
}
```
这样就导致 `count` 的值没办法等于 0然后就会导致一直等待。
2018-12-11 20:55:22 +08:00
### CyclicBarrier(循环栅栏)
#### 介绍
`CyclicBarrier``CountDownLatch` 非常类似,它也可以实现线程间的技术等待,但是它的功能比 `CountDownLatch` 更加复杂和强大。主要应用场景和 `CountDownLatch` 类似。
2025-02-24 15:21:24 +08:00
> `CountDownLatch` 的实现是基于 AQS 的,而 `CyclicBarrier` 是基于 `ReentrantLock`(`ReentrantLock` 也属于 AQS 同步器)和 `Condition` 的。
2020-03-13 18:23:29 +08:00
`CyclicBarrier` 的字面意思是可循环使用Cyclic的屏障Barrier。它要做的事情是让一组线程到达一个屏障也可以叫同步点时被阻塞直到最后一个线程到达屏障时屏障才会开门所有被屏障拦截的线程才会继续干活。
#### 原理
2019-12-16 17:27:56 +08:00
`CyclicBarrier` 内部通过一个 `count` 变量作为计数器,`count` 的初始值为 `parties` 属性的初始化值,每当一个线程到了栅栏这里了,那么就将计数器减 1。如果 count 值为 0 了,表示这是这一代最后一个线程到达栅栏,就尝试执行我们构造方法中输入的任务。
```java
//每次拦截的线程数
private final int parties;
//计数器
private int count;
```
下面我们结合源码来简单看看。
1、`CyclicBarrier` 默认的构造方法是 `CyclicBarrier(int parties)`,其参数表示屏障拦截的线程数量,每个线程调用 `await()` 方法告诉 `CyclicBarrier` 我已经到达了屏障,然后当前线程被阻塞。
2019-12-16 17:27:56 +08:00
```java
public CyclicBarrier(int parties) {
this(parties, null);
}
public CyclicBarrier(int parties, Runnable barrierAction) {
if (parties <= 0) throw new IllegalArgumentException();
this.parties = parties;
this.count = parties;
this.barrierCommand = barrierAction;
}
```
其中,`parties` 就代表了有拦截的线程的数量,当拦截的线程数量达到这个值的时候就打开栅栏,让所有线程通过。
2、当调用 `CyclicBarrier` 对象调用 `await()` 方法时,实际上调用的是 `dowait(false, 0L)`方法。 `await()` 方法就像树立起一个栅栏的行为一样,将线程挡住了,当拦住的线程数量达到 `parties` 的值时,栅栏才会打开,线程才得以通过执行。
```java
public int await() throws InterruptedException, BrokenBarrierException {
try {
2023-12-30 17:14:13 +08:00
return dowait(false, 0L);
} catch (TimeoutException toe) {
2023-12-30 17:14:13 +08:00
throw new Error(toe); // cannot happen
}
}
```
`dowait(false, 0L)`方法源码分析如下:
```java
// 当线程数量或者请求数量达到 count 时 await 之后的方法才会被执行。上面的示例中 count 的值就为 5。
private int count;
/**
* Main barrier code, covering the various policies.
*/
private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException,
TimeoutException {
final ReentrantLock lock = this.lock;
// 锁住
lock.lock();
try {
final Generation g = generation;
if (g.broken)
throw new BrokenBarrierException();
// 如果线程中断了,抛出异常
if (Thread.interrupted()) {
breakBarrier();
throw new InterruptedException();
}
2023-07-24 16:31:58 +08:00
// count 减1
int index = --count;
// 当 count 数量减为 0 之后说明最后一个线程已经到达栅栏了也就是达到了可以执行await 方法之后的条件
if (index == 0) { // tripped
boolean ranAction = false;
try {
final Runnable command = barrierCommand;
if (command != null)
command.run();
ranAction = true;
// 将 count 重置为 parties 属性的初始化值
// 唤醒之前等待的线程
// 下一波执行开始
nextGeneration();
return 0;
} finally {
if (!ranAction)
breakBarrier();
}
}
// loop until tripped, broken, interrupted, or timed out
for (;;) {
try {
if (!timed)
trip.await();
else if (nanos > 0L)
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) {
if (g == generation && ! g.broken) {
breakBarrier();
throw ie;
} else {
// We're about to finish waiting even if we had not
// been interrupted, so this interrupt is deemed to
// "belong" to subsequent execution.
Thread.currentThread().interrupt();
}
}
if (g.broken)
throw new BrokenBarrierException();
if (g != generation)
return index;
if (timed && nanos <= 0L) {
breakBarrier();
throw new TimeoutException();
}
}
} finally {
lock.unlock();
}
}
```
#### 实战
2019-12-16 17:27:56 +08:00
示例 1
```java
public class CyclicBarrierExample1 {
2018-12-12 21:45:56 +08:00
// 请求的数量
private static final int threadCount = 550;
// 需要同步的线程数量
private static final CyclicBarrier cyclicBarrier = new CyclicBarrier(5);
public static void main(String[] args) throws InterruptedException {
// 创建线程池
ExecutorService threadPool = Executors.newFixedThreadPool(10);
for (int i = 0; i < threadCount; i++) {
final int threadNum = i;
Thread.sleep(1000);
threadPool.execute(() -> {
try {
test(threadNum);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (BrokenBarrierException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
});
}
threadPool.shutdown();
}
public static void test(int threadnum) throws InterruptedException, BrokenBarrierException {
System.out.println("threadnum:" + threadnum + "is ready");
try {
2019-12-16 17:27:56 +08:00
/**等待60秒保证子线程完全执行结束*/
cyclicBarrier.await(60, TimeUnit.SECONDS);
2018-12-12 21:45:56 +08:00
} catch (Exception e) {
System.out.println("-----CyclicBarrierException------");
}
System.out.println("threadnum:" + threadnum + "is finish");
}
}
```
运行结果,如下:
2023-10-08 16:33:50 +08:00
```plain
threadnum:0is ready
threadnum:1is ready
threadnum:2is ready
threadnum:3is ready
threadnum:4is ready
threadnum:4is finish
threadnum:0is finish
threadnum:1is finish
threadnum:2is finish
threadnum:3is finish
threadnum:5is ready
threadnum:6is ready
threadnum:7is ready
threadnum:8is ready
threadnum:9is ready
threadnum:9is finish
threadnum:5is finish
threadnum:8is finish
threadnum:7is finish
threadnum:6is finish
......
```
可以看到当线程数量也就是请求数量达到我们定义的 5 个的时候, `await()` 方法之后的方法才被执行。
2019-12-16 17:27:56 +08:00
另外,`CyclicBarrier` 还提供一个更高级的构造函数 `CyclicBarrier(int parties, Runnable barrierAction)`,用于在线程到达屏障时,优先执行 `barrierAction`,方便处理更复杂的业务场景。
示例 2
```java
public class CyclicBarrierExample2 {
2018-12-12 21:45:56 +08:00
// 请求的数量
private static final int threadCount = 550;
// 需要同步的线程数量
private static final CyclicBarrier cyclicBarrier = new CyclicBarrier(5, () -> {
System.out.println("------当线程数达到之后,优先执行------");
});
public static void main(String[] args) throws InterruptedException {
// 创建线程池
ExecutorService threadPool = Executors.newFixedThreadPool(10);
for (int i = 0; i < threadCount; i++) {
final int threadNum = i;
Thread.sleep(1000);
threadPool.execute(() -> {
try {
test(threadNum);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (BrokenBarrierException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
});
}
threadPool.shutdown();
}
public static void test(int threadnum) throws InterruptedException, BrokenBarrierException {
System.out.println("threadnum:" + threadnum + "is ready");
cyclicBarrier.await();
System.out.println("threadnum:" + threadnum + "is finish");
}
}
```
运行结果,如下:
2023-10-08 16:33:50 +08:00
```plain
threadnum:0is ready
threadnum:1is ready
threadnum:2is ready
threadnum:3is ready
threadnum:4is ready
------当线程数达到之后,优先执行------
threadnum:4is finish
threadnum:0is finish
threadnum:2is finish
threadnum:1is finish
threadnum:3is finish
threadnum:5is ready
threadnum:6is ready
threadnum:7is ready
threadnum:8is ready
threadnum:9is ready
------当线程数达到之后,优先执行------
threadnum:9is finish
threadnum:5is finish
threadnum:6is finish
threadnum:8is finish
threadnum:7is finish
......
```
2023-07-05 16:21:23 +08:00
## 参考
- Java 并发之 AQS 详解:<https://www.cnblogs.com/waterystone/p/4920797.html>
- 从 ReentrantLock 的实现看 AQS 的原理及应用:<https://tech.meituan.com/2019/12/05/aqs-theory-and-apply.html>
2023-10-27 06:44:02 +08:00
<!-- @include: @article-footer.snippet.md -->
````