當(dāng)前位置:首頁 > 公眾號精選 > Linux閱碼場
[導(dǎo)讀]我們在工作中會經(jīng)常遇到線程同步,那么到底什么是線程同步呢,線程同步的本質(zhì)是什么,線程同步的方法又有哪些,為什么會有這些方法呢?在回答這些問題之前,我們先做幾個名詞解釋,以便建立共同的概念基礎(chǔ)。

程磊,一線碼農(nóng),在某手機(jī)公司擔(dān)任系統(tǒng)開發(fā)工程師;閱碼場榮譽(yù)總編輯;日常喜歡研究內(nèi)核基本原理。


目錄:

一、概念解析

1.1 名詞解釋

1.2 線程同步與防同步

二、線程防同步方法

2.1 時間上防同步

2.2 空間上防同步

2.3 事后防同步

三、原子操作

3.1 int原子操作

3.2 long原子操作

3.3 bit原子操作

四、加鎖機(jī)制

4.1 鎖的底層原理

4.2 簡單自旋鎖

4.3 互斥鎖

4.4 信號量

五、per-CPU 變量

六、RCU 簡介

七、序列鎖

八、總結(jié)回顧




一、概念解析


我們在工作中會經(jīng)常遇到線程同步,那么到底什么是線程同步呢,線程同步的本質(zhì)是什么,線程同步的方法又有哪些,為什么會有這些方法呢?在回答這些問題之前,我們先做幾個名詞解釋,以便建立共同的概念基礎(chǔ)。


1.1 名詞解釋


CPU: 本文中的CPU都是指邏輯CPU。
UP: 單處理器(單CPU)。
SMP: 對稱多處理器(多CPU)。
線程、執(zhí)行流: 線程的本質(zhì)是一個執(zhí)行流,但執(zhí)行流不僅僅有線程,還有ISR、softirq、tasklet,都是執(zhí)行流。本文中說到線程一般是泛指各種執(zhí)行流,除非是在需要區(qū)分不同執(zhí)行流時,線程才特指狹義的線程。
并發(fā)、并行: 并發(fā)是指線程在宏觀上表現(xiàn)為同時執(zhí)行,微觀上可能是同時執(zhí)行也可能是交錯執(zhí)行,并行是指線程在宏觀上是同時執(zhí)行,微觀上也是同時執(zhí)行。
偽并發(fā)、真并發(fā): 偽并發(fā)就是微觀上是交錯執(zhí)行的并發(fā),真并發(fā)就是并行。UP上只有偽并發(fā),SMP上既有偽并發(fā)也有真并發(fā)。
臨界區(qū): 訪問相同數(shù)據(jù)的代碼段,如果可能會在多個線程中并發(fā)執(zhí)行,就叫做臨界區(qū),臨界區(qū)可以是一個代碼段被多個線程并發(fā)執(zhí)行,也可以是多個不同的代碼段被多個線程并發(fā)執(zhí)行。
同步: 首先線程同步的同步和同步異步的同步,不是一個意思。線程同步的同步,本文按照字面意思進(jìn)行解釋,同步就是統(tǒng)一步調(diào)、同時執(zhí)行的意思。
線程同步現(xiàn)象: 線程并發(fā)過程中如果存在臨界區(qū)并發(fā)執(zhí)行的情況,就叫做線程同步現(xiàn)象。
線程防同步機(jī)制: 如果發(fā)生線程同步現(xiàn)象,由于臨界區(qū)會訪問共同的數(shù)據(jù),程序可能就會出錯,因此我們要防止發(fā)生線程同步現(xiàn)象,也就是防止臨界區(qū)并發(fā)執(zhí)行的情況,為此我們采取的防范措施叫做線程防同步機(jī)制。


1.2 線程同步與防同步


為什么線程同步叫線程同步,不叫線程防同步,叫線程同步給人的感覺好像就是要讓線程同時執(zhí)行的意思啊。但是實際上線程同步是讓臨界區(qū)不要并發(fā)執(zhí)行的意思,不管你們倆誰先執(zhí)行,只要錯開,誰先誰后執(zhí)行都可以。所以本文后面都采用線程防同步機(jī)制、防同步機(jī)制等詞。


我小時候一直有個疑惑,感冒藥為啥叫感冒藥,感冒藥是讓人感冒的藥啊,不是應(yīng)該叫治感冒藥才對嗎,治療感冒的藥。后來一想,就沒有讓人感冒的藥,所以感冒藥表達(dá)的就是治療感冒的藥,沒必要加個治字。但是同時還有一種藥,叫老鼠藥,是治療老鼠的藥嗎,不是啊,是要毒死老鼠的藥,因為沒有人會給老鼠治病。不過我們那里也有把老鼠藥叫做害老鼠藥的,加個害字,意思更明確,不會有歧義。


因此本文用了兩個詞,同步現(xiàn)象、防同步機(jī)制,使得概念更加清晰明確。


說了這么多就是為了闡述一個非常簡單的概念,就是不能同時操作相同的數(shù)據(jù),因為可能會出錯,所以我們要想辦法防止,這個方法我們把它叫做線程防同步。


還有一個詞是競態(tài)條件(race condition),很多關(guān)于線程同步的書籍文檔中都有提到,但是我一直沒有理解是啥意思。競態(tài)條件,條件,線程同步和條件也沒啥關(guān)系啊;競態(tài),也不知道是什么意思。再看它的英文,condition有情況的意思,race有賽跑、競爭的意思,是不是要表達(dá)賽跑情況、競爭現(xiàn)象,想說兩個線程在競爭賽跑,看誰能先訪問到公共數(shù)據(jù)。我發(fā)現(xiàn)沒有競態(tài)條件這個詞對我們理解線程同步問題一點影響都沒有,有了這個詞反而不明所以,所以我們就忽略這個詞。




二、線程防同步方法


在我們理解了同步現(xiàn)象、防同步機(jī)制這個詞后,下面的就很好理解了。同步現(xiàn)象是指同時訪問相同的數(shù)據(jù),那么如何防止呢,我不讓你同時訪問相同的數(shù)據(jù)不就可以了嘛。因此防同步機(jī)制有三大類方法。



2.1 時間上防同步


我不讓你們同時進(jìn)入臨界區(qū),這樣就不會同時操作相同的數(shù)據(jù)了,有三種方法:


1.原子操作
對于個別特別簡單特別短的臨界區(qū),CPU會提供一些原子指令,在一條指令中把多個操作完成,兩個原子指令必然一個在前一個在后地執(zhí)行,不可能同時執(zhí)行。原子指令能防偽并發(fā)和真并發(fā),適用于UP和SMP。


2.加鎖
對于大部分臨界區(qū)來說,代碼都比較復(fù)雜,CPU不可能都去實現(xiàn)原子指令,因此可以在臨界區(qū)的入口處加鎖,同一時間只能有一個線程進(jìn)入,獲得鎖的線程進(jìn)入,在臨界區(qū)的出口處再釋放鎖。未獲得鎖的線程在外面等待,等待的方式有兩種,忙等待的叫做自旋鎖,休眠等待的叫做阻塞鎖。根據(jù)臨界區(qū)內(nèi)的數(shù)據(jù)讀寫操作不同,鎖又可以分為單一鎖和讀寫鎖,單一鎖不區(qū)分讀者寫者,所有用戶都互斥;讀寫鎖區(qū)分讀者和寫者,讀者之間可以并行,寫者與讀者、寫者與寫者之間是互斥的。自旋鎖有單一鎖和讀寫鎖,阻塞鎖也有單一鎖和讀寫鎖。自旋鎖只能防真并發(fā),適用于SMP;休眠鎖能防偽并發(fā)和真并發(fā),適用于UP和SMP。


3.臨時禁用偽并發(fā)
對于某些由于偽并發(fā)而產(chǎn)生的同步問題,可以通過在臨界區(qū)的入口處禁用此類偽并發(fā)、在臨界區(qū)的出口處再恢復(fù)此類偽并發(fā)來解決。這種方式顯然只能防偽并發(fā),適用于UP和SMP上的單CPU。而自旋鎖只能防真并發(fā),所以在SMP上經(jīng)常會同時使用這兩種方法,同時防偽并發(fā)和真并發(fā)。


臨時禁用偽并發(fā)有三種情況:


a.禁用中斷
如果線程和中斷、軟中斷和中斷之間會訪問公共數(shù)據(jù),那么在前者的臨界區(qū)內(nèi)可以臨時禁用后者,也就是禁用中斷,達(dá)到防止偽并發(fā)的目的。在后者的臨界區(qū)內(nèi)不用采取措施,因為前者不能搶占后者,但是后者能搶占前者。前者一般會同時使用禁中斷和自旋鎖,禁中斷防止偽并發(fā),自旋鎖防止真并發(fā)。
b.禁用軟中斷
如果線程和軟中斷會訪問公共數(shù)據(jù),那么在前者的臨界區(qū)內(nèi)禁用后者,也就是禁用軟中斷,可以達(dá)到防止偽并發(fā)的目的。后者不用采取任何措施,因為前者不會搶占后者。前者也可以和自旋鎖并用,同時防止偽并發(fā)和真并發(fā)。
c.禁用搶占
如果線程和線程之間會訪問公共數(shù)據(jù),那么可以在臨界區(qū)內(nèi)禁用搶占,達(dá)到防止偽并發(fā)的目的。禁用搶占也可以和自旋鎖并用,同時防止偽并發(fā)和真并發(fā)。


2.2 空間上防同步


你們可以同時,但是我不讓你們訪問相同的數(shù)據(jù),有兩種方法:


1. 數(shù)據(jù)分割
把大家都要訪問的公共數(shù)據(jù)分割成N份,各訪問各的。這也有兩種情況:
a. 在SMP上如果多個CPU要經(jīng)常訪問一個全局?jǐn)?shù)據(jù),那么可以把這個數(shù)據(jù)拆分成NR_CPU份,每個CPU只訪問自己對應(yīng)的那份,這樣就不存在并發(fā)問題了,這個方法叫做 per-CPU 變量,只能防真并發(fā),適用于SMP,需要和禁用搶占配合使用。
b. TLS,每個線程都用自己的局部數(shù)據(jù),這樣就不存在并發(fā)問題了,能防真并發(fā)和偽并發(fā),適用于UP和SMP。


2. 數(shù)據(jù)復(fù)制
RCU,只適用于用指針訪問的動態(tài)數(shù)據(jù)。讀者復(fù)制指針,然后就可以隨意讀取數(shù)據(jù)了,所有的讀者可以共同讀一份數(shù)據(jù)。寫者復(fù)制數(shù)據(jù),然后就可以隨意修改復(fù)制后的數(shù)據(jù)了,因為這份數(shù)據(jù)是私有的,不過修改完數(shù)據(jù)之后要修改指針指向最新的數(shù)據(jù),修改指針的這個操作需要是原子的。對于讀者來說,它是復(fù)制完指針之后用自己的私有指針來訪問數(shù)據(jù)的,所以它訪問的要么是之前的數(shù)據(jù),要么是修改之后的數(shù)據(jù),而不會是不一致的數(shù)據(jù)。能防偽并發(fā)和真并發(fā),適用于UP和SMP。


2.3 事后防同步


不去積極預(yù)防并發(fā),而是假設(shè)不存在并發(fā),直接訪問數(shù)據(jù)。訪問完了之后再檢查剛才是否有并發(fā)發(fā)生,如果有就再重來一遍,一直重試,直到?jīng)]有并發(fā)發(fā)生為止。這就是內(nèi)核里面的序列鎖seqlock,能防偽并發(fā)和真并發(fā),適用于UP和SMP。


下面我們來畫張圖總結(jié)一下:



三、原子操作


我們在剛開始學(xué)習(xí)線程同步時,經(jīng)常用來舉的一個例子就是,就連一個簡單的i++操作都是線程不安全的。i++對于源碼來說已經(jīng)是非常簡單的語句了,但是它編譯成機(jī)器碼之后有三個指令,把數(shù)據(jù)從內(nèi)存加載到寄存器,把寄存器加1,把寄存器的值加1。如果有兩個線程同時執(zhí)行i++話,就會出問題,比如i最開始等于0,每個線程都循環(huán)一萬次i++,我們期望最后的結(jié)果是兩萬,但是實際上最后的結(jié)果是不到兩萬的。對于UP來說,兩個線程輪流執(zhí)行,如果線程切換的點落在三條指令之間就會出問題。對于SMP來說多個CPU同時執(zhí)行更會出問題。為此我們可以采取的辦法可以是每次i++都加鎖,這樣做當(dāng)然可以,不過這么做有點殺雞用牛刀了。很多CPU專門為這種基本的整數(shù)操作提供了原子指令。


3.1 int原子操作


硬件提供的都是直接對整數(shù)的原子操作,但是在Linux內(nèi)核里并不是直接對一個int類型進(jìn)行原子操作,而是對一個封裝后的數(shù)據(jù)進(jìn)行操作。本質(zhì)上還是對int進(jìn)行操作,那么為什么要做這么一層封裝呢?主要是為了接口語義,大家一看到一個變量是atomic_t類型的,大家立馬就明白這是一個原子變量,不能使用普通加減操作進(jìn)行運(yùn)算,要使用專門的接口函數(shù)來操作才對。


typedef struct { int counter;} atomic_t;


那么對于 atomic_t的原子操作都有哪些呢?


Atomic Integer Operation Description
ATOMIC_INIT(int i) At declaration, initialize to i
int atomic_read(atomic_t *v) Atomically read the integer value of v
void atomic_set(atomic_t *v, int i) Atomically set v equal to i.
void atomic_add(int i, atomic_t *v) Atomically add i to v
void atomic_sub(int i, atomic_t *v) Atomically subtract i from v
void atomic_inc(atomic_t *v) Atomically add one to v
void atomic_dec(atomic_t *v) Atomically subtract one from v
int atomic_sub_and_test(int i, atomic_t *v) Atomically subtract i from v and return true if the result is zero; otherwise false.
int atomic_add_negative(int i, atomic_t *v) Atomically add i to v and return true if the result is negative; otherwise false.
int atomic_add_return(int i, atomic_t *v) Atomically add i to v and return the result.
int atomic_sub_return(int i, atomic_t *v) Atomically subtract i from v and return the result.
int atomic_dec_and_test(atomic_t *v) Atomically decrement v by one and return true if zero; false otherwise
int atomic_inc_and_test(atomic_t *v) Atomically increment v by one and return true if the result is zero; false otherwise.


3.2 long原子操作


如果上面的int類型(32位)不能滿足我們的原子操作需求,系統(tǒng)還為我們定義了64位的原子變量。


typedef struct { s64 counter;} atomic64_t;


同樣的也為我們提供了一堆原子接口:


Atomic Integer Operation Description
ATOMIC64_INIT(int i) At declaration, initialize to i
int atomic64_read(atomic_t *v) Atomically read the integer value of v
void atomic64_set(atomic_t *v, int i) Atomically set v equal to i.
void atomic64_add(int i, atomic_t *v) Atomically add i to v
void atomic64_sub(int i, atomic_t *v) Atomically subtract i from v
void atomic64_inc(atomic_t *v) Atomically add one to v
void atomic64_dec(atomic_t *v) Atomically subtract one from v
int atomic64_sub_and_test(int i, atomic_t *v) Atomically subtract i from v and return true if the result is zero; otherwise false.
int atomic64_add_negative(int i, atomic_t *v) Atomically add i to v and return true if the result is negative; otherwise false.
int atomic64_add_return(int i, atomic_t *v) Atomically add i to v and return the result.
int atomic64_sub_return(int i, atomic_t *v) Atomically subtract i from v and return the result.
int atomic64_dec_and_test(atomic_t *v) Atomically decrement v by one and return true if zero; false otherwise
int atomic64_inc_and_test(atomic_t *v) Atomically increment v by one and return true if the result is zero; false otherwise.


3.3 bit原子操作


系統(tǒng)還給我們提供了位運(yùn)算的原子操作,不過并沒有封裝數(shù)據(jù)類型,而是操作一個void * 指針?biāo)赶虻臄?shù)據(jù),我們要操作的位數(shù)要在我們希望的數(shù)據(jù)之內(nèi),這點是由我們自己來保證的。


Atomic Bitwise Operation Description
void set_bit(int nr, void *addr) Atomically set the nr-th bit starting from addr.
void clear_bit(int nr, void *addr) Atomically clear the nr-th bit starting from addr.
void change_bit(int nr, void *addr) Atomically flip the value of the nr-th bit starting from addr.
int test_and_set_bit(int nr, void *addr) Atomically set the nr-th bit starting from addr and return the previous value.
int test_and_clear_bit(int nr, void *addr) Atomically clear the nr-th bit starting from addr and return the previous value.
int test_and_change_bit(int nr, void *addr) Atomically flip the nr-th bit starting from addr and return the previous value.
int test_bit(int nr, void *addr) Atomically return the value of the nrth bit starting from addr.


有了原子操作,我們對這些簡單的基本運(yùn)算就不用使用加鎖機(jī)制了,就可以提高效率。



四、加鎖機(jī)制


有很多臨界區(qū),并不是一些簡單的整數(shù)運(yùn)算,不可能要求硬件都給提供原子操作。為此,我們需要鎖機(jī)制,在臨界區(qū)的入口進(jìn)行加鎖操作,加到鎖的才能進(jìn)入臨界區(qū)進(jìn)行操作,加不到鎖的要一直在臨界區(qū)外面等候。等候的方式有兩種,一種是忙等待,就是自旋鎖,一種是休眠等待,就是阻塞鎖,阻塞鎖在Linux里面的實現(xiàn)叫做互斥鎖。


4.1 鎖的底層原理


我們該如何實現(xiàn)一個鎖呢,下面我們嘗試直接用軟件來實現(xiàn)試試,代碼如下:

int lock = 0; void lock(int * lock){start: if(*lock == 0) *lock = 1; else{ wait(); goto start; }} void unlock(int * lock){ *lock = 0; wakeup();}


可以看到這個鎖的實現(xiàn)邏輯很簡單,定義一個整數(shù)作為鎖,0代表沒人持鎖,1代表有人持鎖。我們先判斷鎖的值,如果是0代表沒人持鎖,我們給鎖賦值1,代表我們獲得了鎖,然后函數(shù)返回,就可以進(jìn)入臨界區(qū)了。如果鎖是1,代表有人已經(jīng)持有了鎖,此時我們就需要等待,等待函數(shù)wait,可以用忙等待,也可以用休眠等待。釋放鎖的時候把鎖設(shè)為0,然后wakeup其他線程或者為空操作。被喚醒的線程從wait中醒來,然后又重走加鎖流程。但是這里面有個問題,就是加鎖操作也是個臨界區(qū),如果兩個線程在兩個CPU上同時執(zhí)行到加鎖函數(shù),雙方都檢測到鎖是0,然后都把鎖置為1,都加鎖成功,這不是出問題了嗎。鎖就是用來保護(hù)臨界區(qū)的,但是加鎖本身也是臨界區(qū),也需要保護(hù),該怎么辦呢?唯一的方法就是求助于硬件,讓加鎖操作成為原子操作,X86平臺提供了CAS指令來實現(xiàn)這個功能。CAS,Compare and Swap,比較并交換,它的接口邏輯是這樣的:


int cas(int * p, old_value, new_value)


如果p位置的值等于old_value,就把p位置的值設(shè)置為new_value,并返回1,否則返回0。關(guān)鍵就在于cas它是硬件實現(xiàn)的,是原子的。這樣我們就可以用cas指令來實現(xiàn)鎖的邏輯,如下所示:


int lock = 0; void lock(int * lock){start: if(cas(lock, 0, 1)) return; else{ wait(); goto start; }} void unlock(int * lock){ *lock = 0; wakeup();}


4.2 簡單自旋鎖


為什么在這里要先講簡單自旋鎖呢?因為互斥鎖、信號量這些鎖的內(nèi)部實現(xiàn)都用了自旋鎖,所以先把自旋鎖的邏輯講清楚才能繼續(xù)講下去。為什么是簡單自旋鎖呢,因為自旋鎖現(xiàn)在已經(jīng)發(fā)展得很復(fù)雜了,所以這里就是講一下自旋鎖的簡單版本,因為它們的基本邏輯是一致的。自旋鎖由于在加鎖失敗時是忙等待,所以不用考慮等待隊列、睡眠喚醒的問題,所以實現(xiàn)比較簡單,下面是簡單自旋鎖的實現(xiàn)代碼:


int lock = 0; void spin_lock(int * lock){ while(!cas(lock, 0, 1))} void spin_unlock(int * lock){ *lock = 0;}


可以看到簡單自旋鎖的代碼是相當(dāng)簡單,加鎖的時候不停地嘗試加鎖,一直失敗一直加,直到成功才返回。釋放鎖的時候更簡單,直接把鎖置為0就可以了。此時如果有其它自旋鎖在自旋,由于鎖已經(jīng)變成了0,所以就會加鎖成功,結(jié)束自旋。注意這個代碼只是自旋鎖的邏輯演示,并不是真正的自旋鎖實現(xiàn)。


4.3 互斥鎖


互斥鎖是休眠鎖,加鎖失敗時要把自己放入等待隊列,釋放鎖的時候要考慮喚醒等待隊列的線程?;コ怄i的定義如下(刪除了一些配置選項):


struct mutex { atomic_long_t    owner; raw_spinlock_t    wait_lock; struct list_head  wait_list;};


可以看到互斥鎖的定義有atomic_long_t owner,這個和我們之前定義的int lock是相似的,只不過這里是個加強(qiáng)版,0代表沒加鎖,加鎖的時候是非0,而不是簡單的1,而是記錄的是加鎖的線程。然后是自旋鎖和等待隊列,自旋鎖是用來保護(hù)等待隊列的。這里的自旋鎖為什么要用raw_spinlock_t呢,它和spinlock_t有什么區(qū)別呢?在標(biāo)準(zhǔn)的內(nèi)核版本下是沒有區(qū)別的,如果打了RTLinux補(bǔ)丁之后它們就不一樣了,spinlock_t會轉(zhuǎn)化為阻塞鎖,raw_spinlock_t還是自旋鎖,如果需要在任何情況下都要自旋的話請使用raw_spinlock_t。下面我們看一下它的加鎖操作:


void __sched mutex_lock(struct mutex *lock){ might_sleep();  if (!__mutex_trylock_fast(lock)) __mutex_lock_slowpath(lock);}static __always_inline bool __mutex_trylock_fast(struct mutex *lock){ unsigned long curr = (unsigned long)current; unsigned long zero = 0UL;  if (atomic_long_try_cmpxchg_acquire(&lock->owner, &zero, curr)) return true;  return false;}


可以看到加鎖時先嘗試直接加鎖,用的就是CAS原子指令(x86的CAS指令叫做cmpxchg)。如果owner是0,代表當(dāng)前鎖是開著的,就把owner設(shè)置為自己(也就是當(dāng)前線程,struct task_struct * 強(qiáng)轉(zhuǎn)為 ulong),代表自己成為這個鎖的主人,也就是自己加鎖成功了,然后返回true。如果owner不為0的話,代表有人已經(jīng)持有鎖了,返回false,后面就要走慢速路徑了,也就是把自己放入等待隊列里休眠等待。下面看看慢速路徑的代碼是怎么實現(xiàn)的:


static noinline void __sched__mutex_lock_slowpath(struct mutex *lock){ __mutex_lock(lock, TASK_UNINTERRUPTIBLE, 0, NULL, _RET_IP_);} static int __sched__mutex_lock(struct mutex *lock, unsigned int state, unsigned int subclass, struct lockdep_map *nest_lock, unsigned long ip){ return __mutex_lock_common(lock, state, subclass, nest_lock, ip, NULL, false);} static __always_inline int __sched__mutex_lock_common(struct mutex *lock, unsigned int state, unsigned int subclass, struct lockdep_map *nest_lock, unsigned long ip, struct ww_acquire_ctx *ww_ctx, const bool use_ww_ctx){ struct mutex_waiter waiter; int ret;  raw_spin_lock(&lock->wait_lock); waiter.task = current; __mutex_add_waiter(lock, &waiter, &lock->wait_list);  set_current_state(state); for (;;) { bool first; if (__mutex_trylock(lock)) goto acquired;  raw_spin_unlock(&lock->wait_lock); schedule_preempt_disabled();  first = __mutex_waiter_is_first(lock, &waiter); set_current_state(state); if (__mutex_trylock_or_handoff(lock, first)) break;  raw_spin_lock(&lock->wait_lock); } acquired: __set_current_state(TASK_RUNNING); __mutex_remove_waiter(lock, &waiter); raw_spin_unlock(&lock->wait_lock);  return ret;} static void__mutex_add_waiter(struct mutex *lock, struct mutex_waiter *waiter, struct list_head *list){ debug_mutex_add_waiter(lock, waiter, current);  list_add_tail(&waiter->list, list); if (__mutex_waiter_is_first(lock, waiter)) __mutex_set_flag(lock, MUTEX_FLAG_WAITERS);}


可以看到慢速路徑最終會調(diào)用函數(shù)__mutex_lock_common,這個函數(shù)本身是很復(fù)雜的,這里進(jìn)行了大量的刪減,只保留了核心邏輯。函數(shù)先加鎖mutex的自旋鎖,然后把自己放到等待隊列上去,然后就在無限for循環(huán)中休眠并等待別人釋放鎖并喚醒自己。For循環(huán)的入口處先嘗試加鎖,如果成功就goto acquired,如果不成功就釋放自旋鎖,并調(diào)用schedule_preempt_disabled,此函數(shù)就是休眠函數(shù),它會調(diào)度其它進(jìn)程來執(zhí)行,自己就休眠了,直到有人喚醒自己才會醒來繼續(xù)執(zhí)行。別人釋放鎖的時候會喚醒自己,這個我們后面會分析。當(dāng)我們被喚醒之后會去嘗試加鎖,如果加鎖失敗,再次來到for循環(huán)的開頭處,再試一次加鎖,如果不行就再走一次休眠過程。為什么我們加鎖會失敗呢,因為有可能多個線程同時被喚醒來爭奪鎖,我們不一定會搶鎖成功。搶鎖失敗就再去休眠,最后總會搶鎖成功的。


把自己加入等待隊列時會設(shè)置flag MUTEX_FLAG_WAITERS,這個flag是設(shè)置在owner的低位上去,因為task_struct指針至少是L1_CACHE_BYTES對齊的,所以最少有3位可以空出來做flag。


下面我們再來看一下釋放鎖的流程:


void __sched mutex_unlock(struct mutex *lock){  if (__mutex_unlock_fast(lock)) return; __mutex_unlock_slowpath(lock, _RET_IP_);} static __always_inline bool __mutex_unlock_fast(struct mutex *lock){ unsigned long curr = (unsigned long)current;  return atomic_long_try_cmpxchg_release(&lock->owner, &curr, 0UL);}


解鎖的時候先嘗試快速解鎖,快速解鎖的意思是沒有其它在等待隊列里,可以直接釋放鎖。怎么判斷等待隊列里沒有線程在等待呢,這就是前面設(shè)置的MUTEX_FLAG_WAITERS的作用了。如果設(shè)置了這個flag,lock->owner 和 curr就不會相等,直接釋放鎖就會失敗,就要走慢速路徑。慢速路徑的代碼如下:


static noinline void __sched __mutex_unlock_slowpath(struct mutex *lock, unsigned long ip){ struct task_struct *next = NULL; DEFINE_WAKE_Q(wake_q); unsigned long owner;  owner = atomic_long_read(&lock->owner); for (;;) { if (atomic_long_try_cmpxchg_release(&lock->owner, &owner, __owner_flags(owner))) { if (owner & MUTEX_FLAG_WAITERS) break; } }  raw_spin_lock(&lock->wait_lock); if (!list_empty(&lock->wait_list)) { struct mutex_waiter *waiter = list_first_entry(&lock->wait_list, struct mutex_waiter, list); next = waiter->task; wake_q_add(&wake_q, next); } raw_spin_unlock(&lock->wait_lock); wake_up_q(&wake_q);}


上述代碼進(jìn)行了一些刪減??梢钥闯錾鲜龃a會先釋放鎖,然后喚醒等待隊列里面的第一個等待者。


4.4 信號量


信號量與互斥鎖有很大的不同,互斥鎖代表只有一個線程能同時進(jìn)入臨界區(qū),而信號量是個整數(shù)計數(shù),代表著某一類資源有多少個,能同時讓多少個線程訪問這類資源。信號量也沒有加鎖解鎖操作,信號量類似的操作叫做down和up,down代表獲取一個資源,up代表歸還一個資源。


下面我們先看一下信號量的定義:


struct semaphore { raw_spinlock_t    lock; unsigned int    count; struct list_head  wait_list;}; #define __SEMAPHORE_INITIALIZER(name, n)        \{                  \ .lock    = __RAW_SPIN_LOCK_UNLOCKED((name).lock),  \ .count    = n,            \ .wait_list  = LIST_HEAD_INIT((name).wait_list),    \} static inline void sema_init(struct semaphore *sem, int val){ *sem = (struct semaphore) __SEMAPHORE_INITIALIZER(*sem, val);}


可以看出信號量和互斥鎖的定義很相似,都有一個自旋鎖,一個等待隊列,不同的是信號量沒有owner,取而代之的是count,代表著某一類資源的個數(shù),而且自旋鎖同時保護(hù)著等待隊列和count。信號量初始化時要指定count的大小。


我們來看一下信號量的down操作(獲取一個資源):


void down(struct semaphore *sem){ unsigned long flags;  might_sleep(); raw_spin_lock_irqsave(&sem->lock, flags); if (likely(sem->count > 0)) sem->count--; else __down(sem); raw_spin_unlock_irqrestore(&sem->lock, flags);} static noinline void __sched __down(struct semaphore *sem){ __down_common(sem, TASK_UNINTERRUPTIBLE, MAX_SCHEDULE_TIMEOUT);} static inline int __sched __down_common(struct semaphore *sem, long state, long timeout){ struct semaphore_waiter waiter;  list_add_tail(&waiter.list, &sem->wait_list); waiter.task = current; waiter.up = false;  for (;;) { if (signal_pending_state(state, current)) goto interrupted; if (unlikely(timeout <= 0)) goto timed_out; __set_current_state(state); raw_spin_unlock_irq(&sem->lock); timeout = schedule_timeout(timeout); raw_spin_lock_irq(&sem->lock); if (waiter.up) return 0; }  timed_out: list_del(&waiter.list); return -ETIME;  interrupted: list_del(&waiter.list); return -EINTR;}


可以看出我們會先持有自旋鎖,然后看看count是不是大于0,大于0的話代表資源還有剩余,我們直接減1,代表占用一份資源,就可以返回了。如果不大于0的話,代表資源沒有了,我們就進(jìn)去等待隊列等待。


我們再來看看up操作(歸還資源):


void up(struct semaphore *sem){ unsigned long flags;  raw_spin_lock_irqsave(&sem->lock, flags); if (likely(list_empty(&sem->wait_list))) sem->count++; else __up(sem); raw_spin_unlock_irqrestore(&sem->lock, flags);} static noinline void __sched __up(struct semaphore *sem){ struct semaphore_waiter *waiter = list_first_entry(&sem->wait_list, struct semaphore_waiter, list); list_del(&waiter->list); waiter->up = true; wake_up_process(waiter->task);}


先加鎖自旋鎖,然后看看等待隊列是否為空,如果為空的話直接把count加1就可以了。如果不為空的話,則代表有人在等待資源,資源就不加1了,直接喚醒隊首的線程來獲取。



五、per-CPU變量


前面講的原子操作和加鎖機(jī)制都是從時間上防同步的,現(xiàn)在我們開始講空間上防同步,先來講講per-CPU變量。如果我們要操作的數(shù)據(jù)和當(dāng)前CPU是密切相關(guān)的,不同的CPU可以操作不同的數(shù)據(jù),那么我們就可以把這個變量定義為per-CPU變量,每個CPU就可以各訪問各的,互不影響了。這個方法可以防止多個CPU之間的真并發(fā),但是同一個CPU上如果有偽并發(fā),還是會出問題,所以還需要禁用偽并發(fā)。per-CPU變量的定義和使用方法如下表所示:


Macro or function name Description
DEFINE_PER_CPU(type, name) Statically allocates a per-CPU array called name of type data structures
per_cpu(name, cpu) Selects the element for CPU cpu of the per-CPU array name
_ _get_cpu_var(name) Selects the local CPU’s element of the per-CPU array name
get_cpu_var(name) Disables kernel preemption, then selects the local CPU’s element of the per-CPU array name
put_cpu_var(name) Enables kernel preemption (name is not used)
alloc_percpu(type) Dynamically allocates a per-CPU array of type data structures and returns its address
free_percpu(pointer) Releases a dynamically allocated per-CPU array at address pointer
per_cpu_ptr(pointer, cpu) Returns the address of the element for CPU cpu of the per-CPU array at address pointer



六、RCU簡介


RCU是一種非常巧妙的空間防同步方法。首先它只能用于用指針訪問的動態(tài)數(shù)據(jù)。其次它采取讀者和寫者分開的方法,讀者讀取數(shù)據(jù)要先復(fù)制指針,用這個復(fù)制的指針來訪問數(shù)據(jù),這個數(shù)據(jù)是只讀的,不會被修改,很多讀者可以同時來訪問。寫者并不去直接更改數(shù)據(jù),而是先申請一塊內(nèi)存空間,把數(shù)據(jù)都復(fù)制過來,在這個復(fù)制的數(shù)據(jù)上修改數(shù)據(jù),由于這塊數(shù)據(jù)是私有的,所以可以隨意修改,也不用加鎖。修改完了之后,寫者要原子的修改指針的值,讓它指向自己新完成的數(shù)據(jù)。這對于讀者來說是沒有影響的,因為讀者已經(jīng)復(fù)制了指針,所以讀者讀的還是原來的數(shù)據(jù)沒有變,新來的讀者會復(fù)制新的指針,訪問新的數(shù)據(jù),讀者訪問的一直都是一致性的數(shù)據(jù),不會訪問到修改一半的數(shù)據(jù)。RCU的難點在于,原有的數(shù)據(jù)怎么回收,當(dāng)寫者更新指針之后,原先的數(shù)據(jù)就過期了,當(dāng)所有老的讀者都離開臨界區(qū)之后,這個數(shù)據(jù)的內(nèi)存需要被釋放,寫者需要判斷啥時候老的讀者全都離開臨界區(qū)了,才能去釋放老的數(shù)據(jù)。


七、序列鎖


除了前面講的時間防同步、空間防同步,Linux還有一種非常巧妙的防同步方法,那就是不妨了事后再補(bǔ)救,我第一次看到這個方法的時候,真是拍案叫絕,驚嘆不已,還能這么做。這種方法叫做序列鎖,它的思想就好比是,我家里也不鎖門了,小偷偷就偷吧,偷了我就再報警把東西找回來。反正小偷又不是天天來我家偷東西,小偷來的次數(shù)非常少,我天天鎖門太費(fèi)勁了。我每天早上看一下啥東西丟了沒,丟了就報警把東西找回來。當(dāng)然這個類比并不完全像,只是大概邏輯比較像,下面我們就來講一講序列鎖的做法。序列鎖區(qū)分讀者和寫者,讀者可以并行,寫者還是需要互斥的,讀者和寫者之間也可以并行,所以當(dāng)讀者很頻繁,寫者很偶發(fā)的時候就適合用序列鎖。這個鎖有一個序列號,初始值是0,寫者進(jìn)入臨界區(qū)時把這個序列號加1,退出時再加1,讀者讀之前先獲取這個鎖的序列號,如果是奇數(shù)說明有寫者在臨界區(qū),就不停地獲取序列號,直到序列號為偶數(shù)為止。然后讀者進(jìn)入臨界區(qū)進(jìn)行讀操作,然后退出臨界區(qū)的時候再讀取一下序列號。如果和剛才獲取的序列號不一樣,說明有寫者剛才進(jìn)來過,再重新走一遍剛才的操作。如果序列號還不一樣就一直重復(fù),直到序列號一樣為止。


下面我們先來看看序列鎖的定義:


typedef struct seqcount { unsigned sequence;} seqcount_t; typedef struct { seqcount_spinlock_t seqcount; spinlock_t lock;} seqlock_t;


seqlock_t包含一個自旋鎖和一個seqcount_spinlock_t,seqcount_spinlock_t經(jīng)過一些復(fù)雜的宏定義包含了seqcount_t,所以可以簡單地認(rèn)為seqlock_t包含一個自旋鎖和一個int序列號。


下面我們看一下寫者的操作:


 #include pid_t fork(void);


寫者的操作很簡單,就是用自旋鎖實現(xiàn)互斥,然后加鎖解釋的時候都把序列號增加1。


下面看讀者的操作:


static inline void write_seqlock(seqlock_t *sl){ spin_lock(&sl->lock); do_write_seqcount_begin(&sl->seqcount.seqcount);} static inline void write_sequnlock(seqlock_t *sl){ do_write_seqcount_end(&sl->seqcount.seqcount); spin_unlock(&sl->lock);}


讀者進(jìn)入臨界區(qū)之前先通過read_seqbegin獲取一個序列號,在臨界區(qū)的時候調(diào)用read_seqretry看看是否需要重走一遍臨界區(qū)。我們下面看一下內(nèi)核里使用序列鎖的一個例子:


static inline unsigned read_seqbegin(const seqlock_t *sl){ unsigned ret = read_seqcount_begin(&sl->seqcount);  kcsan_atomic_next(0);  /* non-raw usage, assume closing read_seqretry() */ kcsan_flat_atomic_begin(); return ret;} static inline unsigned read_seqretry(const seqlock_t *sl, unsigned start){ /* * Assume not nested: read_seqretry() may be called multiple times when * completing read critical section. */ kcsan_flat_atomic_end();  return read_seqcount_retry(&sl->seqcount, start);}


可以看到寫者使用序列鎖和正常的使用方法是一樣的,讀者使用序列鎖一般都是配合do while循環(huán)一起使用。


通過本文,我們明白了線程同步的本質(zhì),了解了線程防同步的基本邏輯和具體方法。防同步就是防止多個執(zhí)行流同時訪問相同的數(shù)據(jù),所以我們可以從兩點來防,一個是同時(時間上防同步),一個是相同的數(shù)據(jù)(空間上防同步)。時間上防同步我們采取的方法有原子操作,通過硬件來防止同時,加鎖機(jī)制,軟件方法來防同時,還有禁用偽并發(fā),防止宏觀上的同時微觀上的交錯。空間防同步我們采取的方法有數(shù)據(jù)分割、per CPU變量,每個CPU值訪問自己對應(yīng)的數(shù)據(jù)。數(shù)據(jù)復(fù)制,RCU,讀的時候復(fù)制指針,讀的數(shù)據(jù)是不變的,寫的時候不直接改變數(shù)據(jù),而是先把數(shù)據(jù)復(fù)制過來,修改自己的私有副本,這樣就不會有同步的問題,然后再原子地更新指針指向最新的數(shù)據(jù)。


八、總結(jié)回顧


通過本文,我們明白了線程同步的本質(zhì),了解了線程防同步的基本邏輯和具體方法。防同步就是防止多個執(zhí)行流同時訪問相同的數(shù)據(jù),所以我們可以從兩點來防,一個是同時(時間上防同步),一個是相同的數(shù)據(jù)(空間上防同步)。時間上防同步我們采取的方法有原子操作,通過硬件來防止同時,加鎖機(jī)制,軟件方法來防同時,還有禁用偽并發(fā),防止宏觀上的同時微觀上的交錯??臻g防同步我們采取的方法有數(shù)據(jù)分割、per CPU變量,每個CPU值訪問自己對應(yīng)的數(shù)據(jù)。數(shù)據(jù)復(fù)制,RCU,讀的時候復(fù)制指針,讀的數(shù)據(jù)是不變的,寫的時候不直接改變數(shù)據(jù),而是先把數(shù)據(jù)復(fù)制過來,修改自己的私有副本,這樣就不會有同步的問題,然后再原子地更新指針指向最新的數(shù)據(jù)。


參考文獻(xiàn):

《Linux Kernel Development》

《Understanding the Linux Kernel》

《Professional Linux Kernel Architecture》

https://lwn.net/Kernel/Index/#Locking_mechanisms

http://www.wowotech.net/sort/kernel_synchronization




長按掃碼,備注“程磊”

小馬邀請您進(jìn)程磊老師交流群


本站聲明: 本文章由作者或相關(guān)機(jī)構(gòu)授權(quán)發(fā)布,目的在于傳遞更多信息,并不代表本站贊同其觀點,本站亦不保證或承諾內(nèi)容真實性等。需要轉(zhuǎn)載請聯(lián)系該專欄作者,如若文章內(nèi)容侵犯您的權(quán)益,請及時聯(lián)系本站刪除。
換一批
延伸閱讀

9月2日消息,不造車的華為或?qū)⒋呱龈蟮莫毥谦F公司,隨著阿維塔和賽力斯的入局,華為引望愈發(fā)顯得引人矚目。

關(guān)鍵字: 阿維塔 塞力斯 華為

加利福尼亞州圣克拉拉縣2024年8月30日 /美通社/ -- 數(shù)字化轉(zhuǎn)型技術(shù)解決方案公司Trianz今天宣布,該公司與Amazon Web Services (AWS)簽訂了...

關(guān)鍵字: AWS AN BSP 數(shù)字化

倫敦2024年8月29日 /美通社/ -- 英國汽車技術(shù)公司SODA.Auto推出其旗艦產(chǎn)品SODA V,這是全球首款涵蓋汽車工程師從創(chuàng)意到認(rèn)證的所有需求的工具,可用于創(chuàng)建軟件定義汽車。 SODA V工具的開發(fā)耗時1.5...

關(guān)鍵字: 汽車 人工智能 智能驅(qū)動 BSP

北京2024年8月28日 /美通社/ -- 越來越多用戶希望企業(yè)業(yè)務(wù)能7×24不間斷運(yùn)行,同時企業(yè)卻面臨越來越多業(yè)務(wù)中斷的風(fēng)險,如企業(yè)系統(tǒng)復(fù)雜性的增加,頻繁的功能更新和發(fā)布等。如何確保業(yè)務(wù)連續(xù)性,提升韌性,成...

關(guān)鍵字: 亞馬遜 解密 控制平面 BSP

8月30日消息,據(jù)媒體報道,騰訊和網(wǎng)易近期正在縮減他們對日本游戲市場的投資。

關(guān)鍵字: 騰訊 編碼器 CPU

8月28日消息,今天上午,2024中國國際大數(shù)據(jù)產(chǎn)業(yè)博覽會開幕式在貴陽舉行,華為董事、質(zhì)量流程IT總裁陶景文發(fā)表了演講。

關(guān)鍵字: 華為 12nm EDA 半導(dǎo)體

8月28日消息,在2024中國國際大數(shù)據(jù)產(chǎn)業(yè)博覽會上,華為常務(wù)董事、華為云CEO張平安發(fā)表演講稱,數(shù)字世界的話語權(quán)最終是由生態(tài)的繁榮決定的。

關(guān)鍵字: 華為 12nm 手機(jī) 衛(wèi)星通信

要點: 有效應(yīng)對環(huán)境變化,經(jīng)營業(yè)績穩(wěn)中有升 落實提質(zhì)增效舉措,毛利潤率延續(xù)升勢 戰(zhàn)略布局成效顯著,戰(zhàn)新業(yè)務(wù)引領(lǐng)增長 以科技創(chuàng)新為引領(lǐng),提升企業(yè)核心競爭力 堅持高質(zhì)量發(fā)展策略,塑強(qiáng)核心競爭優(yōu)勢...

關(guān)鍵字: 通信 BSP 電信運(yùn)營商 數(shù)字經(jīng)濟(jì)

北京2024年8月27日 /美通社/ -- 8月21日,由中央廣播電視總臺與中國電影電視技術(shù)學(xué)會聯(lián)合牽頭組建的NVI技術(shù)創(chuàng)新聯(lián)盟在BIRTV2024超高清全產(chǎn)業(yè)鏈發(fā)展研討會上宣布正式成立。 活動現(xiàn)場 NVI技術(shù)創(chuàng)新聯(lián)...

關(guān)鍵字: VI 傳輸協(xié)議 音頻 BSP

北京2024年8月27日 /美通社/ -- 在8月23日舉辦的2024年長三角生態(tài)綠色一體化發(fā)展示范區(qū)聯(lián)合招商會上,軟通動力信息技術(shù)(集團(tuán))股份有限公司(以下簡稱"軟通動力")與長三角投資(上海)有限...

關(guān)鍵字: BSP 信息技術(shù)
關(guān)閉