C++11并發(fā)學(xué)習(xí)筆記:為什么要使用線程池
為什么要使用線程池?
? ? ? ?目前的大多數(shù)網(wǎng)絡(luò)服務(wù)器,包括Web服務(wù)器、Email服務(wù)器以及數(shù)據(jù)庫服務(wù)器等都具有一個共同點(diǎn),就是單位時間內(nèi)必須處理數(shù)目巨大的連接請求,但處理時間卻相對較短。
? ? ? ?傳統(tǒng)多線程方案中我們采用的服務(wù)器模型則是一旦接受到請求之后,即創(chuàng)建一個新的線程,由該線程執(zhí)行任務(wù)。任務(wù)執(zhí)行完畢后,線程退出,這就是是“即時創(chuàng)建,即時銷毀”的策略。盡管與創(chuàng)建進(jìn)程相比,創(chuàng)建線程的時間已經(jīng)大大的縮短,但是如果提交給線程的任務(wù)是執(zhí)行時間較短,而且執(zhí)行次數(shù)極其頻繁,那么服務(wù)器將處于不停的創(chuàng)建線程,銷毀線程的狀態(tài)。
我們將傳統(tǒng)方案中的線程執(zhí)行過程分為三個過程:T1、T2、T3。
T1:線程創(chuàng)建時間
T2:線程執(zhí)行時間,包括線程的同步等時間
T3:線程銷毀時間
? ? ? ?那么我們可以看出,線程本身的開銷所占的比例為(T1+T3) / (T1+T2+T3)。如果線程執(zhí)行的時間很短的話,這比開銷可能占到20%-50%左右。如果任務(wù)執(zhí)行時間很長的話,這筆開銷將是不可忽略的。
? ? ? ?除此之外,線程池能夠減少創(chuàng)建的線程個數(shù)。通常線程池所允許的并發(fā)線程是有上界的,如果同時需要并發(fā)的線程數(shù)超過上界,那么一部分線程將會等待。而傳統(tǒng)方案中,如果同時請求數(shù)目為2000,那么最壞情況下,系統(tǒng)可能需要產(chǎn)生2000個線程。盡管這不是一個很大的數(shù)目,但是也有部分機(jī)器可能達(dá)不到這種要求。
? ? ? ?因此線程池的出現(xiàn)正是著眼于減少線程本身帶來的開銷。線程池采用預(yù)創(chuàng)建的技術(shù),在應(yīng)用程序啟動之后,將立即創(chuàng)建一定數(shù)量的線程(N1),放入空閑隊(duì)列中。這些線程都是處于阻塞(Suspended)狀態(tài),不消耗CPU,但占用較小的內(nèi)存空間。當(dāng)任務(wù)到來后,緩沖池選擇一個空閑線程,把任務(wù)傳入此線程中運(yùn)行。當(dāng)N1個線程都在處理任務(wù)后,緩沖池自動創(chuàng)建一定數(shù)量的新線程,用于處理更多的任務(wù)。在任務(wù)執(zhí)行完畢后線程也不退出,而是繼續(xù)保持在池中等待下一次的任務(wù)。當(dāng)系統(tǒng)比較空閑時,大部分線程都一直處于暫停狀態(tài),線程池自動銷毀一部分線程,回收系統(tǒng)資源。
? ? ? 基于這種預(yù)創(chuàng)建技術(shù),線程池將線程創(chuàng)建和銷毀本身所帶來的開銷分?jǐn)偟搅烁鱾€具體的任務(wù)上,執(zhí)行次數(shù)越多,每個任務(wù)所分擔(dān)到的線程本身開銷則越小,不過我們另外可能需要考慮進(jìn)去線程之間同步所帶來的開銷
線程池適合場景
? ? ? ?事實(shí)上,線程池并不是萬能的。它有其特定的使用場合。線程池致力于減少線程本身的開銷對應(yīng)用所產(chǎn)生的影響,這是有前提的,前提就是線程本身開銷與線程執(zhí)行任務(wù)相比不可忽略。如果線程本身的開銷相對于線程任務(wù)執(zhí)行開銷而言是可以忽略不計(jì)的,那么此時線程池所帶來的好處是不明顯的,比如對于FTP服務(wù)器以及Telnet服務(wù)器,通常傳送文件的時間較長,開銷較大,那么此時,我們采用線程池未必是理想的方法,我們可以選擇“即時創(chuàng)建,即時銷毀”的策略。
總之線程池通常適合下面的幾個場合:
(1)單位時間內(nèi)處理任務(wù)頻繁而且任務(wù)處理時間短
(2)對實(shí)時性要求較高。如果接受到任務(wù)后在創(chuàng)建線程,可能滿足不了實(shí)時要求,因此必須采用線程池進(jìn)行預(yù)創(chuàng)建。
首先感謝github上大神的分享:https://github.com/progschj/ThreadPool
代碼非常的簡潔,只有一個頭文件ThreadPool.h,這里貼出來作為備份。
#ifndef?THREAD_POOL_H #define?THREAD_POOL_H #include#include#include#include#include#include#include#include#includeclass?ThreadPool?{ public: ????ThreadPool(size_t); ????templateauto?enqueue(F&&?f,?Args&&...?args)? ????????->?std::future<typename?std::result_of::type>; ????~ThreadPool(); private: ????//?need?to?keep?track?of?threads?so?we?can?join?them ????std::vector<?std::thread?>?workers; ????//?the?task?queue ????std::queue<?std::function>?tasks; ???? ????//?synchronization ????std::mutex?queue_mutex; ????std::condition_variable?condition; ????bool?stop; }; ? //?the?constructor?just?launches?some?amount?of?workers inline?ThreadPool::ThreadPool(size_t?threads) ????:???stop(false) { ????for(size_t?i?=?0;i<threads;++i) ????????workers.emplace_back( ????????????[this] ????????????{ ????????????????for(;;) ????????????????{ ????????????????????std::functiontask; ????????????????????{ ????????????????????????std::unique_locklock(this->queue_mutex); ????????????????????????this->condition.wait(lock, ????????????????????????????[this]{?return?this->stop?||?!this->tasks.empty();?}); ????????????????????????if(this->stop?&&?this->tasks.empty()) ????????????????????????????return; ????????????????????????task?=?std::move(this->tasks.front()); ????????????????????????this->tasks.pop(); ????????????????????} ????????????????????task(); ????????????????} ????????????} ????????); } //?add?new?work?item?to?the?pool templateauto?ThreadPool::enqueue(F&&?f,?Args&&...?args)? ????->?std::future<typename?std::result_of::type> { ????using?return_type?=?typename?std::result_of::type; ????auto?task?=?std::make_shared<?std::packaged_task>( ????????????std::bind(std::forward(f),?std::forward(args)...) ????????); ???????? ????std::futureres?=?task->get_future(); ????{ ????????std::unique_locklock(queue_mutex); ????????//?don't?allow?enqueueing?after?stopping?the?pool ????????if(stop) ????????????throw?std::runtime_error("enqueue?on?stopped?ThreadPool"); ????????tasks.emplace([task](){?(*task)();?}); ????} ????condition.notify_one(); ????return?res; } //?the?destructor?joins?all?threads inline?ThreadPool::~ThreadPool() { ????{ ????????std::unique_locklock(queue_mutex); ????????stop?=?true; ????} ????condition.notify_all(); ????for(std::thread?&worker:?workers) ????????worker.join(); } #endif
基本使用方法
#include#include?"ThreadPool.h" int?main() { ????//?create?thread?pool?with?4?worker?threads ????ThreadPool?pool(4); ????//?enqueue?and?store?future ????auto?result?=?pool.enqueue([](int?answer)?{?return?answer;?},?42); ????//?get?result?from?future,?print?42 ????std::cout?<<?result.get()?<<?std::endl;? }
另一個例子
#include#include?"ThreadPool.h" void?func() { ????std::this_thread::sleep_for(std::chrono::milliseconds(100)); ????std::cout<<"worker?thread?ID:"<<std::this_thread::get_id()<<std::endl; } int?main() { ????ThreadPool?pool(4); ????while(1) ????{ ???????pool.enqueue(fun); ????} }
可以看出,四個線程都在運(yùn)行。但是如果把func()中的延時放在main()的while循環(huán)中,就只有一個線程在運(yùn)行了。
線程池,最簡單的就是生產(chǎn)者消費(fèi)者模型了。池里的每條線程,都是消費(fèi)者,他們消費(fèi)并處理一個個的任務(wù),而任務(wù)隊(duì)列就相當(dāng)于生產(chǎn)者了。
線程池最簡單的形式是含有一個固定數(shù)量的工作線程來處理任務(wù),典型的數(shù)量是std::thread::hardware_concurrency()