www.久久久久|狼友网站av天堂|精品国产无码a片|一级av色欲av|91在线播放视频|亚洲无码主播在线|国产精品草久在线|明星AV网站在线|污污内射久久一区|婷婷综合视频网站

當前位置:首頁 > > 架構師社區(qū)
[導讀]AQS是一個用來構建鎖和同步器的框架,Lock包中的各種鎖,concurrent包中的各種同步器都是基于AQS來構建。所以,理解AQS的實現(xiàn)原理至關重要!

前言

AQS( AbstractQueuedSynchronizer )是一個用來構建鎖和同步器(所謂同步,是指線程之間的通信、協(xié)作)的框架,Lock 包中的各種鎖(如常見的 ReentrantLock, ReadWriteLock), concurrent?包中的各種同步器(如 CountDownLatch, Semaphore, CyclicBarrier)都是基于 AQS 來構建,所以理解 AQS 的實現(xiàn)原理至關重要,AQS 也是面試中區(qū)分侯選人的常見考點,我們務必要掌握,本文將用循序漸近地介紹 AQS,相信大家看完一定有收獲。文章目錄如下

  1. 鎖原理 - 信號量 vs 管程
  2. AQS 實現(xiàn)原理
  3. AQS 源碼剖析
  4. 如何利用 AQS 自定義一個互斥鎖

鎖原理 - 信號量 vs 管程

在并發(fā)編程領域,有兩大核心問題:互斥同步,互斥即同一時刻只允許一個線程訪問共享資源,同步,即線程之間如何通信、協(xié)作,一般這兩大問題可以通過信號量管程來解決。

信號量

信號量(Semaphore)是操作系統(tǒng)提供的一種進程間常見的通信方式,主要用來協(xié)調(diào)并發(fā)程序?qū)蚕碣Y源的訪問,操作系統(tǒng)可以保證對信號量操作的原子性。它是怎么實現(xiàn)的呢。

  • 信號量由一個共享整型變量 S 和兩個原子操作 PV 組成,S 只能通過 P 和 V 操作來改變
  • P 操作:即請求資源,意味著 S 要減 1,如果 S < ?0, 則表示沒有資源了,此時線程要進入等待隊列(同步隊列)等待
  • V 操作: ?即釋放資源,意味著 S 要加 1, 如果 S 小于等于 0,說明等待隊列里有線程,此時就需要喚醒線程。

示意圖如下1.5w字,30圖帶你徹底掌握AQS!

信號量機制的引入解決了進程同步和互斥問題,但信號量的大量同步操作分散在各個進程中不便于管理,還有可能導致系統(tǒng)死鎖。如:生產(chǎn)者消費者問題中將P、V顛倒可能死鎖(見文末參考鏈接),另外條件越多,需要的信號量就越多,需要更加謹慎地處理信號量之間的處理順序,否則很容易造成死鎖現(xiàn)象。

基于信號量給編程帶來的隱患,于是有了提出了對開發(fā)者更加友好的并發(fā)編程模型-管程

管程

Dijkstra 于 1971 年提出:把所有進程對某一種臨界資源的同步操作都集中起來,構成一個所謂的秘書進程。凡要訪問該臨界資源的進程,都需先報告秘書,由秘書來實現(xiàn)諸進程對同一臨界資源的互斥使用,這種機制就是管程。

管程是一種在信號量機制上進行改進的并發(fā)編程模型,解決了信號量在臨界區(qū)的 PV 操作上配對的麻煩,把配對的 PV 操作集中在一起而形成的并發(fā)編程方法理論,極大降低了使用和理解成本。

管程由四部分組成:

  1. 管程內(nèi)部的共享變量。
  2. 管程內(nèi)部的條件變量。
  3. 管程內(nèi)部并行執(zhí)行的進程。
  4. 對于局部與管程內(nèi)部的共享數(shù)據(jù)設置初始值的語句。

由此可見,管程就是一個對象監(jiān)視器。任何線程想要訪問該資源(共享變量),就要排隊進入監(jiān)控范圍。進入之后,接受檢查,不符合條件,則要繼續(xù)等待,直到被通知,然后繼續(xù)進入監(jiān)視器。

需要注意的事,信號量和管程兩者是等價的,信號量可以實現(xiàn)管程,管程也可以實現(xiàn)信號量,只是兩者的表現(xiàn)形式不同而已,管程對開發(fā)者更加友好。

兩者的區(qū)別如下1.5w字,30圖帶你徹底掌握AQS!

管程為了解決信號量在臨界區(qū)的 PV 操作上的配對的麻煩,把配對的 PV 操作集中在一起,并且加入了條件變量的概念,使得在多條件下線程間的同步實現(xiàn)變得更加簡單。

怎么理解管程中的入口等待隊列,共享變量,條件變量等概念,有時候技術上的概念較難理解,我們可以借助生活中的場景來幫助我們理解,就以我們的就醫(yī)場景為例來簡單說明一下,正常的就醫(yī)流程如下:

  1. 病人去掛號后,去侯診室等待叫號
  2. 叫到自己時,就可以進入就診室就診了
  3. 就診時,有兩種情況,一種是醫(yī)生很快就確定病人的病,并作出診斷,診斷完成后,就通知下一位病人進來就診,一種是醫(yī)生無法確定病因,需要病人去做個驗血 / CT 檢查才能確定病情,于是病人就先去驗個血 / ?CT
  4. 病人驗完血 / 做完 CT 后,重新取號,等待叫號(進入入口等待隊列)
  5. 病人等到自己的號,病人又重新拿著驗血 / CT 報告去找醫(yī)生就診

整個流程如下

1.5w字,30圖帶你徹底掌握AQS!

那么管程是如何解決互斥同步的呢

首先來看互斥,上文中醫(yī)生即共享資源(也即共享變量),就診室即為臨界區(qū),病人即線程,任何病人如果想要訪問臨界區(qū),必須首先獲取共享資源(即醫(yī)生),入口一次只允許一個線程經(jīng)過,在共享資源被占有的情況下,如果再有線程想占有共享資源,就需要到等待隊列去等候,等到獲取共享資源的線程釋放資源后,等待隊列中的線程就可以去競爭共享資源了,這樣就解決了互斥問題,所以本質(zhì)上管程是通過將共享資源及其對共享資源的操作(線程安全地獲取和釋放)封裝起來來保證互斥性的。

再來看同步,同步是通過文中的條件變量及其等待隊列實現(xiàn)的,同步的實現(xiàn)分兩種情況

  1. 病人進入就診室后,無需做驗血 / CT 等操作,于是醫(yī)生診斷完成后,就會釋放共享資源(解鎖)去通知(notify,notifyAll)入口等待隊列的下一個病人,下一個病人聽到叫號后就能看醫(yī)生了。
  2. 如果病人進入就診室后需要做驗血 / CT 等操作,會去驗血 / CT 隊列(條件隊列)排隊, 同時釋放共享變量(醫(yī)生),通知入口等待隊列的其他病人(線程)去獲取共享變量(醫(yī)生),獲得許可的線程執(zhí)行完臨界區(qū)的邏輯后會喚醒條件變量等待隊列中的線程,將它放到入口等待隊列中 ,等到其獲取共享變量(醫(yī)生)時,即可進入入口(臨界區(qū))處理。

在 Java 里,鎖大多是依賴于管程來實現(xiàn)的,以大家熟悉的內(nèi)置鎖 synchronized 為例,它的實現(xiàn)原理如下。

1.5w字,30圖帶你徹底掌握AQS!

可以看到 synchronized 鎖也是基于管程實現(xiàn)的,只不過它只有且只有一個條件變量(就是鎖對象本身)而已,這也是為什么JDK 要實現(xiàn) Lock 鎖的原因之一,Lock 支持多個條件變量。

通過這樣的類比,相信大家對管程的工作機制有了比較清晰的認識,為啥要花這么大的力氣介紹管程呢,一來管程是解決并發(fā)問題的萬能鑰匙,二來?AQS 是基于 Java 并發(fā)包中管程的一種實現(xiàn),所以理解管程對我們理解 AQS 會大有幫助,接下來我們就來看看 AQS 是如何工作的。

AQS 實現(xiàn)原理

AQS 全稱是 AbstractQueuedSynchronizer,是一個用來構建同步器的框架,它維護了一個共享資源 state 和一個 FIFO 的等待隊列(即上文中管程的入口等待隊列),底層利用了 CAS 機制來保證操作的原子性。

AQS 實現(xiàn)鎖的主要原理如下:

1.5w字,30圖帶你徹底掌握AQS!

以實現(xiàn)獨占鎖為例(即當前資源只能被一個線程占有),其實現(xiàn)原理如下:state 初始化 0,在多線程條件下,線程要執(zhí)行臨界區(qū)的代碼,必須首先獲取 state,某個線程獲取成功之后, state 加 1,其他線程再獲取的話由于共享資源已被占用,所以會到 FIFO 等待隊列去等待,等占有 state 的線程執(zhí)行完臨界區(qū)的代碼釋放資源( state 減 1)后,會喚醒 FIFO 中的下一個等待線程(head 中的下一個結點)去獲取 state。

state 由于是多線程共享變量,所以必須定義成 volatile,以保證 state 的可見性, 同時雖然 volatile 能保證可見性,但不能保證原子性,所以 AQS 提供了對 state 的原子操作方法,保證了線程安全。

另外 AQS 中實現(xiàn)的 FIFO 隊列(CLH 隊列)其實是雙向鏈表實現(xiàn)的,由 head, tail 節(jié)點表示,head 結點代表當前占用的線程,其他節(jié)點由于暫時獲取不到鎖所以依次排隊等待鎖釋放。

所以我們不難明白 AQS 的如下定義:

public?abstract?class?AbstractQueuedSynchronizer
??extends?AbstractOwnableSynchronizer
????implements?java.io.Serializable?
{
????//?以下為雙向鏈表的首尾結點,代表入口等待隊列
????private?transient?volatile?Node?head;
????private?transient?volatile?Node?tail;
????//?共享變量?state
????private?volatile?int?state;
????//?cas?獲取?/?釋放?state,保證線程安全地獲取鎖
????protected?final?boolean?compareAndSetState(int?expect,?int?update)?{
????????//?See?below?for?intrinsics?setup?to?support?this
????????return?unsafe.compareAndSwapInt(this,?stateOffset,?expect,?update);
????}
????//?...
?}

AQS 源碼剖析

ReentrantLock 是我們比較常用的一種鎖,也是基于 AQS 實現(xiàn)的,所以接下來我們就來分析一下 ReentrantLock 鎖的實現(xiàn)來一探 AQS 究竟。本文將會采用圖文并茂的方式讓大家理解 AQS 的實現(xiàn)原理,大家在學習過程中,可以多類比一下上文中就診的例子,相信會有助于理解。

首先我們要知道 ReentrantLock 是獨占鎖,也有公平和非公平兩種鎖模式,什么是獨占與有共享模式,什么又是公平鎖與非公平鎖

與獨占鎖對應的是共享鎖,這兩者有什么區(qū)別呢

獨占鎖:即其他線程只有在占有鎖的線程釋放后才能競爭鎖,有且只有一個線程能競爭成功(醫(yī)生只有一個,一次只能看一個病人)

共享鎖:即共享資源可以被多個線程同時占有,直到共享資源被占用完畢(多個醫(yī)生,可以看多個病人),常見的有讀寫鎖 ReadWriteLock, CountdownLatch,兩者的區(qū)別如下

1.5w字,30圖帶你徹底掌握AQS!

什么是公平鎖與非公平鎖

還是以就醫(yī)為例,所謂公平鎖即大家取號后老老實實按照先來后到的順序在侯診室依次等待叫號,如果是非公平鎖呢,新來的病人(線程)很霸道,不取號排隊 ,直接去搶先看病,占有醫(yī)生(不一定成功)

1.5w字,30圖帶你徹底掌握AQS!
公平鎖與非公平鎖

本文我們將會重點分析獨占,非公平模式的源碼實現(xiàn),不分析共享模式與 Condition 的實現(xiàn),因為剖析了獨占鎖的實現(xiàn),由于原理都是相似的,再分析共享與 Condition 就不難了。

首先我們先來看下 ReentrantLock 的使用方法

//?1.?初始化可重入鎖
private?ReentrantLock?lock?=?new?ReentrantLock();
public?void?run()?{
????//?加鎖
????lock.lock();
????try?{
????????//?2.?執(zhí)行臨界區(qū)代碼
????}?catch?(InterruptedException?e)?{
????????e.printStackTrace();
????}?finally?{
????????//?3.?解鎖
????????lock.unlock();
????}
}

第一步是初始化可重入鎖,可以看到默認使用的是非公平鎖機制

public?ReentrantLock()?{
????sync?=?new?NonfairSync();
}

當然你也可以用如下構造方法來指定使用公平鎖:

public?ReentrantLock(boolean?fair)?{
????sync?=?fair???new?FairSync()?:?new?NonfairSync();
}

畫外音: FairSync 和 NonfairSync 是 ReentrantLock 實現(xiàn)的內(nèi)部類,分別指公平和非公平模式,ReentrantLock ReentrantLock 的加鎖(lock),解鎖(unlock)在內(nèi)部具體都是調(diào)用的 FairSync,NonfairSync 的加鎖和解鎖方法。

幾個類的關系如下:1.5w字,30圖帶你徹底掌握AQS!

我們先來剖析下非公平鎖(NonfairSync)的實現(xiàn)方式,來看上述示例代碼的第二步:加鎖,由于默認的是非公平鎖的加鎖,所以我們來分析下非公平鎖是如何加鎖的

1.5w字,30圖帶你徹底掌握AQS!

可以看到 lock 方法主要有兩步

  1. 使用 CAS 來獲取 state 資源,如果成功設置 1,代表 state 資源獲取鎖成功,此時記錄下當前占用 state 的線程? setExclusiveOwnerThread(Thread.currentThread());
  2. 如果 CAS 設置 state 為 1 失敗(代表獲取鎖失?。?,則執(zhí)行 acquire(1) 方法,這個方法是 AQS 提供的方法,如下
public?final?void?acquire(int?arg)?{
????if?(!tryAcquire(arg)?&&
????????acquireQueued(addWaiter(Node.EXCLUSIVE),?arg))
????????selfInterrupt();
}

tryAcquire 剖析

首先 調(diào)用 tryAcquire 嘗試著獲取 state,如果成功,則跳過后面的步驟。如果失敗,則執(zhí)行 acquireQueued 將線程加入 CLH 等待隊列中。

先來看下 tryAcquire 方法,這個方法是 AQS 提供的一個模板方法,最終由其 AQS 具體的實現(xiàn)類(Sync)實現(xiàn),由于執(zhí)行的是非公平鎖邏輯,執(zhí)行的代碼如下:

final?boolean?nonfairTryAcquire(int?acquires)?{
????final?Thread?current?=?Thread.currentThread();
????int?c?=?getState();

????if?(c?==?0)?{
????????//?如果?c?等于0,表示此時資源是空閑的(即鎖是釋放的),再用?CAS?獲取鎖
????????if?(compareAndSetState(0,?acquires))?{
????????????setExclusiveOwnerThread(current);
????????????return?true;
????????}
????}
????else?if?(current?==?getExclusiveOwnerThread())?{
????????//?此條件表示之前已有線程獲得鎖,且此線程再一次獲得了鎖,獲取資源次數(shù)再加?1,這也映證了?ReentrantLock?為可重入鎖
????????int?nextc?=?c?+?acquires;
????????if?(nextc?0)?//?overflow
????????????throw?new?Error("Maximum?lock?count?exceeded");
????????setState(nextc);
????????return?true;
????}
????return?false;
}

此段代碼可知鎖的獲取主要分兩種情況

  1. state 為 0 時,代表鎖已經(jīng)被釋放,可以去獲取,于是使用 CAS 去重新獲取鎖資源,如果獲取成功,則代表競爭鎖成功,使用 setExclusiveOwnerThread(current) 記錄下此時占有鎖的線程,看到這里的 CAS,大家應該不難理解為啥當前實現(xiàn)是非公平鎖了,因為隊列中的線程與新線程都可以 CAS 獲取鎖啊,新來的線程不需要排隊
  2. 如果 state 不為 0,代表之前已有線程占有了鎖,如果此時的線程依然是之前占有鎖的線程(current == getExclusiveOwnerThread() 為 true),代表此線程再一次占有了鎖(可重入鎖),此時更新 state,記錄下鎖被占有的次數(shù)(鎖的重入次數(shù)),這里的 setState 方法不需要使用 CAS 更新,因為此時的鎖就是當前線程占有的,其他線程沒有機會進入這段代碼執(zhí)行。所以此時更新 state 是線程安全的。

假設當前 state = 0,即鎖不被占用,現(xiàn)在有 T1, T2, T3 這三個線程要去競爭鎖1.5w字,30圖帶你徹底掌握AQS!

假設現(xiàn)在 T1 獲取鎖成功,則兩種情況分別為 1、 T1 首次獲取鎖成功1.5w字,30圖帶你徹底掌握AQS!

2、 T1 再次獲取鎖成功,state 再加 1,表示鎖被重入了兩次,當前如果 T1一直申請占用鎖成功,state 會一直累加

1.5w字,30圖帶你徹底掌握AQS!

acquireQueued 剖析

如果 tryAcquire(arg) 執(zhí)行失敗,代表獲取鎖失敗,則執(zhí)行 acquireQueued 方法,將線程加入 FIFO 等待隊列

public?final?void?acquire(int?arg)?{
????if?(!tryAcquire(arg)?&&
????????acquireQueued(addWaiter(Node.EXCLUSIVE),?arg))
????????selfInterrupt();
}

所以接下來我們來看看 acquireQueued 的執(zhí)行邏輯,首先會調(diào)用?addWaiter(Node.EXCLUSIVE)??將包含有當前線程的 Node 節(jié)點入隊, Node.EXCLUSIVE 代表此結點為獨占模式

再來看下 addWaiter 是如何實現(xiàn)的

private?Node?addWaiter(Node?mode)?{
????Node?node?=?new?Node(Thread.currentThread(),?mode);
????Node?pred?=?tail;
????//?如果尾結點不為空,則用?CAS?將獲取鎖失敗的線程入隊
????if?(pred?!=?null)?{
????????node.prev?=?pred;
????????if?(compareAndSetTail(pred,?node))?{
????????????pred.next?=?node;
????????????return?node;
????????}
????}
????//?如果結點為空,執(zhí)行?enq?方法
????enq(node);
????return?node;
}

這段邏輯比較清楚,首先是獲取 FIFO 隊列的尾結點,如果尾結點存在,則采用 CAS 的方式將等待線程入隊,如果尾結點為空則執(zhí)行 enq 方法

private?Node?enq(final?Node?node)?{
????for?(;;)?{
????????Node?t?=?tail;
????????if?(t?==?null)?{
????????????//?尾結點為空,說明?FIFO?隊列未初始化,所以先初始化其頭結點
????????????if?(compareAndSetHead(new?Node()))
????????????????tail?=?head;
????????}?else?{
????????????//?尾結點不為空,則將等待線程入隊
????????????node.prev?=?t;
????????????if?(compareAndSetTail(t,?node))?{
????????????????t.next?=?node;
????????????????return?t;
????????????}
????????}
????}
}

首先判斷 tail 是否為空,如果為空說明 FIFO 隊列的 head,tail 還未構建,此時先構建頭結點,構建之后再用 CAS 的方式將此線程結點入隊

使用 CAS 創(chuàng)建 head 節(jié)點的時候只是簡單調(diào)用了 new Node() 方法,并不像其他節(jié)點那樣記錄 thread,這是為啥

因為 head 結點為虛結點,它只代表當前有線程占用了 state,至于占用 state 的是哪個線程,其實是調(diào)用了上文的 setExclusiveOwnerThread(current) ,即記錄在 exclusiveOwnerThread 屬性里。

執(zhí)行完 addWaiter 后,線程入隊成功,現(xiàn)在就要看最后一個最關鍵的方法 acquireQueued 了,這個方法有點難以理解,先不急,我們先用三個線程來模擬一下之前的代碼對應的步驟

1、假設 T1 獲取鎖成功,由于此時 FIFO 未初始化,所以先創(chuàng)建 head 結點

1.5w字,30圖帶你徹底掌握AQS!

2、此時 T2 或 T3 再去競爭 state 失敗,入隊,如下圖:

1.5w字,30圖帶你徹底掌握AQS!

好了,現(xiàn)在問題來了, T2,T3 入隊后怎么處理,是馬上阻塞嗎,馬上阻塞意味著線程從運行態(tài)轉為阻塞態(tài) ,涉及到用戶態(tài)向內(nèi)核態(tài)的切換,而且喚醒后也要從內(nèi)核態(tài)轉為用戶態(tài),開銷相對比較大,所以 AQS 對這種入隊的線程采用的方式是讓它們自旋來競爭鎖,如下圖示

1.5w字,30圖帶你徹底掌握AQS!

不過聰明的你可能發(fā)現(xiàn)了一個問題,如果當前鎖是獨占鎖,如果鎖一直被被 T1 占有, T2,T3 一直自旋沒太大意義,反而會占用 CPU,影響性能,所以更合適的方式是它們自旋一兩次競爭不到鎖后識趣地阻塞以等待前置節(jié)點釋放鎖后再來喚醒它。

另外如果鎖在自旋過程中被中斷了,或者自旋超時了,應該處于「取消」狀態(tài)。

基于每個 Node 可能所處的狀態(tài),AQS 為其定義了一個變量 waitStatus,根據(jù)這個變量值對相應節(jié)點進行相關的操作,我們一起來看這看這個變量都有哪些值,是時候看一個 Node 結點的屬性定義了

static?final?class?Node?{
????static?final?Node?SHARED?=?new?Node();//標識等待節(jié)點處于共享模式
????static?final?Node?EXCLUSIVE?=?null;//標識等待節(jié)點處于獨占模式

????static?final?int?CANCELLED?=?1;?//由于超時或中斷,節(jié)點已被取消
????static?final?int?SIGNAL?=?-1;??//?節(jié)點阻塞(park)必須在其前驅(qū)結點為?SIGNAL?的狀態(tài)下才能進行,如果結點為?SIGNAL,則其釋放鎖或取消后,可以通過?unpark?喚醒下一個節(jié)點,
????static?final?int?CONDITION?=?-2;//表示線程在等待條件變量(先獲取鎖,加入到條件等待隊列,然后釋放鎖,等待條件變量滿足條件;只有重新獲取鎖之后才能返回)
????static?final?int?PROPAGATE?=?-3;//表示后續(xù)結點會傳播喚醒的操作,共享模式下起作用

????//等待狀態(tài):對于condition節(jié)點,初始化為CONDITION;其它情況,默認為0,通過CAS操作原子更新
????volatile?int?waitStatus;

通過狀態(tài)的定義,我們可以猜測一下 AQS 對線程自旋的處理:如果當前節(jié)點的上一個節(jié)點不為 head,且它的狀態(tài)為 SIGNAL,則結點進入阻塞狀態(tài)。來看下代碼以映證我們的猜測:

final?boolean?acquireQueued(final?Node?node,?int?arg)?{
????boolean?failed?=?true;
????try?{
????????boolean?interrupted?=?false;
????????for?(;;)?{
????????????final?Node?p?=?node.predecessor();
????????????//?如果前一個節(jié)點是?head,則嘗試自旋獲取鎖
????????????if?(p?==?head?&&?tryAcquire(arg))?{
????????????????//??將?head?結點指向當前節(jié)點,原?head?結點出隊
????????????????setHead(node);
????????????????p.next?=?null;?//?help?GC
????????????????failed?=?false;
????????????????return?interrupted;
????????????}
????????????//?如果前一個節(jié)點不是?head?或者競爭鎖失敗,則進入阻塞狀態(tài)
????????????if?(shouldParkAfterFailedAcquire(p,?node)?&&
????????????????parkAndCheckInterrupt())
????????????????interrupted?=?true;
????????}
????}?finally?{
????????if?(failed)
????????????//?如果線程自旋中因為異常等原因獲取鎖最終失敗,則調(diào)用此方法
????????????cancelAcquire(node);
????}
}

先來看第一種情況,如果當前結點的前一個節(jié)點是 head 結點,且獲取鎖(tryAcquire)成功的處理

1.5w字,30圖帶你徹底掌握AQS!

可以看到主要的處理就是把 head 指向當前節(jié)點,并且讓原 head 結點出隊,這樣由于原 head 不可達, 會被垃圾回收。

注意其中 setHead 的處理

private?void?setHead(Node?node)?{
????head?=?node;
????node.thread?=?null;
????node.prev?=?null;
}

將 head 設置成當前結點后,要把節(jié)點的 thread, pre 設置成 null,因為之前分析過了,head 是虛節(jié)點,不保存除 waitStatus(結點狀態(tài))的其他信息,所以這里把 thread ,pre 置為空,因為占有鎖的線程由 exclusiveThread 記錄了,如果 head 再記錄 thread 不僅多此一舉,反而在釋放鎖的時候要多操作一遍 head 的 thread 釋放。

如果前一個節(jié)點不是 head 或者競爭鎖失敗,則首先調(diào)用 ?shouldParkAfterFailedAcquire 方法判斷鎖是否應該停止自旋進入阻塞狀態(tài):

private?static?boolean?shouldParkAfterFailedAcquire(Node?pred,?Node?node)?{
????int?ws?=?pred.waitStatus;
????????
????if?(ws?==?Node.SIGNAL)
???????//?1.?如果前置頂點的狀態(tài)為?SIGNAL,表示當前節(jié)點可以阻塞了
????????return?true;
????if?(ws?>?0)?{
????????//?2.?移除取消狀態(tài)的結點
????????do?{
????????????node.prev?=?pred?=?pred.prev;
????????}?while?(pred.waitStatus?>?0);
????????pred.next?=?node;
????}?else?{
????????//?3.?如果前置節(jié)點的?ws?不為?0,則其設置為?SIGNAL,
????????compareAndSetWaitStatus(pred,?ws,?Node.SIGNAL);
????}
????return?false;
}

這一段代碼有點繞,需要稍微動點腦子,按以上步驟一步步來看

1、 首先我們要明白,根據(jù)之前 Node 類的注釋,如果前驅(qū)節(jié)點為 SIGNAL,則當前節(jié)點可以進入阻塞狀態(tài)。

1.5w字,30圖帶你徹底掌握AQS!如圖示:T2,T3 的前驅(qū)節(jié)點的 waitStatus 都為 SIGNAL,所以 T2,T3 此時都可以阻塞。

2、如果前驅(qū)節(jié)點為取消狀態(tài),則前驅(qū)節(jié)點需要移除,這些采用了一個更巧妙的方法,把所有當前節(jié)點之前所有 waitStatus 為取消狀態(tài)的節(jié)點全部移除,假設有四個線程如下,T2,T3 為取消狀態(tài),則執(zhí)行邏輯后如下圖所示,T2, T3 節(jié)點會被 GC。

1.5w字,30圖帶你徹底掌握AQS!

3、如果前驅(qū)節(jié)點小于等于 0,則需要首先將其前驅(qū)節(jié)點置為 SIGNAL,因為前文我們分析過,當前節(jié)點進入阻塞的一個條件是前驅(qū)節(jié)點必須為 SIGNAL,這樣下一次自旋后發(fā)現(xiàn)前驅(qū)節(jié)點為 SIGNAL,就會返回 true(即步驟 1)

shouldParkAfterFailedAcquire 返回 true 代表線程可以進入阻塞中斷,那么下一步 parkAndCheckInterrupt 就該讓線程阻塞了

private?final?boolean?parkAndCheckInterrupt()?{
????//?阻塞線程
????LockSupport.park(this);
????//?返回線程是否中斷過,并且清除中斷狀態(tài)(在獲得鎖后會補一次中斷)
????return?Thread.interrupted();
}

這里的阻塞線程很容易理解,但為啥要判斷線程是否中斷過呢,因為如果線程在阻塞期間收到了中斷,喚醒(轉為運行態(tài))獲取鎖后(acquireQueued 為 true)需要補一個中斷,如下所示:

public?final?void?acquire(int?arg)?{
????if?(!tryAcquire(arg)?&&
????????acquireQueued(addWaiter(Node.EXCLUSIVE),?arg))
????????//?如果是因為中斷喚醒的線程,獲取鎖后需要補一下中斷
????????selfInterrupt();
}

至此,獲取鎖的流程已經(jīng)分析完畢,不過還有一個疑惑我們還沒解開:前文不是說 Node 狀態(tài)為取消狀態(tài)會被取消嗎,那 Node 什么時候會被設置為取消狀態(tài)呢。

回頭看 acquireQueued

final?boolean?acquireQueued(final?Node?node,?int?arg)?{
????boolean?failed?=?true;
????try?{
????????//?省略自旋獲取鎖代碼????????
????}?finally?{
????????if?(failed)
????????????//?如果線程自旋中因為異常等原因獲取鎖最終失敗,則調(diào)用此方法
????????????cancelAcquire(node);
????}
}

看最后一個 cancelAcquire 方法,如果線程自旋中因為異常等原因獲取鎖最終失敗,則會調(diào)用此方法

private?void?cancelAcquire(Node?node)?{
????//?如果節(jié)點為空,直接返回
????if?(node?==?null)
????????return;
????//?由于線程要被取消了,所以將?thread?線程清掉
????node.thread?=?null;

????//?下面這步表示將?node?的?pre?指向之前第一個非取消狀態(tài)的結點(即跳過所有取消狀態(tài)的結點),waitStatus?>?0?表示當前結點狀態(tài)為取消狀態(tài)
????Node?pred?=?node.prev;
????while?(pred.waitStatus?>?0)
????????node.prev?=?pred?=?pred.prev;

????//?獲取經(jīng)過過濾后的?pre?的?next?結點,這一步主要用在后面的?CAS?設置?pre?的?next?節(jié)點上
????Node?predNext?=?pred.next;
????//?將當前結點設置為取消狀態(tài)
????node.waitStatus?=?Node.CANCELLED;

????//?如果當前取消結點為尾結點,使用?CAS?則將尾結點設置為其前驅(qū)節(jié)點,如果設置成功,則尾結點的?next?指針設置為空
????if?(node?==?tail?&&?compareAndSetTail(node,?pred))?{
????????compareAndSetNext(pred,?predNext,?null);
????}?else?{
????//?這一步看得有點繞,我們想想,如果當前節(jié)點取消了,那是不是要把當前節(jié)點的前驅(qū)節(jié)點指向當前節(jié)點的后繼節(jié)點,但是我們之前也說了,要喚醒或阻塞結點,須在其前驅(qū)節(jié)點的狀態(tài)為 SIGNAL 的條件才能操作,所以在設置 pre 的 next 節(jié)點時要保證 pre 結點的狀態(tài)為 SIGNAL,想通了這一點相信你不難理解以下代碼。
????????int?ws;
????????if?(pred?!=?head?&&
????????????((ws?=?pred.waitStatus)?==?Node.SIGNAL?||
?????????????(ws?<=?0?&&?compareAndSetWaitStatus(pred,?ws,?Node.SIGNAL)))?&&
????????????pred.thread?!=?null)?{
????????????Node?next?=?node.next;
????????????if?(next?!=?null?&&?next.waitStatus?<=?0)
????????????????compareAndSetNext(pred,?predNext,?next);
????????}?else?{
????????//?如果?pre?為?head,或者??pre?的狀態(tài)設置?SIGNAL?失敗,則直接喚醒后繼結點去競爭鎖,之前我們說過,?SIGNAL?的結點取消(或釋放鎖)后可以喚醒后繼結點
????????????unparkSuccessor(node);
????????}
????????node.next?=?node;?//?help?GC
????}
}

這一段代碼有點繞,我們一個個來看,首先考慮以下情況

1、首先第一步當前節(jié)點之前有取消結點時,則邏輯如下1.5w字,30圖帶你徹底掌握AQS!

2、如果當前結點既非頭結點的后繼結點,也非尾結點,即步驟 1 所示,則最終結果如下1.5w字,30圖帶你徹底掌握AQS!

這里肯定有人問,這種情況下當前節(jié)點與它的前驅(qū)結點無法被 GC 啊,還記得我們上文分析鎖自旋時的處理嗎,再看下以下代碼

private?static?boolean?shouldParkAfterFailedAcquire(Node?pred,?Node?node)?{
????int?ws?=?pred.waitStatus;
????//?省略無關代碼
????if?(ws?>?0)?{
????????//?移除取消狀態(tài)的結點
????????do?{
????????????node.prev?=?pred?=?pred.prev;
????????}?while?(pred.waitStatus?>?0);
????????pred.next?=?node;
????}?
????return?false;
}

這段代碼會將 node 的 pre 指向之前 waitStatus 為非 CANCEL 的節(jié)點

所以當 T4 執(zhí)行這段代碼時,會變成如下情況

1.5w字,30圖帶你徹底掌握AQS!可以看到此時中間的兩個 CANCEL 節(jié)點不可達了,會被 GC

3、如果當前結點為 tail 結點,則結果如下,這種情況下當前結點不可達,會被 GC1.5w字,30圖帶你徹底掌握AQS!

4、如果當前結點為 head 的后繼結點時,如下

1.5w字,30圖帶你徹底掌握AQS!

結果中的 CANCEL 結點同樣會在 tail 結點自旋調(diào)用 shouldParkAfterFailedAcquire 后不可達,如下

1.5w字,30圖帶你徹底掌握AQS!

自此我們終于分析完了鎖的獲取流程,接下來我們來看看鎖是如何釋放的。

鎖釋放

不管是公平鎖還是非公平鎖,最終都是調(diào)的 AQS 的如下模板方法來釋放鎖

//?java.util.concurrent.locks.AbstractQueuedSynchronizer

public?final?boolean?release(int?arg)?{
????//?鎖釋放是否成功
????if?(tryRelease(arg))?{
????????Node?h?=?head;
????????if?(h?!=?null?&&?h.waitStatus?!=?0)
????????????unparkSuccessor(h);
????????return?true;
????}
????return?false;
}

tryRelease 方法定義在了 AQS 的子類 Sync 方法里

//?java.util.concurrent.locks.ReentrantLock.Sync

protected?final?boolean?tryRelease(int?releases)?{
????int?c?=?getState()?-?releases;
????//?只有持有鎖的線程才能釋放鎖,所以如果當前鎖不是持有鎖的線程,則拋異常
????if?(Thread.currentThread()?!=?getExclusiveOwnerThread())
????????throw?new?IllegalMonitorStateException();
????boolean?free?=?false;
????//?說明線程持有的鎖全部釋放了,需要釋放?exclusiveOwnerThread?的持有線程
????if?(c?==?0)?{
????????free?=?true;
????????setExclusiveOwnerThread(null);
????}
????setState(c);
????return?free;
}

鎖釋放成功后該干嘛,顯然是喚醒之后 head 之后節(jié)點,讓它來競爭鎖

//?java.util.concurrent.locks.AbstractQueuedSynchronizer

public?final?boolean?release(int?arg)?{
????//?鎖釋放是否成功
????if?(tryRelease(arg))?{
????????Node?h?=?head;
????????if?(h?!=?null?&&?h.waitStatus?!=?0)
????????????//?鎖釋放成功后,喚醒?head?之后的節(jié)點,讓它來競爭鎖
????????????unparkSuccessor(h);
????????return?true;
????}
????return?false;
}

這里釋放鎖的條件為啥是 h != null && h.waitStatus != 0 呢。

  1. 如果 h == null, 這有兩種可能,一種是一個線程在競爭鎖,現(xiàn)在它釋放了,當然沒有所謂的喚醒后繼節(jié)點,一種是其他線程正在運行競爭鎖,只是還未初始化頭節(jié)點,既然其他線程正在運行,也就無需執(zhí)行喚醒操作
  2. 如果 h != null 且 h.waitStatus == 0,說明 head 的后繼節(jié)點正在自旋競爭鎖,也就是說線程是運行狀態(tài)的,無需喚醒。
  3. 如果 h != null 且 h.waitStatus < 0, 此時 waitStatus 值可能為 SIGNAL,或 PROPAGATE,這兩種情況說明后繼結點阻塞需要被喚醒

來看一下喚醒方法 unparkSuccessor:

private?void?unparkSuccessor(Node?node)?{
????//?獲取?head?的?waitStatus(假設其為?SIGNAL),并用?CAS?將其置為?0,為啥要做這一步呢,之前我們分析過多次,其實?waitStatus?=?SIGNAL(
????int?ws?=?node.waitStatus;
????if?(ws?0
)
????????compareAndSetWaitStatus(node,?ws,?0);

????//?以下操作為獲取隊列第一個非取消狀態(tài)的結點,并將其喚醒
????Node?s?=?node.next;
????//?s?狀態(tài)為非空,或者其為取消狀態(tài),說明?s?是無效節(jié)點,此時需要執(zhí)行?if?里的邏輯
????if?(s?==?null?||?s.waitStatus?>?0)?{
????????s?=?null;
????????//?以下操作為從尾向前獲取最后一個非取消狀態(tài)的結點
????????for?(Node?t?=?tail;?t?!=?null?&&?t?!=?node;?t?=?t.prev)
????????????if?(t.waitStatus?<=?0)
????????????????s?=?t;
????}
????if?(s?!=?null)
????????LockSupport.unpark(s.thread);
}

這里的尋找隊列的第一個非取消狀態(tài)的節(jié)點為啥要從后往前找呢,因為節(jié)點入隊并不是原子操作,如下1.5w字,30圖帶你徹底掌握AQS!

線程自旋時時是先執(zhí)行 node.pre = pred, 然后再執(zhí)行 pred.next = node,如果 unparkSuccessor 剛好在這兩者之間執(zhí)行,此時是找不到 ?head 的后繼節(jié)點的,如下

1.5w字,30圖帶你徹底掌握AQS!

如何利用 AQS 自定義一個互斥鎖

AQS 通過提供 state 及 FIFO 隊列的管理,為我們提供了一套通用的實現(xiàn)鎖的底層方法,基本上定義一個鎖,都是轉為在其內(nèi)部定義 AQS 的子類,調(diào)用 AQS 的底層方法來實現(xiàn)的,由于 AQS 在底層已經(jīng)為了定義好了這些獲取 state 及 FIFO 隊列的管理工作,我們要實現(xiàn)一個鎖就比較簡單了,我們可以基于 AQS 來實現(xiàn)一個非可重入的互斥鎖,如下所示

public?class?Mutex??{

????private?Sync?sync?=?new?Sync();
????
????public?void?lock?()?{
????????sync.acquire(1);
????}
????
????public?void?unlock?()?{
????????sync.release(1);
????}

????private?static?class?Sync?extends?AbstractQueuedSynchronizer?{
????????@Override
????????protected?boolean?tryAcquire?(int?arg)?{
????????????return?compareAndSetState(0,?1);
????????}

????????@Override
????????protected?boolean?tryRelease?(int?arg)?{
????????????setState(0);
????????????return?true;
????????}

????????@Override
????????protected?boolean?isHeldExclusively?()?{
????????????return?getState()?==?1;
????????}
????}
}

可以看到區(qū)區(qū)幾行代碼就實現(xiàn)了,確實很方便。

巨人的肩膀

  • 【操作系統(tǒng)】生產(chǎn)者消費者問題 https://blog.csdn.net/liushall/article/details/81569609
  • https://time.geekbang.org/column/article/86089
  • https://www.cnblogs.com/binarylei/p/12544002.html
  • https://zhuanlan.zhihu.com/p/150573153
  • https://blog.csdn.net/anlian523/article/details/106448512
  • https://tech.meituan.com/2019/12/05/aqs-theory-and-apply.html

特別推薦一個分享架構+算法的優(yōu)質(zhì)內(nèi)容,還沒關注的小伙伴,可以長按關注一下:

1.5w字,30圖帶你徹底掌握AQS!

1.5w字,30圖帶你徹底掌握AQS!

1.5w字,30圖帶你徹底掌握AQS!

長按訂閱更多精彩▼

1.5w字,30圖帶你徹底掌握AQS!

如有收獲,點個在看,誠摯感謝

免責聲明:本文內(nèi)容由21ic獲得授權后發(fā)布,版權歸原作者所有,本平臺僅提供信息存儲服務。文章僅代表作者個人觀點,不代表本平臺立場,如有問題,請聯(lián)系我們,謝謝!

本站聲明: 本文章由作者或相關機構授權發(fā)布,目的在于傳遞更多信息,并不代表本站贊同其觀點,本站亦不保證或承諾內(nèi)容真實性等。需要轉載請聯(lián)系該專欄作者,如若文章內(nèi)容侵犯您的權益,請及時聯(lián)系本站刪除。
換一批
延伸閱讀

LED驅(qū)動電源的輸入包括高壓工頻交流(即市電)、低壓直流、高壓直流、低壓高頻交流(如電子變壓器的輸出)等。

關鍵字: 驅(qū)動電源

在工業(yè)自動化蓬勃發(fā)展的當下,工業(yè)電機作為核心動力設備,其驅(qū)動電源的性能直接關系到整個系統(tǒng)的穩(wěn)定性和可靠性。其中,反電動勢抑制與過流保護是驅(qū)動電源設計中至關重要的兩個環(huán)節(jié),集成化方案的設計成為提升電機驅(qū)動性能的關鍵。

關鍵字: 工業(yè)電機 驅(qū)動電源

LED 驅(qū)動電源作為 LED 照明系統(tǒng)的 “心臟”,其穩(wěn)定性直接決定了整個照明設備的使用壽命。然而,在實際應用中,LED 驅(qū)動電源易損壞的問題卻十分常見,不僅增加了維護成本,還影響了用戶體驗。要解決這一問題,需從設計、生...

關鍵字: 驅(qū)動電源 照明系統(tǒng) 散熱

根據(jù)LED驅(qū)動電源的公式,電感內(nèi)電流波動大小和電感值成反比,輸出紋波和輸出電容值成反比。所以加大電感值和輸出電容值可以減小紋波。

關鍵字: LED 設計 驅(qū)動電源

電動汽車(EV)作為新能源汽車的重要代表,正逐漸成為全球汽車產(chǎn)業(yè)的重要發(fā)展方向。電動汽車的核心技術之一是電機驅(qū)動控制系統(tǒng),而絕緣柵雙極型晶體管(IGBT)作為電機驅(qū)動系統(tǒng)中的關鍵元件,其性能直接影響到電動汽車的動力性能和...

關鍵字: 電動汽車 新能源 驅(qū)動電源

在現(xiàn)代城市建設中,街道及停車場照明作為基礎設施的重要組成部分,其質(zhì)量和效率直接關系到城市的公共安全、居民生活質(zhì)量和能源利用效率。隨著科技的進步,高亮度白光發(fā)光二極管(LED)因其獨特的優(yōu)勢逐漸取代傳統(tǒng)光源,成為大功率區(qū)域...

關鍵字: 發(fā)光二極管 驅(qū)動電源 LED

LED通用照明設計工程師會遇到許多挑戰(zhàn),如功率密度、功率因數(shù)校正(PFC)、空間受限和可靠性等。

關鍵字: LED 驅(qū)動電源 功率因數(shù)校正

在LED照明技術日益普及的今天,LED驅(qū)動電源的電磁干擾(EMI)問題成為了一個不可忽視的挑戰(zhàn)。電磁干擾不僅會影響LED燈具的正常工作,還可能對周圍電子設備造成不利影響,甚至引發(fā)系統(tǒng)故障。因此,采取有效的硬件措施來解決L...

關鍵字: LED照明技術 電磁干擾 驅(qū)動電源

開關電源具有效率高的特性,而且開關電源的變壓器體積比串聯(lián)穩(wěn)壓型電源的要小得多,電源電路比較整潔,整機重量也有所下降,所以,現(xiàn)在的LED驅(qū)動電源

關鍵字: LED 驅(qū)動電源 開關電源

LED驅(qū)動電源是把電源供應轉換為特定的電壓電流以驅(qū)動LED發(fā)光的電壓轉換器,通常情況下:LED驅(qū)動電源的輸入包括高壓工頻交流(即市電)、低壓直流、高壓直流、低壓高頻交流(如電子變壓器的輸出)等。

關鍵字: LED 隧道燈 驅(qū)動電源
關閉