多線程編程之:實(shí)驗(yàn)內(nèi)容——“生產(chǎn)者消費(fèi)者”實(shí)驗(yàn)
掃描二維碼
隨時(shí)隨地手機(jī)看文章
“生產(chǎn)者—消費(fèi)者”問(wèn)題描述如下。
有一個(gè)有限緩沖區(qū)和兩個(gè)線程:生產(chǎn)者和消費(fèi)者。他們分別不停地把產(chǎn)品放入緩沖區(qū)和從緩沖區(qū)中拿走產(chǎn)品。一個(gè)生產(chǎn)者在緩沖區(qū)滿的時(shí)候必須等待,一個(gè)消費(fèi)者在緩沖區(qū)空的時(shí)候也必須等待。另外,因?yàn)榫彌_區(qū)是臨界資源,所以生產(chǎn)者和消費(fèi)者之間必須互斥執(zhí)行。它們之間的關(guān)系如圖9.4所示。
圖9.4生產(chǎn)者消費(fèi)者問(wèn)題描述
這里要求使用有名管道來(lái)模擬有限緩沖區(qū),并且使用信號(hào)量來(lái)解決“生產(chǎn)者—消費(fèi)者”問(wèn)題中的同步和互斥問(wèn)題。
3.實(shí)驗(yàn)步驟(1)信號(hào)量的考慮。
這里使用3個(gè)信號(hào)量,其中兩個(gè)信號(hào)量avail和full分別用于解決生產(chǎn)者和消費(fèi)者線程之間的同步問(wèn)題,mutex是用于這兩個(gè)線程之間的互斥問(wèn)題。其中avail表示有界緩沖區(qū)中的空單元數(shù),初始值為N;full表示有界緩沖區(qū)中非空單元數(shù),初始值為0;mutex是互斥信號(hào)量,初始值為1。
(2)畫出流程圖。
本實(shí)驗(yàn)流程圖如圖9.5所示。
圖9.5“生產(chǎn)者—消費(fèi)者”實(shí)驗(yàn)流程圖
(3)編寫代碼
本實(shí)驗(yàn)的代碼中采用的有界緩沖區(qū)擁有3個(gè)單元,每個(gè)單元為5個(gè)字節(jié)。為了盡量體現(xiàn)每個(gè)信號(hào)量的意義,在程序中生產(chǎn)過(guò)程和消費(fèi)過(guò)程是隨機(jī)(采取0~5s的隨機(jī)時(shí)間間隔)進(jìn)行的,而且生產(chǎn)者的速度比消費(fèi)者的速度平均快兩倍左右(這種關(guān)系可以相反)。生產(chǎn)者一次生產(chǎn)一個(gè)單元的產(chǎn)品(放入“hello”字符串),消費(fèi)者一次消費(fèi)一個(gè)單元的產(chǎn)品。
/*producer-customer.c*/
#include<stdio.h>
#include<stdlib.h>
#include<unistd.h>
#include<fcntl.h>
#include<pthread.h>
#include<errno.h>
#include<semaphore.h>
#include<sys/ipc.h>
#defineMYFIFO"myfifo"/*緩沖區(qū)有名管道的名字*/
#defineBUFFER_SIZE3/*緩沖區(qū)的單元數(shù)*/
#defineUNIT_SIZE5/*每個(gè)單元的大小*/
#defineRUN_TIME30/*運(yùn)行時(shí)間*/
#defineDELAY_TIME_LEVELS5.0/*周期的最大值*/
intfd;
time_tend_time;
sem_tmutex,full,avail;/*3個(gè)信號(hào)量*/
/*生產(chǎn)者線程*/
void*producer(void*arg)
{
intreal_write;
intdelay_time=0;
while(time(NULL)<end_time)
{
delay_time=(int)(rand()*DELAY_TIME_LEVELS/(RAND_MAX)/2.0)+1;
sleep(delay_time);
/*P操作信號(hào)量avail和mutex*/
sem_wait(&avail);
sem_wait(&mutex);
printf("nProducer:delay=%dn",delay_time);
/*生產(chǎn)者寫入數(shù)據(jù)*/
if((real_write=write(fd,"hello",UNIT_SIZE))==-1)
{
if(errno==EAGAIN)
{
printf("TheFIFOhasnotbeenreadyet.Pleasetrylatern");
}
}
else
{
printf("Write%dtotheFIFOn",real_write);
}
/*V操作信號(hào)量full和mutex*/
sem_post(&full);
sem_post(&mutex);
}
pthread_exit(NULL);
}
/*消費(fèi)者線程*/
void*customer(void*arg)
{
unsignedcharread_buffer[UNIT_SIZE];
intreal_read;
intdelay_time;
while(time(NULL)<end_time)
{
delay_time=(int)(rand()*DELAY_TIME_LEVELS/(RAND_MAX))+1;
sleep(delay_time);
/*P操作信號(hào)量full和mutex*/
sem_wait(&full);
sem_wait(&mutex);
memset(read_buffer,0,UNIT_SIZE);
printf("nCustomer:delay=%dn",delay_time);
if((real_read=read(fd,read_buffer,UNIT_SIZE))==-1)
{
if(errno==EAGAIN)
{
printf("Nodatayetn");
}
}
printf("Read%sfromFIFOn",read_buffer);
/*V操作信號(hào)量avail和mutex*/
sem_post(&avail);
sem_post(&mutex);
}
pthread_exit(NULL);
}
intmain()
{
pthread_tthrd_prd_id,thrd_cst_id;
pthread_tmon_th_id;
intret;
srand(time(NULL));
end_time=time(NULL)+RUN_TIME;
/*創(chuàng)建有名管道*/
if((mkfifo(MYFIFO,O_CREAT|O_EXCL)<0)&&(errno!=EEXIST))
{
printf("Cannotcreatefifon");
returnerrno;
}
/*打開管道*/
fd=open(MYFIFO,O_RDWR);
if(fd==-1)
{
printf("Openfifoerrorn");
returnfd;
}
/*初始化互斥信號(hào)量為1*/
ret=sem_init(&mutex,0,1);
/*初始化avail信號(hào)量為N*/
ret+=sem_init(&avail,0,BUFFER_SIZE);
/*初始化full信號(hào)量為0*/
ret+=sem_init(&full,0,0);
if(ret!=0)
{
printf("Anysemaphoreinitializationfailedn");
returnret;
}
/*創(chuàng)建兩個(gè)線程*/
ret=pthread_create(&thrd_prd_id,NULL,producer,NULL);
if(ret!=0)
{
printf("Createproducerthreaderrorn");
returnret;
}
ret=pthread_create(&thrd_cst_id,NULL,customer,NULL);
if(ret!=0)
{
printf("Createcustomerthreaderrorn");
returnret;
}
pthread_join(thrd_prd_id,NULL);
pthread_join(thrd_cst_id,NULL);
close(fd);
unlink(MYFIFO);
return0;
}
4.實(shí)驗(yàn)結(jié)果運(yùn)行該程序,得到如下結(jié)果:
$./producer_customer
……
Producer:delay=3
Write5totheFIFO
Customer:delay=3
ReadhellofromFIFO
Producer:delay=1
Write5totheFIFO
Producer:delay=2
Write5totheFIFO
Customer:delay=4
ReadhellofromFIFO
Customer:delay=1
ReadhellofromFIFO
Producer:delay=2
Write5totheFIFO
……