C++11并發(fā)學(xué)習(xí)筆記:為什么要使用線程池
為什么要使用線程池?
? ? ? ?目前的大多數(shù)網(wǎng)絡(luò)服務(wù)器,包括Web服務(wù)器、Email服務(wù)器以及數(shù)據(jù)庫(kù)服務(wù)器等都具有一個(gè)共同點(diǎn),就是單位時(shí)間內(nèi)必須處理數(shù)目巨大的連接請(qǐng)求,但處理時(shí)間卻相對(duì)較短。
? ? ? ?傳統(tǒng)多線程方案中我們采用的服務(wù)器模型則是一旦接受到請(qǐng)求之后,即創(chuàng)建一個(gè)新的線程,由該線程執(zhí)行任務(wù)。任務(wù)執(zhí)行完畢后,線程退出,這就是是“即時(shí)創(chuàng)建,即時(shí)銷(xiāo)毀”的策略。盡管與創(chuàng)建進(jìn)程相比,創(chuàng)建線程的時(shí)間已經(jīng)大大的縮短,但是如果提交給線程的任務(wù)是執(zhí)行時(shí)間較短,而且執(zhí)行次數(shù)極其頻繁,那么服務(wù)器將處于不停的創(chuàng)建線程,銷(xiāo)毀線程的狀態(tài)。
我們將傳統(tǒng)方案中的線程執(zhí)行過(guò)程分為三個(gè)過(guò)程:T1、T2、T3。
T1:線程創(chuàng)建時(shí)間
T2:線程執(zhí)行時(shí)間,包括線程的同步等時(shí)間
T3:線程銷(xiāo)毀時(shí)間
? ? ? ?那么我們可以看出,線程本身的開(kāi)銷(xiāo)所占的比例為(T1+T3) / (T1+T2+T3)。如果線程執(zhí)行的時(shí)間很短的話,這比開(kāi)銷(xiāo)可能占到20%-50%左右。如果任務(wù)執(zhí)行時(shí)間很長(zhǎng)的話,這筆開(kāi)銷(xiāo)將是不可忽略的。
? ? ? ?除此之外,線程池能夠減少創(chuàng)建的線程個(gè)數(shù)。通常線程池所允許的并發(fā)線程是有上界的,如果同時(shí)需要并發(fā)的線程數(shù)超過(guò)上界,那么一部分線程將會(huì)等待。而傳統(tǒng)方案中,如果同時(shí)請(qǐng)求數(shù)目為2000,那么最壞情況下,系統(tǒng)可能需要產(chǎn)生2000個(gè)線程。盡管這不是一個(gè)很大的數(shù)目,但是也有部分機(jī)器可能達(dá)不到這種要求。
? ? ? ?因此線程池的出現(xiàn)正是著眼于減少線程本身帶來(lái)的開(kāi)銷(xiāo)。線程池采用預(yù)創(chuàng)建的技術(shù),在應(yīng)用程序啟動(dòng)之后,將立即創(chuàng)建一定數(shù)量的線程(N1),放入空閑隊(duì)列中。這些線程都是處于阻塞(Suspended)狀態(tài),不消耗CPU,但占用較小的內(nèi)存空間。當(dāng)任務(wù)到來(lái)后,緩沖池選擇一個(gè)空閑線程,把任務(wù)傳入此線程中運(yùn)行。當(dāng)N1個(gè)線程都在處理任務(wù)后,緩沖池自動(dòng)創(chuàng)建一定數(shù)量的新線程,用于處理更多的任務(wù)。在任務(wù)執(zhí)行完畢后線程也不退出,而是繼續(xù)保持在池中等待下一次的任務(wù)。當(dāng)系統(tǒng)比較空閑時(shí),大部分線程都一直處于暫停狀態(tài),線程池自動(dòng)銷(xiāo)毀一部分線程,回收系統(tǒng)資源。
? ? ? 基于這種預(yù)創(chuàng)建技術(shù),線程池將線程創(chuàng)建和銷(xiāo)毀本身所帶來(lái)的開(kāi)銷(xiāo)分?jǐn)偟搅烁鱾€(gè)具體的任務(wù)上,執(zhí)行次數(shù)越多,每個(gè)任務(wù)所分擔(dān)到的線程本身開(kāi)銷(xiāo)則越小,不過(guò)我們另外可能需要考慮進(jìn)去線程之間同步所帶來(lái)的開(kāi)銷(xiāo)
線程池適合場(chǎng)景
? ? ? ?事實(shí)上,線程池并不是萬(wàn)能的。它有其特定的使用場(chǎng)合。線程池致力于減少線程本身的開(kāi)銷(xiāo)對(duì)應(yīng)用所產(chǎn)生的影響,這是有前提的,前提就是線程本身開(kāi)銷(xiāo)與線程執(zhí)行任務(wù)相比不可忽略。如果線程本身的開(kāi)銷(xiāo)相對(duì)于線程任務(wù)執(zhí)行開(kāi)銷(xiāo)而言是可以忽略不計(jì)的,那么此時(shí)線程池所帶來(lái)的好處是不明顯的,比如對(duì)于FTP服務(wù)器以及Telnet服務(wù)器,通常傳送文件的時(shí)間較長(zhǎng),開(kāi)銷(xiāo)較大,那么此時(shí),我們采用線程池未必是理想的方法,我們可以選擇“即時(shí)創(chuàng)建,即時(shí)銷(xiāo)毀”的策略。
總之線程池通常適合下面的幾個(gè)場(chǎng)合:
(1)單位時(shí)間內(nèi)處理任務(wù)頻繁而且任務(wù)處理時(shí)間短
(2)對(duì)實(shí)時(shí)性要求較高。如果接受到任務(wù)后在創(chuàng)建線程,可能滿足不了實(shí)時(shí)要求,因此必須采用線程池進(jìn)行預(yù)創(chuàng)建。
首先感謝github上大神的分享:https://github.com/progschj/ThreadPool
代碼非常的簡(jiǎn)潔,只有一個(gè)頭文件ThreadPool.h,這里貼出來(lái)作為備份。
#ifndef?THREAD_POOL_H #define?THREAD_POOL_H #include#include#include#include#include#include#include#include#includeclass?ThreadPool?{ public: ????ThreadPool(size_t); ????templateauto?enqueue(F&&?f,?Args&&...?args)? ????????->?std::future<typename?std::result_of::type>; ????~ThreadPool(); private: ????//?need?to?keep?track?of?threads?so?we?can?join?them ????std::vector<?std::thread?>?workers; ????//?the?task?queue ????std::queue<?std::function>?tasks; ???? ????//?synchronization ????std::mutex?queue_mutex; ????std::condition_variable?condition; ????bool?stop; }; ? //?the?constructor?just?launches?some?amount?of?workers inline?ThreadPool::ThreadPool(size_t?threads) ????:???stop(false) { ????for(size_t?i?=?0;i<threads;++i) ????????workers.emplace_back( ????????????[this] ????????????{ ????????????????for(;;) ????????????????{ ????????????????????std::functiontask; ????????????????????{ ????????????????????????std::unique_locklock(this->queue_mutex); ????????????????????????this->condition.wait(lock, ????????????????????????????[this]{?return?this->stop?||?!this->tasks.empty();?}); ????????????????????????if(this->stop?&&?this->tasks.empty()) ????????????????????????????return; ????????????????????????task?=?std::move(this->tasks.front()); ????????????????????????this->tasks.pop(); ????????????????????} ????????????????????task(); ????????????????} ????????????} ????????); } //?add?new?work?item?to?the?pool templateauto?ThreadPool::enqueue(F&&?f,?Args&&...?args)? ????->?std::future<typename?std::result_of::type> { ????using?return_type?=?typename?std::result_of::type; ????auto?task?=?std::make_shared<?std::packaged_task>( ????????????std::bind(std::forward(f),?std::forward(args)...) ????????); ???????? ????std::futureres?=?task->get_future(); ????{ ????????std::unique_locklock(queue_mutex); ????????//?don't?allow?enqueueing?after?stopping?the?pool ????????if(stop) ????????????throw?std::runtime_error("enqueue?on?stopped?ThreadPool"); ????????tasks.emplace([task](){?(*task)();?}); ????} ????condition.notify_one(); ????return?res; } //?the?destructor?joins?all?threads inline?ThreadPool::~ThreadPool() { ????{ ????????std::unique_locklock(queue_mutex); ????????stop?=?true; ????} ????condition.notify_all(); ????for(std::thread?&worker:?workers) ????????worker.join(); } #endif
基本使用方法
#include#include?"ThreadPool.h" int?main() { ????//?create?thread?pool?with?4?worker?threads ????ThreadPool?pool(4); ????//?enqueue?and?store?future ????auto?result?=?pool.enqueue([](int?answer)?{?return?answer;?},?42); ????//?get?result?from?future,?print?42 ????std::cout?<<?result.get()?<<?std::endl;? }
另一個(gè)例子
#include#include?"ThreadPool.h" void?func() { ????std::this_thread::sleep_for(std::chrono::milliseconds(100)); ????std::cout<<"worker?thread?ID:"<<std::this_thread::get_id()<<std::endl; } int?main() { ????ThreadPool?pool(4); ????while(1) ????{ ???????pool.enqueue(fun); ????} }
可以看出,四個(gè)線程都在運(yùn)行。但是如果把func()中的延時(shí)放在main()的while循環(huán)中,就只有一個(gè)線程在運(yùn)行了。
線程池,最簡(jiǎn)單的就是生產(chǎn)者消費(fèi)者模型了。池里的每條線程,都是消費(fèi)者,他們消費(fèi)并處理一個(gè)個(gè)的任務(wù),而任務(wù)隊(duì)列就相當(dāng)于生產(chǎn)者了。
線程池最簡(jiǎn)單的形式是含有一個(gè)固定數(shù)量的工作線程來(lái)處理任務(wù),典型的數(shù)量是std::thread::hardware_concurrency()