十年網(wǎng)站開發(fā)經(jīng)驗 + 多家企業(yè)客戶 + 靠譜的建站團隊
量身定制 + 運營維護+專業(yè)推廣+無憂售后,網(wǎng)站問題一站解決
這篇文章給大家介紹java同步器AQS的實現(xiàn)原理是什么,內(nèi)容非常詳細,感興趣的小伙伴們可以參考借鑒,希望對大家能有所幫助。
創(chuàng)新互聯(lián)建站主營建安網(wǎng)站建設(shè)的網(wǎng)絡(luò)公司,主營網(wǎng)站建設(shè)方案,重慶APP軟件開發(fā),建安h5成都小程序開發(fā)搭建,建安網(wǎng)站營銷推廣歡迎建安等地區(qū)企業(yè)咨詢
在java.util.concurrent.locks包中有很多Lock的實現(xiàn)類,常用的有ReentrantLock、ReadWriteLock(實現(xiàn)類ReentrantReadWriteLock),內(nèi)部實現(xiàn)都依賴AbstractQueuedSynchronizer類,接下去讓我們看看Doug Lea大神是如何使用一個普通類就完成了代碼塊的并發(fā)訪問控制。為了方便,本文中使用AQS代替AbstractQueuedSynchronizer。
public abstract class AbstractQueuedSynchronizer extends
AbstractOwnableSynchronizer implements java.io.Serializable {
//等待隊列的頭節(jié)點
private transient volatile Node head; //等待隊列的尾節(jié)點
private transient volatile Node tail; //同步狀態(tài)
private volatile int state;
protected final int getState() { return state;}
protected final void setState(int newState) { state = newState;}
...
}隊列同步器AQS是用來構(gòu)建鎖或其他同步組件的基礎(chǔ)框架,內(nèi)部使用一個int成員變量表示同步狀態(tài),通過內(nèi)置的FIFO隊列來完成資源獲取線程的排隊工作,其中內(nèi)部狀態(tài)state,等待隊列的頭節(jié)點head和尾節(jié)點head,都是通過volatile修飾,保證了多線程之間的可見。
在深入實現(xiàn)原理之前,我們先看看內(nèi)部的FIFO隊列是如何實現(xiàn)的。
static final class Node {
static final Node SHARED = new Node();
static final Node EXCLUSIVE = null;
static final int CANCELLED = 1;
static final int SIGNAL = -1;
static final int CONDITION = -2;
static final int PROPAGATE = -3;
volatile int waitStatus;
volatile Node prev;
volatile Node next;
volatile Thread thread;
Node nextWaiter;
...
}先來一張形象的圖(該圖其實是網(wǎng)上找的)

黃色節(jié)點是默認head節(jié)點,其實是一個空節(jié)點,我覺得可以理解成代表當(dāng)前持有鎖的線程,每當(dāng)有線程競爭失敗,都是插入到隊列的尾節(jié)點,tail節(jié)點始終指向隊列中的最后一個元素。
每個節(jié)點中, 除了存儲了當(dāng)前線程,前后節(jié)點的引用以外,還有一個waitStatus變量,用于描述節(jié)點當(dāng)前的狀態(tài)。多線程并發(fā)執(zhí)行時,隊列中會有多個節(jié)點存在,這個waitStatus其實代表對應(yīng)線程的狀態(tài):有的線程可能獲取鎖因為某些原因放棄競爭;有的線程在等待滿足條件,滿足之后才能執(zhí)行等等。一共有4中狀態(tài):
CANCELLED 取消狀態(tài)
SIGNAL 等待觸發(fā)狀態(tài)
CONDITION 等待條件狀態(tài)
PROPAGATE 狀態(tài)需要向后傳播
等待隊列是FIFO先進先出,只有前一個節(jié)點的狀態(tài)為SIGNAL時,當(dāng)前節(jié)點的線程才能被掛起。
子類重寫tryAcquire和tryRelease方法通過CAS指令修改狀態(tài)變量state。
public final void acquire(int arg) {
if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}下列步驟中線程A和B進行競爭。
線程A執(zhí)行CAS執(zhí)行成功,state值被修改并返回true,線程A繼續(xù)執(zhí)行。
線程A執(zhí)行CAS指令失敗,說明線程B也在執(zhí)行CAS指令且成功,這種情況下線程A會執(zhí)行步驟3。
生成新Node節(jié)點node,并通過CAS指令插入到等待隊列的隊尾(同一時刻可能會有多個Node節(jié)點插入到等待隊列中),如果tail節(jié)點為空,則將head節(jié)點指向一個空節(jié)點(代表線程B),具體實現(xiàn)如下:
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) {
// Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}node插入到隊尾后,該線程不會立馬掛起,會進行自旋操作。因為在node的插入過程,線程B(即之前沒有阻塞的線程)可能已經(jīng)執(zhí)行完成,所以要判斷該node的前一個節(jié)點pred是否為head節(jié)點(代表線程B),如果pred == head,表明當(dāng)前節(jié)點是隊列中第一個“有效的”節(jié)點,因此再次嘗試tryAcquire獲取鎖,
1、如果成功獲取到鎖,表明線程B已經(jīng)執(zhí)行完成,線程A不需要掛起。
2、如果獲取失敗,表示線程B還未完成,至少還未修改state值。進行步驟5。
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}前面我們已經(jīng)說過只有前一個節(jié)點pred的線程狀態(tài)為SIGNAL時,當(dāng)前節(jié)點的線程才能被掛起。
1、如果pred的waitStatus == 0,則通過CAS指令修改waitStatus為Node.SIGNAL。
2、如果pred的waitStatus > 0,表明pred的線程狀態(tài)CANCELLED,需從隊列中刪除。
3、如果pred的waitStatus為Node.SIGNAL,則通過LockSupport.park()方法把線程A掛起,并等待被喚醒,被喚醒后進入步驟6。
具體實現(xiàn)如下:
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus; if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true; if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}線程每次被喚醒時,都要進行中斷檢測,如果發(fā)現(xiàn)當(dāng)前線程被中斷,那么拋出InterruptedException并退出循環(huán)。從無限循環(huán)的代碼可以看出,并不是被喚醒的線程一定能獲得鎖,必須調(diào)用tryAccquire重新競爭,因為鎖是非公平的,有可能被新加入的線程獲得,從而導(dǎo)致剛被喚醒的線程再次被阻塞,這個細節(jié)充分體現(xiàn)了“非公平”的精髓。
如果頭結(jié)點head的waitStatus值為-1,則用CAS指令重置為0;
找到waitStatus值小于0的節(jié)點s,通過LockSupport.unpark(s.thread)喚醒線程。
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}關(guān)于java同步器AQS的實現(xiàn)原理是什么就分享到這里了,希望以上內(nèi)容可以對大家有一定的幫助,可以學(xué)到更多知識。如果覺得文章不錯,可以把它分享出去讓更多的人看到。