十年網(wǎng)站開發(fā)經(jīng)驗 + 多家企業(yè)客戶 + 靠譜的建站團隊
量身定制 + 運營維護+專業(yè)推廣+無憂售后,網(wǎng)站問題一站解決
(1)java8中為什么要新增LongAdder?

(2)LongAdder的實現(xiàn)方式?
(3)LongAdder與AtomicLong的對比?
LongAdder是java8中新增的原子類,在多線程環(huán)境中,它比AtomicLong性能要高出不少,特別是寫多的場景。
它是怎么實現(xiàn)的呢?讓我們一起來學習吧。
LongAdder的原理是,在最初無競爭時,只更新base的值,當有多線程競爭時通過分段的思想,讓不同的線程更新不同的段,最后把這些段相加就得到了完整的LongAdder存儲的值。

LongAdder繼承自Striped64抽象類,Striped64中定義了Cell內(nèi)部類和各重要屬性。
// Striped64中的內(nèi)部類,使用@sun.misc.Contended注解,說明里面的值消除偽共享
@sun.misc.Contended static final class Cell {
// 存儲元素的值,使用volatile修飾保證可見性
volatile long value;
Cell(long x) { value = x; }
// CAS更新value的值
final boolean cas(long cmp, long val) {
return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val);
}
// Unsafe實例
private static final sun.misc.Unsafe UNSAFE;
// value字段的偏移量
private static final long valueOffset;
static {
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class> ak = Cell.class;
valueOffset = UNSAFE.objectFieldOffset
(ak.getDeclaredField("value"));
} catch (Exception e) {
throw new Error(e);
}
}
}Cell類使用@sun.misc.Contended注解,說明是要避免偽共享的。
使用Unsafe的CAS更新value的值,其中value的值使用volatile修飾,保證可見性。
關于Unsafe的介紹請查看【死磕 java魔法類之Unsafe解析】。
關于偽共享的介紹請查看【雜談 什么是偽共享(false sharing)?】。
// 這三個屬性都在Striped64中
// cells數(shù)組,存儲各個段的值
transient volatile Cell[] cells;
// 最初無競爭時使用的,也算一個特殊的段
transient volatile long base;
// 標記當前是否有線程在創(chuàng)建或擴容cells,或者在創(chuàng)建Cell
// 通過CAS更新該值,相當于是一個鎖
transient volatile int cellsBusy;最初無競爭或有其它線程在創(chuàng)建cells數(shù)組時使用base更新值,有過競爭時使用cells更新值。
最初無競爭是指一開始沒有線程之間的競爭,但也有可能是多線程在操作,只是這些線程沒有同時去更新base的值。
有過競爭是指只要出現(xiàn)過競爭不管后面有沒有競爭都使用cells更新值,規(guī)則是不同的線程hash到不同的cell上去更新,減少競爭。
add(x)方法是LongAdder的主要方法,使用它可以使LongAdder中存儲的值增加x,x可為正可為負。
public void add(long x) {
// as是Striped64中的cells屬性
// b是Striped64中的base屬性
// v是當前線程hash到的Cell中存儲的值
// m是cells的長度減1,hash時作為掩碼使用
// a是當前線程hash到的Cell
Cell[] as; long b, v; int m; Cell a;
// 條件1:cells不為空,說明出現(xiàn)過競爭,cells已經(jīng)創(chuàng)建
// 條件2:cas操作base失敗,說明其它線程先一步修改了base,正在出現(xiàn)競爭
if ((as = cells) != null || !casBase(b = base, b + x)) {
// true表示當前競爭還不激烈
// false表示競爭激烈,多個線程hash到同一個Cell,可能要擴容
boolean uncontended = true;
// 條件1:cells為空,說明正在出現(xiàn)競爭,上面是從條件2過來的
// 條件2:應該不會出現(xiàn)
// 條件3:當前線程所在的Cell為空,說明當前線程還沒有更新過Cell,應初始化一個Cell
// 條件4:更新當前線程所在的Cell失敗,說明現(xiàn)在競爭很激烈,多個線程hash到了同一個Cell,應擴容
if (as == null || (m = as.length - 1) < 0 ||
// getProbe()方法返回的是線程中的threadLocalRandomProbe字段
// 它是通過隨機數(shù)生成的一個值,對于一個確定的線程這個值是固定的
// 除非刻意修改它
(a = as[getProbe() & m]) == null ||
!(uncontended = a.cas(v = a.value, v + x)))
// 調(diào)用Striped64中的方法處理
longAccumulate(x, null, uncontended);
}
}(1)最初無競爭時只更新base;
(2)直到更新base失敗時,創(chuàng)建cells數(shù)組;
(3)當多個線程競爭同一個Cell比較激烈時,可能要擴容;
final void longAccumulate(long x, LongBinaryOperator fn,
boolean wasUncontended) {
// 存儲線程的probe值
int h;
// 如果getProbe()方法返回0,說明隨機數(shù)未初始化
if ((h = getProbe()) == 0) {
// 強制初始化
ThreadLocalRandom.current(); // force initialization
// 重新獲取probe值
h = getProbe();
// 都未初始化,肯定還不存在競爭激烈
wasUncontended = true;
}
// 是否發(fā)生碰撞
boolean collide = false; // True if last slot nonempty
for (;;) {
Cell[] as; Cell a; int n; long v;
// cells已經(jīng)初始化過
if ((as = cells) != null && (n = as.length) > 0) {
// 當前線程所在的Cell未初始化
if ((a = as[(n - 1) & h]) == null) {
// 當前無其它線程在創(chuàng)建或擴容cells,也沒有線程在創(chuàng)建Cell
if (cellsBusy == 0) { // Try to attach new Cell
// 新建一個Cell,值為當前需要增加的值
Cell r = new Cell(x); // Optimistically create
// 再次檢測cellsBusy,并嘗試更新它為1
// 相當于當前線程加鎖
if (cellsBusy == 0 && casCellsBusy()) {
// 是否創(chuàng)建成功
boolean created = false;
try { // Recheck under lock
Cell[] rs; int m, j;
// 重新獲取cells,并找到當前線程hash到cells數(shù)組中的位置
// 這里一定要重新獲取cells,因為as并不在鎖定范圍內(nèi)
// 有可能已經(jīng)擴容了,這里要重新獲取
if ((rs = cells) != null &&
(m = rs.length) > 0 &&
rs[j = (m - 1) & h] == null) {
// 把上面新建的Cell放在cells的j位置處
rs[j] = r;
// 創(chuàng)建成功
created = true;
}
} finally {
// 相當于釋放鎖
cellsBusy = 0;
}
// 創(chuàng)建成功了就返回
// 值已經(jīng)放在新建的Cell里面了
if (created)
break;
continue; // Slot is now non-empty
}
}
// 標記當前未出現(xiàn)沖突
collide = false;
}
// 當前線程所在的Cell不為空,且更新失敗了
// 這里簡單地設為true,相當于簡單地自旋一次
// 通過下面的語句修改線程的probe再重新嘗試
else if (!wasUncontended) // CAS already known to fail
wasUncontended = true; // Continue after rehash
// 再次嘗試CAS更新當前線程所在Cell的值,如果成功了就返回
else if (a.cas(v = a.value, ((fn == null) ? v + x :
fn.applyAsLong(v, x))))
break;
// 如果cells數(shù)組的長度達到了CPU核心數(shù),或者cells擴容了
// 設置collide為false并通過下面的語句修改線程的probe再重新嘗試
else if (n >= NCPU || cells != as)
collide = false; // At max size or stale
// 上上個elseif都更新失敗了,且上個條件不成立,說明出現(xiàn)沖突了
else if (!collide)
collide = true;
// 明確出現(xiàn)沖突了,嘗試占有鎖,并擴容
else if (cellsBusy == 0 && casCellsBusy()) {
try {
// 檢查是否有其它線程已經(jīng)擴容過了
if (cells == as) { // Expand table unless stale
// 新數(shù)組為原數(shù)組的兩倍
Cell[] rs = new Cell[n << 1];
// 把舊數(shù)組元素拷貝到新數(shù)組中
for (int i = 0; i < n; ++i)
rs[i] = as[i];
// 重新賦值cells為新數(shù)組
cells = rs;
}
} finally {
// 釋放鎖
cellsBusy = 0;
}
// 已解決沖突
collide = false;
// 使用擴容后的新數(shù)組重新嘗試
continue; // Retry with expanded table
}
// 更新失敗或者達到了CPU核心數(shù),重新生成probe,并重試
h = advanceProbe(h);
}
// 未初始化過cells數(shù)組,嘗試占有鎖并初始化cells數(shù)組
else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
// 是否初始化成功
boolean init = false;
try { // Initialize table
// 檢測是否有其它線程初始化過
if (cells == as) {
// 新建一個大小為2的Cell數(shù)組
Cell[] rs = new Cell[2];
// 找到當前線程hash到數(shù)組中的位置并創(chuàng)建其對應的Cell
rs[h & 1] = new Cell(x);
// 賦值給cells數(shù)組
cells = rs;
// 初始化成功
init = true;
}
} finally {
// 釋放鎖
cellsBusy = 0;
}
// 初始化成功直接返回
// 因為增加的值已經(jīng)同時創(chuàng)建到Cell中了
if (init)
break;
}
// 如果有其它線程在初始化cells數(shù)組中,就嘗試更新base
// 如果成功了就返回
else if (casBase(v = base, ((fn == null) ? v + x :
fn.applyAsLong(v, x))))
break; // Fall back on using base
}
}(1)如果cells數(shù)組未初始化,當前線程會嘗試占有cellsBusy鎖并創(chuàng)建cells數(shù)組;
(2)如果當前線程嘗試創(chuàng)建cells數(shù)組時,發(fā)現(xiàn)有其它線程已經(jīng)在創(chuàng)建了,就嘗試更新base,如果成功就返回;
(3)通過線程的probe值找到當前線程應該更新cells數(shù)組中的哪個Cell;
(4)如果當前線程所在的Cell未初始化,就占有占有cellsBusy鎖并在相應的位置創(chuàng)建一個Cell;
(5)嘗試CAS更新當前線程所在的Cell,如果成功就返回,如果失敗說明出現(xiàn)沖突;
(5)當前線程更新Cell失敗后并不是立即擴容,而是嘗試更新probe值后再重試一次;
(6)如果在重試的時候還是更新失敗,就擴容;
(7)擴容時當前線程占有cellsBusy鎖,并把數(shù)組容量擴大到兩倍,再遷移原cells數(shù)組中元素到新數(shù)組中;
(8)cellsBusy在創(chuàng)建cells數(shù)組、創(chuàng)建Cell、擴容cells數(shù)組三個地方用到;
sum()方法是獲取LongAdder中真正存儲的值的大小,通過把base和所有段相加得到。
public long sum() {
Cell[] as = cells; Cell a;
// sum初始等于base
long sum = base;
// 如果cells不為空
if (as != null) {
// 遍歷所有的Cell
for (int i = 0; i < as.length; ++i) {
// 如果所在的Cell不為空,就把它的value累加到sum中
if ((a = as[i]) != null)
sum += a.value;
}
}
// 返回sum
return sum;
}可以看到sum()方法是把base和所有段的值相加得到,那么,這里有一個問題,如果前面已經(jīng)累加到sum上的Cell的value有修改,不是就沒法計算到了么?
答案確實如此,所以LongAdder可以說不是強一致性的,它是最終一致性的。
直接上代碼:
public class LongAdderVSAtomicLongTest {
public static void main(String[] args){
testAtomicLongVSLongAdder(1, 10000000);
testAtomicLongVSLongAdder(10, 10000000);
testAtomicLongVSLongAdder(20, 10000000);
testAtomicLongVSLongAdder(40, 10000000);
testAtomicLongVSLongAdder(80, 10000000);
}
static void testAtomicLongVSLongAdder(final int threadCount, final int times){
try {
System.out.println("threadCount:" + threadCount + ", times:" + times);
long start = System.currentTimeMillis();
testLongAdder(threadCount, times);
System.out.println("LongAdder elapse:" + (System.currentTimeMillis() - start) + "ms");
long start2 = System.currentTimeMillis();
testAtomicLong(threadCount, times);
System.out.println("AtomicLong elapse:" + (System.currentTimeMillis() - start2) + "ms");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
static void testAtomicLong(final int threadCount, final int times) throws InterruptedException {
AtomicLong atomicLong = new AtomicLong();
List list = new ArrayList<>();
for (int i=0;i {
for (int j = 0; j list = new ArrayList<>();
for (int i=0;i {
for (int j = 0; j 運行結果如下:
threadCount:1, times:10000000
LongAdder elapse:158ms
AtomicLong elapse:64ms
threadCount:10, times:10000000
LongAdder elapse:206ms
AtomicLong elapse:2449ms
threadCount:20, times:10000000
LongAdder elapse:429ms
AtomicLong elapse:5142ms
threadCount:40, times:10000000
LongAdder elapse:840ms
AtomicLong elapse:10506ms
threadCount:80, times:10000000
LongAdder elapse:1369ms
AtomicLong elapse:20482ms可以看到當只有一個線程的時候,AtomicLong反而性能更高,隨著線程越來越多,AtomicLong的性能急劇下降,而LongAdder的性能影響很小。
(1)LongAdder通過base和cells數(shù)組來存儲值;
(2)不同的線程會hash到不同的cell上去更新,減少了競爭;
(3)LongAdder的性能非常高,最終會達到一種無競爭的狀態(tài);
在longAccumulate()方法中有個條件是n >= NCPU就不會走到擴容邏輯了,而n是2的倍數(shù),那是不是代表cells數(shù)組大只能達到大于等于NCPU的最小2次方?
答案是明確的。因為同一個CPU核心同時只會運行一個線程,而更新失敗了說明有兩個不同的核心更新了同一個Cell,這時會重新設置更新失敗的那個線程的probe值,這樣下一次它所在的Cell很大概率會發(fā)生改變,如果運行的時間足夠長,最終會出現(xiàn)同一個核心的所有線程都會hash到同一個Cell(大概率,但不一定全在一個Cell上)上去更新,所以,這里cells數(shù)組中長度并不需要太長,達到CPU核心數(shù)足夠了。
比如,筆者的電腦是8核的,所以這里cells的數(shù)組大只會到8,達到8就不會擴容了。

歡迎關注我的公眾號“彤哥讀源碼”,查看更多源碼系列文章, 與彤哥一起暢游源碼的海洋。

另外有需要云服務器可以了解下創(chuàng)新互聯(lián)scvps.cn,海內(nèi)外云服務器15元起步,三天無理由+7*72小時售后在線,公司持有idc許可證,提供“云服務器、裸金屬服務器、高防服務器、香港服務器、美國服務器、虛擬主機、免備案服務器”等云主機租用服務以及企業(yè)上云的綜合解決方案,具有“安全穩(wěn)定、簡單易用、服務可用性高、性價比高”等特點與優(yōu)勢,專為企業(yè)上云打造定制,能夠滿足用戶豐富、多元化的應用場景需求。