Linux C 多線程編程之互斥鎖與條件變量實例詳解
一、互斥鎖
互斥量從本質(zhì)上說就是一把鎖, 提供對共享資源的保護(hù)訪問。
1. 初始化:
在Linux下, 線程的互斥量數(shù)據(jù)類型是pthread_mutex_t. 在使用前, 要對它進(jìn)行初始化:
對于靜態(tài)分配的互斥量, 可以把它設(shè)置為PTHREAD_MUTEX_INITIALIZER, 或者調(diào)用pthread_mutex_init.
對于動態(tài)分配的互斥量, 在申請內(nèi)存(malloc)之后, 通過pthread_mutex_init進(jìn)行初始化, 并且在釋放內(nèi)存(free)前需要調(diào)用pthread_mutex_destroy.
原型:
int pthread_mutex_init(pthread_mutex_t *restrict mutex, const pthread_mutexattr_t *restric attr);
int pthread_mutex_destroy(pthread_mutex_t *mutex);
頭文件:
返回值: 成功則返回0, 出錯則返回錯誤編號.
說明: 如果使用默認(rèn)的屬性初始化互斥量, 只需把a(bǔ)ttr設(shè)為NULL. 其他值在以后講解。
2. 互斥操作:
對共享資源的訪問, 要對互斥量進(jìn)行加鎖, 如果互斥量已經(jīng)上了鎖, 調(diào)用線程會阻塞, 直到互斥量被解鎖. 在完成了對共享資源的訪問后, 要對互斥量進(jìn)行解鎖。
首先說一下加鎖函數(shù):
頭文件:
原型:
int pthread_mutex_lock(pthread_mutex_t *mutex);
int pthread_mutex_trylock(pthread_mutex_t *mutex);
返回值: 成功則返回0, 出錯則返回錯誤編號.
說明: 具體說一下trylock函數(shù), 這個函數(shù)是非阻塞調(diào)用模式, 也就是說, 如果互斥量沒被鎖住, trylock函數(shù)將把互斥量加鎖, 并獲得對共享資源的訪問權(quán)限; 如果互斥量被鎖住了, trylock函數(shù)將不會阻塞等待而直接返回EBUSY, 表示共享資源處于忙狀態(tài)。
再說一下解所函數(shù):
頭文件:
原型:
int pthread_mutex_unlock(pthread_mutex_t *mutex);
返回值: 成功則返回0, 出錯則返回錯誤編號.
3. 死鎖:
死鎖主要發(fā)生在有多個依賴鎖存在時, 會在一個線程試圖以與另一個線程相反順序鎖住互斥量時發(fā)生. 如何避免死鎖是使用互斥量應(yīng)該格外注意的東西。
總體來講, 有幾個不成文的基本原則:
對共享資源操作前一定要獲得鎖。
完成操作以后一定要釋放鎖。
盡量短時間地占用鎖。
如果有多鎖, 如獲得順序是ABC連環(huán)扣, 釋放順序也應(yīng)該是ABC。
線程錯誤返回時應(yīng)該釋放它所獲得的鎖。
下面給個測試小程序進(jìn)一步了解互斥,mutex互斥信號量鎖住的不是一個變量,而是阻塞住一段程序。如果對一個mutex變量testlock, 執(zhí)行了第一次pthread_mutex_lock(testlock)之后,在unlock(testlock)之前的這段時間內(nèi),如果有其他線程也執(zhí)行到了pthread_mutex_lock(testlock),這個線程就會阻塞住,直到之前的線程unlock之后才能執(zhí)行,由此,實現(xiàn)同步,也就達(dá)到保護(hù)臨界區(qū)資源的目的。
#
include
#include
static pthread_mutex_t testlock;
pthread_t test_thread;
void *test()
{
pthread_mutex_lock(&testlock);
printf("thread Test() \n");
pthread_mutex_unlock(&testlock);
}
int main()
{
pthread_mutex_init(&testlock, NULL);
pthread_mutex_lock(&testlock);
printf("Main lock \n");
pthread_create(&test_thread, NULL, test, NULL);
sleep(1); //更加明顯的觀察到是否執(zhí)行了創(chuàng)建線程的互斥鎖
printf("Main unlock \n");
pthread_mutex_unlock(&testlock);
sleep(1);
pthread_join(test_thread,NULL);
pthread_mutex_destroy(&testlock);
return 0;
}
make
gcc -D_REENTRANT -lpthread -o test test.c
結(jié)果:
Main lock
Main unlock
thread Test()
二、條件變量
這里主要說說 pthread_cond_wait()的用法,在下面有說明。
條件變量是利用線程間共享的全局變量進(jìn)行同步的一種機(jī)制,主要包括兩個動作:一個線程等待"條件變量的條件成立"而掛起;另一個線程使"條件成立"(給出條件成立信號)。為了防止競爭,條件變量的使用總是和一個互斥鎖結(jié)合在一起。
1. 創(chuàng)建和注銷
條件變量和互斥鎖一樣,都有靜態(tài)動態(tài)兩種創(chuàng)建方式,靜態(tài)方式使用PTHREAD_COND_INITIALIZER常量,如下:
pthread_cond_t cond=PTHREAD_COND_INITIALIZER
動態(tài)方式調(diào)用pthread_cond_init()函數(shù),API定義如下:
int pthread_cond_init(pthread_cond_t *cond, pthread_condattr_t *cond_attr)
盡管POSIX標(biāo)準(zhǔn)中為條件變量定義了屬性,但在LinuxThreads中沒有實現(xiàn),因此cond_attr值通常為NULL,且被忽略。
注銷一個條件變量需要調(diào)用pthread_cond_destroy(),只有在沒有線程在該條件變量上等待的時候才能注銷這個條件變量,否則返回EBUSY。因為Linux實現(xiàn)的條件變量沒有分配什么資源,所以注銷動作只包括檢查是否有等待線程。API定義如下:
int pthread_cond_destroy(pthread_cond_t *cond)
2. 等待和激發(fā)
int pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex)
int pthread_cond_timedwait(pthread_cond_t *cond, pthread_mutex_t *mutex, const struct timespec *abstime)
等待條件有兩種方式:無條件等待pthread_cond_wait()和計時等待pthread_cond_timedwait(),其中計時等待方式如果在給定時刻前條件沒有滿足,則返回ETIMEOUT,結(jié)束等待,其中abstime以與time()系統(tǒng)調(diào)用相同意義的絕對時間形式出現(xiàn),0表示格林尼治時間1970年1月1日0時0分0秒。
無論哪種等待方式,都必須和一個互斥鎖配合,以防止多個線程同時請求pthread_cond_wait()(或pthread_cond_timedwait(),下同)的競爭條件(Race Condition)。mutex互斥鎖必須是普通鎖(PTHREAD_MUTEX_TIMED_NP)或者適應(yīng)鎖(PTHREAD_MUTEX_ADAPTIVE_NP),且在調(diào)用pthread_cond_wait()前必須由本線程加鎖(pthread_mutex_lock()),而在更新條件等待隊列以前,mutex保持鎖定狀態(tài),并在線程掛起進(jìn)入等待前解鎖。在條件滿足從而離開pthread_cond_wait()之前,mutex將被重新加鎖,以與進(jìn)入pthread_cond_wait()前的加鎖動作對應(yīng)。 執(zhí)行pthread_cond_wait()時自動解鎖互斥量(如同執(zhí)行了 pthread_unlock_mutex),并等待條件變量觸發(fā)。這時線程掛起,不占用 CPU 時間,直到條件變量被觸發(fā)。
因此,全過程可以描述為:
(1)pthread_mutex_lock()上鎖,
(2)pthread_cond_wait()等待,等待過程分解為為:解鎖--條件滿足--加鎖
(3)pthread_mutex_unlock()解鎖。
激發(fā)條件有兩種形式,pthread_cond_signal()激活一個等待該條件的線程,存在多個等待線程時按入隊順序激活其中一個;而pthread_cond_broadcast()則激活所有等待線程。 兩者 如果沒有等待的線程,則什么也不做。
下面一位童鞋問的問題解釋了上面的說明:
當(dāng)pthread_cond_t調(diào)用pthread_cond_wait進(jìn)入等待狀態(tài)時,pthread_mutex_t互斥信號無效了.
示例代碼如下:
//多線程同步--條件鎖(相當(dāng)與windows的事件)測試
//要先讓pthread_cond_wait進(jìn)入等待信號狀態(tài),才能調(diào)用pthread_cond_signal發(fā)送信號,才有效.
//不能讓pthread_cond_signal在pthread_cond_wait前面執(zhí)行
#include
#include
#include
pthread_cond_t g_cond /*=PTHREAD_MUTEX_INITIALIZER*/; //申明條鎖,并用宏進(jìn)行初始化
pthread_mutex_t g_mutex ;
//線程執(zhí)行函數(shù)
void threadFun1(void)
{
int i;
pthread_mutex_lock(&g_mutex); //1
pthread_cond_wait(&g_cond,&g_mutex); //如g_cond無信號,則阻塞
for( i = 0;i < 2; i++ ){
printf("thread threadFun1.\n");
sleep(1);
}
pthread_cond_signal(&g_cond);
pthread_mutex_unlock(&g_mutex);
}
int main(void)
{
pthread_t id1; //線程的標(biāo)識符
pthread_t id2;
pthread_cond_init(&g_cond,NULL); //也可以程序里面初始化
pthread_mutex_init(&g_mutex,NULL); //互斥變量初始化
int i,ret;
ret = pthread_create(&id1,NULL,(void *)threadFun1, NULL);
if ( ret!=0 ) { //不為0說明線程創(chuàng)建失敗
printf ("Create pthread1 error!\n");
exit (1);
}
sleep(5); //等待子線程先開始
pthread_mutex_lock(&g_mutex); //2
pthread_cond_signal(&g_cond); //給個開始信號,注意這里要先等子線程進(jìn)入等待狀態(tài)在發(fā)信號,否則無效
pthread_mutex_unlock(&g_mutex);
pthread_join(id1,NULL);
pthread_cond_destroy(&g_cond); //釋放
pthread_mutex_destroy(&g_mutex); //釋放
return 0;
}
大家請看紅顏色的1和2.
明明是1先鎖了互斥變量,但代碼執(zhí)行到2還是一樣可以鎖定.
為什么會這樣呢????/
pthread_cond_wait()什么情況才會接鎖,繼續(xù)跑下去啊...現(xiàn)在來看一段典型的應(yīng)用:看注釋即可。
問題解釋:當(dāng)程序進(jìn)入pthread_cond_wait等待后,將會把g_mutex進(jìn)行解鎖,當(dāng)離開pthread_cond_wait之前,g_mutex會重新加鎖。所以在main中的g_mutex會被加鎖。 呵呵。。。
現(xiàn)在來看一段典型的應(yīng)用:看注釋即可。
#include
#include
static pthread_mutex_t mtx = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
struct node {
int n_number;
struct node *n_next;
} *head = NULL;
/*[thread_func]*/
static void cleanup_handler(void *arg)
{
printf("Cleanup handler of second thread.\n");
free(arg);
(void)pthread_mutex_unlock(&mtx);
}
static void *thread_func(void *arg)
{
struct node *p = NULL;
pthread_cleanup_push(cleanup_handler, p);
while (1) {
pthread_mutex_lock(&mtx); //這個mutex主要是用來保證pthread_cond_wait的并發(fā)性
while (head == NULL) { //這個while要特別說明一下,單個pthread_cond_wait功能很完善,為何這里要有一個while (head == NULL)呢?因為pthread_cond_wait里的線程可能會被意外喚醒,如果這個時候head != NULL,則不是我們想要的情況。這個時候,應(yīng)該讓線程繼續(xù)進(jìn)入pthread_cond_wait
pthread_cond_wait(&cond, &mtx); // pthread_cond_wait會先解除之前的pthread_mutex_lock鎖定的mtx,然后阻塞在等待對列里休眠,直到再次被喚醒(大多數(shù)情況下是等待的條件成立而被喚醒,喚醒后,該進(jìn)程會先鎖定先pthread_mutex_lock(&mtx);,再讀取資源, 用這個流程是比較清楚的/*block-->unlock-->wait() return-->lock*/
#include
#include
#include
#include
#include
#include
#include
#define min(a,b) ((a) < (b) ? (a) : (b))
#define max(a,b) ((a) > (b) ? (a) : (b))
#define MAXNITEMS 1000000
#define MAXNTHREADS 100
int nitems; /* read-only by producer and consumer */
struct {
pthread_mutex_t mutex;
int buff[MAXNITEMS];
int nput;
int nval;
} shared = { PTHREAD_MUTEX_INITIALIZER };
void *produce(void *), *consume(void *);
/* include main */
int
main(int argc, char **argv)
{
int i, nthreads, count[MAXNTHREADS];
pthread_t tid_produce[MAXNTHREADS], tid_consume;
if (argc != 3) {
printf("usage: prodcons3 <#items> <#threads>\n");
return -1;
}
nitems = min(atoi(argv[1]), MAXNITEMS);
nthreads = min(atoi(argv[2]), MAXNTHREADS);
/* 4create all producers and one consumer */
for (i = 0; i < nthreads; i++) {
count[i] = 0;
pthread_create(&tid_produce[i], NULL, produce, &count[i]);
}
pthread_create(&tid_consume, NULL, consume, NULL);
/* 4wait for all producers and the consumer */
for (i = 0; i < nthreads; i++) {
pthread_join(tid_produce[i], NULL);
printf("count[%d] = %d\n", i, count[i]);
}
pthread_join(tid_consume, NULL);
exit(0);
}
/* end main */
void *
produce(void *arg)
{
for ( ; ; ) {
pthread_mutex_lock(&shared.mutex);
if (shared.nput >= nitems) {
pthread_mutex_unlock(&shared.mutex);
return(NULL); /* array is full, we're done */
}
shared.buff[shared.nput] = shared.nval;
shared.nput++;
shared.nval++;
pthread_mutex_unlock(&shared.mutex);
*((int *) arg) += 1;
}
}
/* include consume */
void
consume_wait(int i)
{
for ( ; ; ) {
pthread_mutex_lock(&shared.mutex);
if (i < shared.nput) {
pthread_mutex_unlock(&shared.mutex);
return; /* an item is ready */
}
pthread_mutex_unlock(&shared.mutex);
}
}
void *
consume(void *arg)
{
int i;
for (i = 0; i < nitems; i++) {
consume_wait(i);
if (shared.buff[i] != i)
printf("buff[%d] = %d\n", i, shared.buff[i]);
}
return(NULL);
}