程磊,一線碼農(nóng),在某手機(jī)公司擔(dān)任系統(tǒng)開發(fā)工程師;閱碼場榮譽(yù)總編輯;日常喜歡研究內(nèi)核基本原理。
目錄:
一、概念解析
1.1 名詞解釋
1.2 線程同步與防同步
二、線程防同步方法
2.1 時間上防同步
2.2 空間上防同步
2.3 事后防同步
三、原子操作
3.1 int原子操作
3.2 long原子操作
3.3 bit原子操作
四、加鎖機(jī)制
4.1 鎖的底層原理
4.2 簡單自旋鎖
4.3 互斥鎖
4.4 信號量
五、per-CPU 變量
六、RCU 簡介
七、序列鎖
八、總結(jié)回顧
一、概念解析
1.1 名詞解釋
CPU: 本文中的CPU都是指邏輯CPU。
UP: 單處理器(單CPU)。
SMP: 對稱多處理器(多CPU)。
線程、執(zhí)行流: 線程的本質(zhì)是一個執(zhí)行流,但執(zhí)行流不僅僅有線程,還有ISR、softirq、tasklet,都是執(zhí)行流。本文中說到線程一般是泛指各種執(zhí)行流,除非是在需要區(qū)分不同執(zhí)行流時,線程才特指狹義的線程。
并發(fā)、并行: 并發(fā)是指線程在宏觀上表現(xiàn)為同時執(zhí)行,微觀上可能是同時執(zhí)行也可能是交錯執(zhí)行,并行是指線程在宏觀上是同時執(zhí)行,微觀上也是同時執(zhí)行。
偽并發(fā)、真并發(fā): 偽并發(fā)就是微觀上是交錯執(zhí)行的并發(fā),真并發(fā)就是并行。UP上只有偽并發(fā),SMP上既有偽并發(fā)也有真并發(fā)。
臨界區(qū): 訪問相同數(shù)據(jù)的代碼段,如果可能會在多個線程中并發(fā)執(zhí)行,就叫做臨界區(qū),臨界區(qū)可以是一個代碼段被多個線程并發(fā)執(zhí)行,也可以是多個不同的代碼段被多個線程并發(fā)執(zhí)行。
同步: 首先線程同步的同步和同步異步的同步,不是一個意思。線程同步的同步,本文按照字面意思進(jìn)行解釋,同步就是統(tǒng)一步調(diào)、同時執(zhí)行的意思。
線程同步現(xiàn)象: 線程并發(fā)過程中如果存在臨界區(qū)并發(fā)執(zhí)行的情況,就叫做線程同步現(xiàn)象。
線程防同步機(jī)制: 如果發(fā)生線程同步現(xiàn)象,由于臨界區(qū)會訪問共同的數(shù)據(jù),程序可能就會出錯,因此我們要防止發(fā)生線程同步現(xiàn)象,也就是防止臨界區(qū)并發(fā)執(zhí)行的情況,為此我們采取的防范措施叫做線程防同步機(jī)制。
1.2 線程同步與防同步
為什么線程同步叫線程同步,不叫線程防同步,叫線程同步給人的感覺好像就是要讓線程同時執(zhí)行的意思啊。但是實際上線程同步是讓臨界區(qū)不要并發(fā)執(zhí)行的意思,不管你們倆誰先執(zhí)行,只要錯開,誰先誰后執(zhí)行都可以。所以本文后面都采用線程防同步機(jī)制、防同步機(jī)制等詞。
我小時候一直有個疑惑,感冒藥為啥叫感冒藥,感冒藥是讓人感冒的藥啊,不是應(yīng)該叫治感冒藥才對嗎,治療感冒的藥。后來一想,就沒有讓人感冒的藥,所以感冒藥表達(dá)的就是治療感冒的藥,沒必要加個治字。但是同時還有一種藥,叫老鼠藥,是治療老鼠的藥嗎,不是啊,是要毒死老鼠的藥,因為沒有人會給老鼠治病。不過我們那里也有把老鼠藥叫做害老鼠藥的,加個害字,意思更明確,不會有歧義。
因此本文用了兩個詞,同步現(xiàn)象、防同步機(jī)制,使得概念更加清晰明確。
說了這么多就是為了闡述一個非常簡單的概念,就是不能同時操作相同的數(shù)據(jù),因為可能會出錯,所以我們要想辦法防止,這個方法我們把它叫做線程防同步。
還有一個詞是競態(tài)條件(race condition),很多關(guān)于線程同步的書籍文檔中都有提到,但是我一直沒有理解是啥意思。競態(tài)條件,條件,線程同步和條件也沒啥關(guān)系啊;競態(tài),也不知道是什么意思。再看它的英文,condition有情況的意思,race有賽跑、競爭的意思,是不是要表達(dá)賽跑情況、競爭現(xiàn)象,想說兩個線程在競爭賽跑,看誰能先訪問到公共數(shù)據(jù)。我發(fā)現(xiàn)沒有競態(tài)條件這個詞對我們理解線程同步問題一點影響都沒有,有了這個詞反而不明所以,所以我們就忽略這個詞。
二、線程防同步方法
在我們理解了同步現(xiàn)象、防同步機(jī)制這個詞后,下面的就很好理解了。同步現(xiàn)象是指同時訪問相同的數(shù)據(jù),那么如何防止呢,我不讓你同時訪問相同的數(shù)據(jù)不就可以了嘛。因此防同步機(jī)制有三大類方法。
2.1 時間上防同步
我不讓你們同時進(jìn)入臨界區(qū),這樣就不會同時操作相同的數(shù)據(jù)了,有三種方法:
1.原子操作
對于個別特別簡單特別短的臨界區(qū),CPU會提供一些原子指令,在一條指令中把多個操作完成,兩個原子指令必然一個在前一個在后地執(zhí)行,不可能同時執(zhí)行。原子指令能防偽并發(fā)和真并發(fā),適用于UP和SMP。
2.加鎖
對于大部分臨界區(qū)來說,代碼都比較復(fù)雜,CPU不可能都去實現(xiàn)原子指令,因此可以在臨界區(qū)的入口處加鎖,同一時間只能有一個線程進(jìn)入,獲得鎖的線程進(jìn)入,在臨界區(qū)的出口處再釋放鎖。未獲得鎖的線程在外面等待,等待的方式有兩種,忙等待的叫做自旋鎖,休眠等待的叫做阻塞鎖。根據(jù)臨界區(qū)內(nèi)的數(shù)據(jù)讀寫操作不同,鎖又可以分為單一鎖和讀寫鎖,單一鎖不區(qū)分讀者寫者,所有用戶都互斥;讀寫鎖區(qū)分讀者和寫者,讀者之間可以并行,寫者與讀者、寫者與寫者之間是互斥的。自旋鎖有單一鎖和讀寫鎖,阻塞鎖也有單一鎖和讀寫鎖。自旋鎖只能防真并發(fā),適用于SMP;休眠鎖能防偽并發(fā)和真并發(fā),適用于UP和SMP。
3.臨時禁用偽并發(fā)
對于某些由于偽并發(fā)而產(chǎn)生的同步問題,可以通過在臨界區(qū)的入口處禁用此類偽并發(fā)、在臨界區(qū)的出口處再恢復(fù)此類偽并發(fā)來解決。這種方式顯然只能防偽并發(fā),適用于UP和SMP上的單CPU。而自旋鎖只能防真并發(fā),所以在SMP上經(jīng)常會同時使用這兩種方法,同時防偽并發(fā)和真并發(fā)。
臨時禁用偽并發(fā)有三種情況:
a.禁用中斷
如果線程和中斷、軟中斷和中斷之間會訪問公共數(shù)據(jù),那么在前者的臨界區(qū)內(nèi)可以臨時禁用后者,也就是禁用中斷,達(dá)到防止偽并發(fā)的目的。在后者的臨界區(qū)內(nèi)不用采取措施,因為前者不能搶占后者,但是后者能搶占前者。前者一般會同時使用禁中斷和自旋鎖,禁中斷防止偽并發(fā),自旋鎖防止真并發(fā)。
b.禁用軟中斷
如果線程和軟中斷會訪問公共數(shù)據(jù),那么在前者的臨界區(qū)內(nèi)禁用后者,也就是禁用軟中斷,可以達(dá)到防止偽并發(fā)的目的。后者不用采取任何措施,因為前者不會搶占后者。前者也可以和自旋鎖并用,同時防止偽并發(fā)和真并發(fā)。
c.禁用搶占
如果線程和線程之間會訪問公共數(shù)據(jù),那么可以在臨界區(qū)內(nèi)禁用搶占,達(dá)到防止偽并發(fā)的目的。禁用搶占也可以和自旋鎖并用,同時防止偽并發(fā)和真并發(fā)。
2.2 空間上防同步
你們可以同時,但是我不讓你們訪問相同的數(shù)據(jù),有兩種方法:
1. 數(shù)據(jù)分割
把大家都要訪問的公共數(shù)據(jù)分割成N份,各訪問各的。這也有兩種情況:
a. 在SMP上如果多個CPU要經(jīng)常訪問一個全局?jǐn)?shù)據(jù),那么可以把這個數(shù)據(jù)拆分成NR_CPU份,每個CPU只訪問自己對應(yīng)的那份,這樣就不存在并發(fā)問題了,這個方法叫做 per-CPU 變量,只能防真并發(fā),適用于SMP,需要和禁用搶占配合使用。
b. TLS,每個線程都用自己的局部數(shù)據(jù),這樣就不存在并發(fā)問題了,能防真并發(fā)和偽并發(fā),適用于UP和SMP。
2. 數(shù)據(jù)復(fù)制
RCU,只適用于用指針訪問的動態(tài)數(shù)據(jù)。讀者復(fù)制指針,然后就可以隨意讀取數(shù)據(jù)了,所有的讀者可以共同讀一份數(shù)據(jù)。寫者復(fù)制數(shù)據(jù),然后就可以隨意修改復(fù)制后的數(shù)據(jù)了,因為這份數(shù)據(jù)是私有的,不過修改完數(shù)據(jù)之后要修改指針指向最新的數(shù)據(jù),修改指針的這個操作需要是原子的。對于讀者來說,它是復(fù)制完指針之后用自己的私有指針來訪問數(shù)據(jù)的,所以它訪問的要么是之前的數(shù)據(jù),要么是修改之后的數(shù)據(jù),而不會是不一致的數(shù)據(jù)。能防偽并發(fā)和真并發(fā),適用于UP和SMP。
2.3 事后防同步
不去積極預(yù)防并發(fā),而是假設(shè)不存在并發(fā),直接訪問數(shù)據(jù)。訪問完了之后再檢查剛才是否有并發(fā)發(fā)生,如果有就再重來一遍,一直重試,直到?jīng)]有并發(fā)發(fā)生為止。這就是內(nèi)核里面的序列鎖seqlock,能防偽并發(fā)和真并發(fā),適用于UP和SMP。
下面我們來畫張圖總結(jié)一下:
三、原子操作
3.1 int原子操作
硬件提供的都是直接對整數(shù)的原子操作,但是在Linux內(nèi)核里并不是直接對一個int類型進(jìn)行原子操作,而是對一個封裝后的數(shù)據(jù)進(jìn)行操作。本質(zhì)上還是對int進(jìn)行操作,那么為什么要做這么一層封裝呢?主要是為了接口語義,大家一看到一個變量是atomic_t類型的,大家立馬就明白這是一個原子變量,不能使用普通加減操作進(jìn)行運(yùn)算,要使用專門的接口函數(shù)來操作才對。
typedef struct { int counter;} atomic_t;
那么對于 atomic_t的原子操作都有哪些呢?
Atomic Integer Operation | Description |
---|---|
ATOMIC_INIT(int i) | At declaration, initialize to i |
int atomic_read(atomic_t *v) | Atomically read the integer value of v |
void atomic_set(atomic_t *v, int i) | Atomically set v equal to i. |
void atomic_add(int i, atomic_t *v) | Atomically add i to v |
void atomic_sub(int i, atomic_t *v) | Atomically subtract i from v |
void atomic_inc(atomic_t *v) | Atomically add one to v |
void atomic_dec(atomic_t *v) | Atomically subtract one from v |
int atomic_sub_and_test(int i, atomic_t *v) | Atomically subtract i from v and return true if the result is zero; otherwise false. |
int atomic_add_negative(int i, atomic_t *v) | Atomically add i to v and return true if the result is negative; otherwise false. |
int atomic_add_return(int i, atomic_t *v) | Atomically add i to v and return the result. |
int atomic_sub_return(int i, atomic_t *v) | Atomically subtract i from v and return the result. |
int atomic_dec_and_test(atomic_t *v) | Atomically decrement v by one and return true if zero; false otherwise |
int atomic_inc_and_test(atomic_t *v) | Atomically increment v by one and return true if the result is zero; false otherwise. |
3.2 long原子操作
如果上面的int類型(32位)不能滿足我們的原子操作需求,系統(tǒng)還為我們定義了64位的原子變量。
typedef struct { s64 counter;} atomic64_t;
同樣的也為我們提供了一堆原子接口:
Atomic Integer Operation | Description |
---|---|
ATOMIC64_INIT(int i) | At declaration, initialize to i |
int atomic64_read(atomic_t *v) | Atomically read the integer value of v |
void atomic64_set(atomic_t *v, int i) | Atomically set v equal to i. |
void atomic64_add(int i, atomic_t *v) | Atomically add i to v |
void atomic64_sub(int i, atomic_t *v) | Atomically subtract i from v |
void atomic64_inc(atomic_t *v) | Atomically add one to v |
void atomic64_dec(atomic_t *v) | Atomically subtract one from v |
int atomic64_sub_and_test(int i, atomic_t *v) | Atomically subtract i from v and return true if the result is zero; otherwise false. |
int atomic64_add_negative(int i, atomic_t *v) | Atomically add i to v and return true if the result is negative; otherwise false. |
int atomic64_add_return(int i, atomic_t *v) | Atomically add i to v and return the result. |
int atomic64_sub_return(int i, atomic_t *v) | Atomically subtract i from v and return the result. |
int atomic64_dec_and_test(atomic_t *v) | Atomically decrement v by one and return true if zero; false otherwise |
int atomic64_inc_and_test(atomic_t *v) | Atomically increment v by one and return true if the result is zero; false otherwise. |
3.3 bit原子操作
系統(tǒng)還給我們提供了位運(yùn)算的原子操作,不過并沒有封裝數(shù)據(jù)類型,而是操作一個void * 指針?biāo)赶虻臄?shù)據(jù),我們要操作的位數(shù)要在我們希望的數(shù)據(jù)之內(nèi),這點是由我們自己來保證的。
Atomic Bitwise Operation | Description |
---|---|
void set_bit(int nr, void *addr) | Atomically set the nr-th bit starting from addr. |
void clear_bit(int nr, void *addr) | Atomically clear the nr-th bit starting from addr. |
void change_bit(int nr, void *addr) | Atomically flip the value of the nr-th bit starting from addr. |
int test_and_set_bit(int nr, void *addr) | Atomically set the nr-th bit starting from addr and return the previous value. |
int test_and_clear_bit(int nr, void *addr) | Atomically clear the nr-th bit starting from addr and return the previous value. |
int test_and_change_bit(int nr, void *addr) | Atomically flip the nr-th bit starting from addr and return the previous value. |
int test_bit(int nr, void *addr) | Atomically return the value of the nrth bit starting from addr. |
有了原子操作,我們對這些簡單的基本運(yùn)算就不用使用加鎖機(jī)制了,就可以提高效率。
四、加鎖機(jī)制
4.1 鎖的底層原理
我們該如何實現(xiàn)一個鎖呢,下面我們嘗試直接用軟件來實現(xiàn)試試,代碼如下:
int lock = 0; void lock(int * lock){start: if(*lock == 0) *lock = 1; else{ wait(); goto start; }} void unlock(int * lock){ *lock = 0; wakeup();}
可以看到這個鎖的實現(xiàn)邏輯很簡單,定義一個整數(shù)作為鎖,0代表沒人持鎖,1代表有人持鎖。我們先判斷鎖的值,如果是0代表沒人持鎖,我們給鎖賦值1,代表我們獲得了鎖,然后函數(shù)返回,就可以進(jìn)入臨界區(qū)了。如果鎖是1,代表有人已經(jīng)持有了鎖,此時我們就需要等待,等待函數(shù)wait,可以用忙等待,也可以用休眠等待。釋放鎖的時候把鎖設(shè)為0,然后wakeup其他線程或者為空操作。被喚醒的線程從wait中醒來,然后又重走加鎖流程。但是這里面有個問題,就是加鎖操作也是個臨界區(qū),如果兩個線程在兩個CPU上同時執(zhí)行到加鎖函數(shù),雙方都檢測到鎖是0,然后都把鎖置為1,都加鎖成功,這不是出問題了嗎。鎖就是用來保護(hù)臨界區(qū)的,但是加鎖本身也是臨界區(qū),也需要保護(hù),該怎么辦呢?唯一的方法就是求助于硬件,讓加鎖操作成為原子操作,X86平臺提供了CAS指令來實現(xiàn)這個功能。CAS,Compare and Swap,比較并交換,它的接口邏輯是這樣的:
-
int cas(int * p, old_value, new_value)
如果p位置的值等于old_value,就把p位置的值設(shè)置為new_value,并返回1,否則返回0。關(guān)鍵就在于cas它是硬件實現(xiàn)的,是原子的。這樣我們就可以用cas指令來實現(xiàn)鎖的邏輯,如下所示:
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
int lock = 0; void lock(int * lock){start: if(cas(lock, 0, 1)) return; else{ wait(); goto start; }} void unlock(int * lock){ *lock = 0; wakeup();}
4.2 簡單自旋鎖
為什么在這里要先講簡單自旋鎖呢?因為互斥鎖、信號量這些鎖的內(nèi)部實現(xiàn)都用了自旋鎖,所以先把自旋鎖的邏輯講清楚才能繼續(xù)講下去。為什么是簡單自旋鎖呢,因為自旋鎖現(xiàn)在已經(jīng)發(fā)展得很復(fù)雜了,所以這里就是講一下自旋鎖的簡單版本,因為它們的基本邏輯是一致的。自旋鎖由于在加鎖失敗時是忙等待,所以不用考慮等待隊列、睡眠喚醒的問題,所以實現(xiàn)比較簡單,下面是簡單自旋鎖的實現(xiàn)代碼:
-
-
-
-
-
-
-
-
-
int lock = 0; void spin_lock(int * lock){ while(!cas(lock, 0, 1))} void spin_unlock(int * lock){ *lock = 0;}
可以看到簡單自旋鎖的代碼是相當(dāng)簡單,加鎖的時候不停地嘗試加鎖,一直失敗一直加,直到成功才返回。釋放鎖的時候更簡單,直接把鎖置為0就可以了。此時如果有其它自旋鎖在自旋,由于鎖已經(jīng)變成了0,所以就會加鎖成功,結(jié)束自旋。注意這個代碼只是自旋鎖的邏輯演示,并不是真正的自旋鎖實現(xiàn)。
4.3 互斥鎖
互斥鎖是休眠鎖,加鎖失敗時要把自己放入等待隊列,釋放鎖的時候要考慮喚醒等待隊列的線程?;コ怄i的定義如下(刪除了一些配置選項):
-
-
-
-
-
struct mutex { atomic_long_t owner; raw_spinlock_t wait_lock; struct list_head wait_list;};
可以看到互斥鎖的定義有atomic_long_t owner,這個和我們之前定義的int lock是相似的,只不過這里是個加強(qiáng)版,0代表沒加鎖,加鎖的時候是非0,而不是簡單的1,而是記錄的是加鎖的線程。然后是自旋鎖和等待隊列,自旋鎖是用來保護(hù)等待隊列的。這里的自旋鎖為什么要用raw_spinlock_t呢,它和spinlock_t有什么區(qū)別呢?在標(biāo)準(zhǔn)的內(nèi)核版本下是沒有區(qū)別的,如果打了RTLinux補(bǔ)丁之后它們就不一樣了,spinlock_t會轉(zhuǎn)化為阻塞鎖,raw_spinlock_t還是自旋鎖,如果需要在任何情況下都要自旋的話請使用raw_spinlock_t。下面我們看一下它的加鎖操作:
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
void __sched mutex_lock(struct mutex *lock){ might_sleep(); if (!__mutex_trylock_fast(lock)) __mutex_lock_slowpath(lock);}static __always_inline bool __mutex_trylock_fast(struct mutex *lock){ unsigned long curr = (unsigned long)current; unsigned long zero = 0UL; if (atomic_long_try_cmpxchg_acquire(&lock->owner, &zero, curr)) return true; return false;}
可以看到加鎖時先嘗試直接加鎖,用的就是CAS原子指令(x86的CAS指令叫做cmpxchg)。如果owner是0,代表當(dāng)前鎖是開著的,就把owner設(shè)置為自己(也就是當(dāng)前線程,struct task_struct * 強(qiáng)轉(zhuǎn)為 ulong),代表自己成為這個鎖的主人,也就是自己加鎖成功了,然后返回true。如果owner不為0的話,代表有人已經(jīng)持有鎖了,返回false,后面就要走慢速路徑了,也就是把自己放入等待隊列里休眠等待。下面看看慢速路徑的代碼是怎么實現(xiàn)的:
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
static noinline void __sched__mutex_lock_slowpath(struct mutex *lock){ __mutex_lock(lock, TASK_UNINTERRUPTIBLE, 0, NULL, _RET_IP_);} static int __sched__mutex_lock(struct mutex *lock, unsigned int state, unsigned int subclass, struct lockdep_map *nest_lock, unsigned long ip){ return __mutex_lock_common(lock, state, subclass, nest_lock, ip, NULL, false);} static __always_inline int __sched__mutex_lock_common(struct mutex *lock, unsigned int state, unsigned int subclass, struct lockdep_map *nest_lock, unsigned long ip, struct ww_acquire_ctx *ww_ctx, const bool use_ww_ctx){ struct mutex_waiter waiter; int ret; raw_spin_lock(&lock->wait_lock); waiter.task = current; __mutex_add_waiter(lock, &waiter, &lock->wait_list); set_current_state(state); for (;;) { bool first; if (__mutex_trylock(lock)) goto acquired; raw_spin_unlock(&lock->wait_lock); schedule_preempt_disabled(); first = __mutex_waiter_is_first(lock, &waiter); set_current_state(state); if (__mutex_trylock_or_handoff(lock, first)) break; raw_spin_lock(&lock->wait_lock); } acquired: __set_current_state(TASK_RUNNING); __mutex_remove_waiter(lock, &waiter); raw_spin_unlock(&lock->wait_lock); return ret;} static void__mutex_add_waiter(struct mutex *lock, struct mutex_waiter *waiter, struct list_head *list){ debug_mutex_add_waiter(lock, waiter, current); list_add_tail(&waiter->list, list); if (__mutex_waiter_is_first(lock, waiter)) __mutex_set_flag(lock, MUTEX_FLAG_WAITERS);}
可以看到慢速路徑最終會調(diào)用函數(shù)__mutex_lock_common,這個函數(shù)本身是很復(fù)雜的,這里進(jìn)行了大量的刪減,只保留了核心邏輯。函數(shù)先加鎖mutex的自旋鎖,然后把自己放到等待隊列上去,然后就在無限for循環(huán)中休眠并等待別人釋放鎖并喚醒自己。For循環(huán)的入口處先嘗試加鎖,如果成功就goto acquired,如果不成功就釋放自旋鎖,并調(diào)用schedule_preempt_disabled,此函數(shù)就是休眠函數(shù),它會調(diào)度其它進(jìn)程來執(zhí)行,自己就休眠了,直到有人喚醒自己才會醒來繼續(xù)執(zhí)行。別人釋放鎖的時候會喚醒自己,這個我們后面會分析。當(dāng)我們被喚醒之后會去嘗試加鎖,如果加鎖失敗,再次來到for循環(huán)的開頭處,再試一次加鎖,如果不行就再走一次休眠過程。為什么我們加鎖會失敗呢,因為有可能多個線程同時被喚醒來爭奪鎖,我們不一定會搶鎖成功。搶鎖失敗就再去休眠,最后總會搶鎖成功的。
把自己加入等待隊列時會設(shè)置flag MUTEX_FLAG_WAITERS,這個flag是設(shè)置在owner的低位上去,因為task_struct指針至少是L1_CACHE_BYTES對齊的,所以最少有3位可以空出來做flag。
下面我們再來看一下釋放鎖的流程:
-
-
-
-
-
-
-
-
-
-
-
-
void __sched mutex_unlock(struct mutex *lock){ if (__mutex_unlock_fast(lock)) return; __mutex_unlock_slowpath(lock, _RET_IP_);} static __always_inline bool __mutex_unlock_fast(struct mutex *lock){ unsigned long curr = (unsigned long)current; return atomic_long_try_cmpxchg_release(&lock->owner, &curr, 0UL);}
解鎖的時候先嘗試快速解鎖,快速解鎖的意思是沒有其它在等待隊列里,可以直接釋放鎖。怎么判斷等待隊列里沒有線程在等待呢,這就是前面設(shè)置的MUTEX_FLAG_WAITERS的作用了。如果設(shè)置了這個flag,lock->owner 和 curr就不會相等,直接釋放鎖就會失敗,就要走慢速路徑。慢速路徑的代碼如下:
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
static noinline void __sched __mutex_unlock_slowpath(struct mutex *lock, unsigned long ip){ struct task_struct *next = NULL; DEFINE_WAKE_Q(wake_q); unsigned long owner; owner = atomic_long_read(&lock->owner); for (;;) { if (atomic_long_try_cmpxchg_release(&lock->owner, &owner, __owner_flags(owner))) { if (owner & MUTEX_FLAG_WAITERS) break; } } raw_spin_lock(&lock->wait_lock); if (!list_empty(&lock->wait_list)) { struct mutex_waiter *waiter = list_first_entry(&lock->wait_list, struct mutex_waiter, list); next = waiter->task; wake_q_add(&wake_q, next); } raw_spin_unlock(&lock->wait_lock); wake_up_q(&wake_q);}
上述代碼進(jìn)行了一些刪減??梢钥闯錾鲜龃a會先釋放鎖,然后喚醒等待隊列里面的第一個等待者。
4.4 信號量
信號量與互斥鎖有很大的不同,互斥鎖代表只有一個線程能同時進(jìn)入臨界區(qū),而信號量是個整數(shù)計數(shù),代表著某一類資源有多少個,能同時讓多少個線程訪問這類資源。信號量也沒有加鎖解鎖操作,信號量類似的操作叫做down和up,down代表獲取一個資源,up代表歸還一個資源。
下面我們先看一下信號量的定義:
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
struct semaphore { raw_spinlock_t lock; unsigned int count; struct list_head wait_list;}; #define __SEMAPHORE_INITIALIZER(name, n) \{ \ .lock = __RAW_SPIN_LOCK_UNLOCKED((name).lock), \ .count = n, \ .wait_list = LIST_HEAD_INIT((name).wait_list), \} static inline void sema_init(struct semaphore *sem, int val){ *sem = (struct semaphore) __SEMAPHORE_INITIALIZER(*sem, val);}
可以看出信號量和互斥鎖的定義很相似,都有一個自旋鎖,一個等待隊列,不同的是信號量沒有owner,取而代之的是count,代表著某一類資源的個數(shù),而且自旋鎖同時保護(hù)著等待隊列和count。信號量初始化時要指定count的大小。
我們來看一下信號量的down操作(獲取一個資源):
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
void down(struct semaphore *sem){ unsigned long flags; might_sleep(); raw_spin_lock_irqsave(&sem->lock, flags); if (likely(sem->count > 0)) sem->count--; else __down(sem); raw_spin_unlock_irqrestore(&sem->lock, flags);} static noinline void __sched __down(struct semaphore *sem){ __down_common(sem, TASK_UNINTERRUPTIBLE, MAX_SCHEDULE_TIMEOUT);} static inline int __sched __down_common(struct semaphore *sem, long state, long timeout){ struct semaphore_waiter waiter; list_add_tail(&waiter.list, &sem->wait_list); waiter.task = current; waiter.up = false; for (;;) { if (signal_pending_state(state, current)) goto interrupted; if (unlikely(timeout <= 0)) goto timed_out; __set_current_state(state); raw_spin_unlock_irq(&sem->lock); timeout = schedule_timeout(timeout); raw_spin_lock_irq(&sem->lock); if (waiter.up) return 0; } timed_out: list_del(&waiter.list); return -ETIME; interrupted: list_del(&waiter.list); return -EINTR;}
可以看出我們會先持有自旋鎖,然后看看count是不是大于0,大于0的話代表資源還有剩余,我們直接減1,代表占用一份資源,就可以返回了。如果不大于0的話,代表資源沒有了,我們就進(jìn)去等待隊列等待。
我們再來看看up操作(歸還資源):
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
void up(struct semaphore *sem){ unsigned long flags; raw_spin_lock_irqsave(&sem->lock, flags); if (likely(list_empty(&sem->wait_list))) sem->count++; else __up(sem); raw_spin_unlock_irqrestore(&sem->lock, flags);} static noinline void __sched __up(struct semaphore *sem){ struct semaphore_waiter *waiter = list_first_entry(&sem->wait_list, struct semaphore_waiter, list); list_del(&waiter->list); waiter->up = true; wake_up_process(waiter->task);}
先加鎖自旋鎖,然后看看等待隊列是否為空,如果為空的話直接把count加1就可以了。如果不為空的話,則代表有人在等待資源,資源就不加1了,直接喚醒隊首的線程來獲取。
五、per-CPU變量
前面講的原子操作和加鎖機(jī)制都是從時間上防同步的,現(xiàn)在我們開始講空間上防同步,先來講講per-CPU變量。如果我們要操作的數(shù)據(jù)和當(dāng)前CPU是密切相關(guān)的,不同的CPU可以操作不同的數(shù)據(jù),那么我們就可以把這個變量定義為per-CPU變量,每個CPU就可以各訪問各的,互不影響了。這個方法可以防止多個CPU之間的真并發(fā),但是同一個CPU上如果有偽并發(fā),還是會出問題,所以還需要禁用偽并發(fā)。per-CPU變量的定義和使用方法如下表所示:
Macro or function name
Description
DEFINE_PER_CPU(type, name)
Statically allocates a per-CPU array called name of type data structures
per_cpu(name, cpu)
Selects the element for CPU cpu of the per-CPU array name
_ _get_cpu_var(name)
Selects the local CPU’s element of the per-CPU array name
get_cpu_var(name)
Disables kernel preemption, then selects the local CPU’s element of the per-CPU array name
put_cpu_var(name)
Enables kernel preemption (name is not used)
alloc_percpu(type)
Dynamically allocates a per-CPU array of type data structures and returns its address
free_percpu(pointer)
Releases a dynamically allocated per-CPU array at address pointer
per_cpu_ptr(pointer, cpu)
Returns the address of the element for CPU cpu of the per-CPU array at address pointer
六、RCU簡介
RCU是一種非常巧妙的空間防同步方法。首先它只能用于用指針訪問的動態(tài)數(shù)據(jù)。其次它采取讀者和寫者分開的方法,讀者讀取數(shù)據(jù)要先復(fù)制指針,用這個復(fù)制的指針來訪問數(shù)據(jù),這個數(shù)據(jù)是只讀的,不會被修改,很多讀者可以同時來訪問。寫者并不去直接更改數(shù)據(jù),而是先申請一塊內(nèi)存空間,把數(shù)據(jù)都復(fù)制過來,在這個復(fù)制的數(shù)據(jù)上修改數(shù)據(jù),由于這塊數(shù)據(jù)是私有的,所以可以隨意修改,也不用加鎖。修改完了之后,寫者要原子的修改指針的值,讓它指向自己新完成的數(shù)據(jù)。這對于讀者來說是沒有影響的,因為讀者已經(jīng)復(fù)制了指針,所以讀者讀的還是原來的數(shù)據(jù)沒有變,新來的讀者會復(fù)制新的指針,訪問新的數(shù)據(jù),讀者訪問的一直都是一致性的數(shù)據(jù),不會訪問到修改一半的數(shù)據(jù)。RCU的難點在于,原有的數(shù)據(jù)怎么回收,當(dāng)寫者更新指針之后,原先的數(shù)據(jù)就過期了,當(dāng)所有老的讀者都離開臨界區(qū)之后,這個數(shù)據(jù)的內(nèi)存需要被釋放,寫者需要判斷啥時候老的讀者全都離開臨界區(qū)了,才能去釋放老的數(shù)據(jù)。
七、序列鎖
除了前面講的時間防同步、空間防同步,Linux還有一種非常巧妙的防同步方法,那就是不妨了事后再補(bǔ)救,我第一次看到這個方法的時候,真是拍案叫絕,驚嘆不已,還能這么做。這種方法叫做序列鎖,它的思想就好比是,我家里也不鎖門了,小偷偷就偷吧,偷了我就再報警把東西找回來。反正小偷又不是天天來我家偷東西,小偷來的次數(shù)非常少,我天天鎖門太費(fèi)勁了。我每天早上看一下啥東西丟了沒,丟了就報警把東西找回來。當(dāng)然這個類比并不完全像,只是大概邏輯比較像,下面我們就來講一講序列鎖的做法。序列鎖區(qū)分讀者和寫者,讀者可以并行,寫者還是需要互斥的,讀者和寫者之間也可以并行,所以當(dāng)讀者很頻繁,寫者很偶發(fā)的時候就適合用序列鎖。這個鎖有一個序列號,初始值是0,寫者進(jìn)入臨界區(qū)時把這個序列號加1,退出時再加1,讀者讀之前先獲取這個鎖的序列號,如果是奇數(shù)說明有寫者在臨界區(qū),就不停地獲取序列號,直到序列號為偶數(shù)為止。然后讀者進(jìn)入臨界區(qū)進(jìn)行讀操作,然后退出臨界區(qū)的時候再讀取一下序列號。如果和剛才獲取的序列號不一樣,說明有寫者剛才進(jìn)來過,再重新走一遍剛才的操作。如果序列號還不一樣就一直重復(fù),直到序列號一樣為止。
下面我們先來看看序列鎖的定義:
-
-
-
-
-
-
-
-
typedef struct seqcount { unsigned sequence;} seqcount_t; typedef struct { seqcount_spinlock_t seqcount; spinlock_t lock;} seqlock_t;
seqlock_t包含一個自旋鎖和一個seqcount_spinlock_t,seqcount_spinlock_t經(jīng)過一些復(fù)雜的宏定義包含了seqcount_t,所以可以簡單地認(rèn)為seqlock_t包含一個自旋鎖和一個int序列號。
下面我們看一下寫者的操作:
-
-
pid_t fork(void);
寫者的操作很簡單,就是用自旋鎖實現(xiàn)互斥,然后加鎖解釋的時候都把序列號增加1。
下面看讀者的操作:
-
-
-
-
-
-
-
-
-
-
-
static inline void write_seqlock(seqlock_t *sl){ spin_lock(&sl->lock); do_write_seqcount_begin(&sl->seqcount.seqcount);} static inline void write_sequnlock(seqlock_t *sl){ do_write_seqcount_end(&sl->seqcount.seqcount); spin_unlock(&sl->lock);}
讀者進(jìn)入臨界區(qū)之前先通過read_seqbegin獲取一個序列號,在臨界區(qū)的時候調(diào)用read_seqretry看看是否需要重走一遍臨界區(qū)。我們下面看一下內(nèi)核里使用序列鎖的一個例子:
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
static inline unsigned read_seqbegin(const seqlock_t *sl){ unsigned ret = read_seqcount_begin(&sl->seqcount); kcsan_atomic_next(0); /* non-raw usage, assume closing read_seqretry() */ kcsan_flat_atomic_begin(); return ret;} static inline unsigned read_seqretry(const seqlock_t *sl, unsigned start){ /* * Assume not nested: read_seqretry() may be called multiple times when * completing read critical section. */ kcsan_flat_atomic_end(); return read_seqcount_retry(&sl->seqcount, start);}
可以看到寫者使用序列鎖和正常的使用方法是一樣的,讀者使用序列鎖一般都是配合do while循環(huán)一起使用。
通過本文,我們明白了線程同步的本質(zhì),了解了線程防同步的基本邏輯和具體方法。防同步就是防止多個執(zhí)行流同時訪問相同的數(shù)據(jù),所以我們可以從兩點來防,一個是同時(時間上防同步),一個是相同的數(shù)據(jù)(空間上防同步)。時間上防同步我們采取的方法有原子操作,通過硬件來防止同時,加鎖機(jī)制,軟件方法來防同時,還有禁用偽并發(fā),防止宏觀上的同時微觀上的交錯。空間防同步我們采取的方法有數(shù)據(jù)分割、per CPU變量,每個CPU值訪問自己對應(yīng)的數(shù)據(jù)。數(shù)據(jù)復(fù)制,RCU,讀的時候復(fù)制指針,讀的數(shù)據(jù)是不變的,寫的時候不直接改變數(shù)據(jù),而是先把數(shù)據(jù)復(fù)制過來,修改自己的私有副本,這樣就不會有同步的問題,然后再原子地更新指針指向最新的數(shù)據(jù)。
八、總結(jié)回顧
通過本文,我們明白了線程同步的本質(zhì),了解了線程防同步的基本邏輯和具體方法。防同步就是防止多個執(zhí)行流同時訪問相同的數(shù)據(jù),所以我們可以從兩點來防,一個是同時(時間上防同步),一個是相同的數(shù)據(jù)(空間上防同步)。時間上防同步我們采取的方法有原子操作,通過硬件來防止同時,加鎖機(jī)制,軟件方法來防同時,還有禁用偽并發(fā),防止宏觀上的同時微觀上的交錯??臻g防同步我們采取的方法有數(shù)據(jù)分割、per CPU變量,每個CPU值訪問自己對應(yīng)的數(shù)據(jù)。數(shù)據(jù)復(fù)制,RCU,讀的時候復(fù)制指針,讀的數(shù)據(jù)是不變的,寫的時候不直接改變數(shù)據(jù),而是先把數(shù)據(jù)復(fù)制過來,修改自己的私有副本,這樣就不會有同步的問題,然后再原子地更新指針指向最新的數(shù)據(jù)。
參考文獻(xiàn):
《Linux Kernel Development》
《Understanding the Linux Kernel》
《Professional Linux Kernel Architecture》
https://lwn.net/Kernel/Index/#Locking_mechanisms
http://www.wowotech.net/sort/kernel_synchronization
長按掃碼,備注“程磊”
小馬邀請您進(jìn)程磊老師交流群