public?class?LongAdder?{
???private?long?count?=?0L;
???public?void?add()?{
???????count ;
???}
}
可以加鎖去實現(xiàn),但效率太低。public?class?LongAdder?{
???private?long?count?=?0L;
???public?void?add()?{
???synchronized(this){
???????count ;
???}
}
可以用原子類這種樂觀鎖實現(xiàn),比加 synchronized 鎖效率高很多。public?class?LongAdder?{
???private?AtomicLong?count?=?new?AtomicLong(0L);
???public?void?add()?{
???????count.incrementAndGet();
???}
}
當然,更高級的玩法也可以自己調(diào)用 UNSAFE 模擬原子類里的 CAS 操作,但實際上就是把原子類的源碼給展開了。(v = count 應(yīng)該放在循環(huán)里)public?class?LongAdder?{
???private?volatile?long?count?=?0L;
???public?void?add()?{
???????boolean?success?=?false;
???????int?v =?count;
???????while(!success)?{
???????????success?=?UNSAFE.compareAndSwapLong(
????????????????????LongAdder.class,?countOffset,?v,?v ?1);
???????}
???}
}
這幾段多線程 1 的代碼如果看不明白,可以找些資料把這塊的基礎(chǔ)補一下哈,本文就不贅述了,我們繼續(xù)。關(guān)于這個多線程 1 操作,有沒有效率更高的辦法呢?分析需求
我們先別急著想,怎么把它變快,一頭扎到技術(shù)實現(xiàn)上。同我們接一個新產(chǎn)品的需求一樣,我們首先分析一下,這個產(chǎn)品提出這個需求的核心目的是什么,有時候往往可以使問題簡化。我們想一個極端的場景,成百上千個線程一直連續(xù)不斷對這個 count 進行 1 操作,一直加上個一年,一年后,我們只需要看一下最終的值是多少,即可。整個功能就是這樣,加一年,最后看那么一下。我們看看之前的原子類 1 的代碼。
public?class?LongAdder?{
???private?AtomicLong?count?=?new?AtomicLong(0L);
???public?void?add()?{
???????count.incrementAndGet();
???}
}
每時每刻都將 1 的操作真真正正計算了一遍,并賦值給 count。但我們只是一年后要讀取這個 count 值一次,顯然,中間這一年對 count 值準確地計算出結(jié)果,就是不必要的。而恰恰是因為每次都要準確計算出它的結(jié)果,導(dǎo)致多線程之間發(fā)生了競爭,浪費了資源。那思路就打開了!設(shè)計思路
我們事先搞出多個這種 count 變量,并且用某種方式讓不同線程對應(yīng)到不同 count 變量上。你看這樣,如果僅僅有四個線程,就完全不存在線程競爭的問題,每個線程操作唯一的變量。過一段時間后, 獲取最終的值,只需要把它們加和即可。這樣,獲取 count 值的復(fù)雜度增加了,需要做個加和操作,但卻是整個過程完全沒有線程競爭。犧牲讀性能,換取寫性能。用空間換時間。你看,即使一個小小的多線程 1 操作的設(shè)計,也存在架構(gòu)思維中的 trade off 思想,這在我之前兩篇架構(gòu)文章中多次提到。正所謂,不存在完美的算法,我們都只是在做平衡,犧牲這個,才能換取那個。
具體實現(xiàn)
設(shè)計思路中,我們盡可能把問題簡化,才能得到一個大方向。現(xiàn)在我們要具體設(shè)計了,就要把剛剛懶得思考的問題,拿出來了,這個過程的確比較痛苦。
懶加載
首先,我們當然希望,整個過程都不存在線程競爭。這樣我們一開始就創(chuàng)建了那么多 count,并且把線程一一映射過去,假如本來他們共同對同一個共享變量 1 就不會產(chǎn)生競爭,那這種方式就有很大問題了:1. 浪費了空間2. 多了線程映射的算法邏輯3. 最終獲取值時還要加和得不償失呀。所以我們采用懶加載的辦法,一開始,仍然是對同一個共享變量 1,等真正出現(xiàn)競爭了,再開始啟用更多的 count。我們把一開始使用的唯一共享變量叫做 base,把之后開啟的多個變量叫做 Cell 類,放在一個 Cell[] 數(shù)組里。Cell 類里只有一個變量就是 value,存儲累加過程中的值。數(shù)組擴容
一開始,這個 Cell 數(shù)組是空的。等 base 變量出現(xiàn)了一次競爭失敗的情況,就初始化這個 Cell[] 數(shù)組,第一次里面放兩個 Cell。此時,如果只有三個線程 1,就可以保證不會發(fā)生競爭。但如果此時又來了一個線程,導(dǎo)致了競爭,即 CAS 失敗,那么可以擴容 Cell[] 數(shù)組。可以注意到我畫的,Cell 數(shù)組初始大小為 2,之后擴容也是翻倍的方式,不知道你有沒有想到些什么,我們接著往下看。線程映射綁定
剛剛,我們一直默認,線程和 Cell 數(shù)組中的每個 Cell 是一一對應(yīng)的關(guān)系,可是怎么做到這一點呢?我們在每個線程中,維護一個局部變量,這個變量屬于這個線程,這個變量的值根據(jù) Cell[] 數(shù)組的大小哈希取模,就可以映射到其中一個 Cell 上了。那同線程綁定的這個局部變量是怎么來的呢?別擔(dān)心,JDK 已經(jīng)幫我們設(shè)計好了,這就是 Thread 類里的變量 probe。public?class?Thread?implements?Runnable?{
???...
???int?threadLocalRandomProbe;
???...
}
但是我們不能直接獲取,需要借助 ThreadLocalRandom 類的如下辦法獲取。static?final?int?getProbe()?{
???return?UNSAFE.getInt(Thread.currentThread(),?PROBE);
}
當然,獲取出的這個值,可能哈希取模后也會發(fā)生沖突。沒關(guān)系,請注意,這只是哈希取模沖突,也就是多個線程可能要對同一個 Cell 里的 value 進行 CAS 1 操作,但不一定會產(chǎn)生競爭。所以,發(fā)生哈希取模沖突后,先直接嘗試 CAS 1 操作,如果能成功,就沒那么多事了。但假如恰好,CAS 的時候又發(fā)生了競爭,導(dǎo)致操作失敗怎么辦?還好,可以用這種方式為該線程的 probe 重新賦值。
probe?=?新的值,自己生成一個;
UNSAFE.putInt(Thread.currentThread(),?PROBE,?probe);
重新賦值后的 probe,再次經(jīng)過哈希取模后,就不會和之前的沖突了。但很不幸,假如再沖突怎么辦?那就再次嘗試 CAS 1 操作。但假如又很不幸,CAS 1 操作又失敗了,要不要繼續(xù)重新賦值 probe 呢?要,不過,此時說明競爭已經(jīng)很激烈了。簡單說就是,這個 Cell 數(shù)組有點擁擠了,此時我們選擇將數(shù)組擴容!擴容大家還記得吧,就是上一節(jié)中的。這就又回到了上一節(jié)中的步驟,如此循環(huán)往復(fù)。當然,擴容也要有個限度,我們規(guī)定,數(shù)組大小超過 CPU 核心數(shù)后,就不再擴容了。CPU 核心數(shù),可以用如下方式獲取。
Runtime.getRuntime().availableProcessors();
如果再發(fā)生沖突和競爭的情況,那就不斷重新賦值 probe,不斷嘗試 CAS。有同學(xué)可能會說,那一直不成功咋辦呢?別忘了,如果不使用我們這個 LongAdder,僅僅用一個原子類不斷 1,失敗的概率是更高的,我們已經(jīng)通過將線程分散到不同 Cell,降低了發(fā)生競爭失敗的概率了。
執(zhí)行流程
至此,設(shè)計思路和實現(xiàn)過程,就都搞定了,我們來看一下整個流程。1. 最開始只有一個 base 變量,多個線程和諧地進行 CAS 1 操作。2. 直到有一天,兩個線程發(fā)生了競爭,即其中一個線程 CAS 時失敗了,那么就創(chuàng)建一個大小為 2 的 Cell[] 數(shù)組,用線程私有的局部變量 probe 取模,映射到一個 Cell 上,對其 CAS 1 操作。3. 不過假如線程 probe 取模后,發(fā)現(xiàn)那個 Cell 已經(jīng)被綁定過了,不要緊,先 CAS 1 試一試。4. 但如果沒試成功,說明此處有競爭,那重新計算一下線程的 probe 值,映射到一個新的 Cell 上。5. 如果此時又沖突,并且 CAS 1 又失敗,那么將 Cell[] 數(shù)組擴容。6. 最后當要獲取最終的累計和時,用 base 的值,加上所有 Cell[] 數(shù)組里的 value 值,得出一個和,返回給調(diào)用方。這個破玩意,其實 JDK 中早有實現(xiàn),又是 Doug Lea 大神寫的一個類,名為LongAdder
LongAdder
通過我們剛剛眼花繚亂的分析,再看 Doug Lea 大神的 LongAdder 就非常容易了。我們看最核心的 add 方法,這是最外層的邏輯,很容易理解。
public?class?LongAdder?extends?Striped64?implements?Serializable?{
???public?void?add(long?x)?{
???????Cell[]?as;?long?b,?v;?int?m;?Cell?a;
???????if?((as?=?cells)?!=?null?||?!casBase(b?=?base,?b? ?x))?{
???????????//?已經(jīng)初始化了?Cell?數(shù)組
???????????//?或者對?base?變量?CAS? 1?操作失敗
???????????//?就走到這里了
???????????boolean?uncontended?=?true;
???????????if?(as?==?null?||?(m?=?as.length?-?1)?0?||
???????????????//?下面這行就是?probe?取模操作
???????????????(a?=?as[getProbe()?