當(dāng)前位置:首頁(yè) > 公眾號(hào)精選 > 小林coding
[導(dǎo)讀]線程池可以說(shuō)是 Java 進(jìn)階必備的知識(shí)點(diǎn)了,也是面試中必備的考點(diǎn),可能不少人看了這篇文章后能對(duì)線程池工作原理說(shuō)上一二,但這還遠(yuǎn)遠(yuǎn)不夠,如果碰到比較有經(jīng)驗(yàn)的面試官再繼續(xù)追問(wèn),很可能會(huì)被吊打。

前言

線程池可以說(shuō)是 Java 進(jìn)階必備的知識(shí)點(diǎn)了,也是面試中必備的考點(diǎn),可能不少人看了這篇文章后能對(duì)線程池工作原理說(shuō)上一二,但這還遠(yuǎn)遠(yuǎn)不夠,如果碰到比較有經(jīng)驗(yàn)的面試官再繼續(xù)追問(wèn),很可能會(huì)被吊打,考慮如下問(wèn)題:

  1. Tomcat 的線程池和 JDK 的線程池實(shí)現(xiàn)有啥區(qū)別, Dubbo 中有類似 Tomcat 的線程池實(shí)現(xiàn)嗎?
  2. 我司網(wǎng)關(guān) dubbo 調(diào)用線程池曾經(jīng)出現(xiàn)過(guò)這樣的一個(gè)問(wèn)題:壓測(cè)時(shí)接口可以正常返回,但接口 RT 很高,假設(shè)設(shè)置的核心線程大小為 500,最大線程為 800,緩沖隊(duì)列為 5000,你能從這個(gè)設(shè)置中發(fā)現(xiàn)出一些問(wèn)題并對(duì)這些參數(shù)進(jìn)行調(diào)優(yōu)嗎?
  3. 線程池里的線程真的有核心線程和非核心線程之分?
  4. 線程池被 shutdown 后,還能產(chǎn)生新的線程?
  5. 線程把任務(wù)丟給線程池后肯定就馬上返回了?
  6. 線程池里的線程異常后會(huì)再次新增線程嗎,如何捕獲這些線程拋出的異常?
  7. 線程池的大小如何設(shè)置,如何 動(dòng)態(tài)設(shè)置線程池的參數(shù)
  8. 線程池的狀態(tài)機(jī)畫一下?
  9. 阿里 Java 代碼規(guī)范為什么不允許使用 Executors 快速創(chuàng)建線程池?
  10. 使用線程池應(yīng)該避免哪些問(wèn)題,能否簡(jiǎn)單說(shuō)下線程池的最佳實(shí)踐?
  11. 如何優(yōu)雅關(guān)閉線程池
  12. 如何對(duì)線程池進(jìn)行監(jiān)控

相信不少人看了這些問(wèn)題會(huì)有些懵逼

其實(shí)這些問(wèn)題的答案大多數(shù)都藏在線程池的源碼里,所以深入了解線程池的源碼非常重要,本章我們將會(huì)來(lái)學(xué)習(xí)一下線程池的源碼,相信看完之后,以上的問(wèn)題大部分都能回答,另外一些問(wèn)題我們也會(huì)在文中與大家一起探討。

本文將會(huì)從以下幾個(gè)方面來(lái)介紹線程池的原理。

  1. 為什么要用線程池
  2. 線程池是如何工作的
  3. 線程池提交任務(wù)的兩種方式
  4. ThreadPoolExecutor 源碼剖析
  5. 解答開篇的問(wèn)題
  6. 線程池的最佳實(shí)踐
  7. 總結(jié)

相信大家看完對(duì)線程池的理解會(huì)更進(jìn)一步,肝文不易,看完別完了三連哦。

為什么要用線程池

在上文也提到過(guò),創(chuàng)建線程有三大開銷,如下:

1、其實(shí) Java 中的線程模型是基于操作系統(tǒng)原生線程模型實(shí)現(xiàn)的,也就是說(shuō) Java 中的線程其實(shí)是基于內(nèi)核線程實(shí)現(xiàn)的,線程的創(chuàng)建,析構(gòu)與同步都需要進(jìn)行系統(tǒng)調(diào)用,而系統(tǒng)調(diào)用需要在用戶態(tài)與內(nèi)核中來(lái)回切換,代價(jià)相對(duì)較高,線程的生命周期包括「線程創(chuàng)建時(shí)間」,「線程執(zhí)行任務(wù)時(shí)間」,「線程銷毀時(shí)間」,創(chuàng)建和銷毀都需要導(dǎo)致系統(tǒng)調(diào)用。2、每個(gè) Thread 都需要有一個(gè)內(nèi)核線程的支持,也就意味著每個(gè) Thread 都需要消耗一定的內(nèi)核資源(如內(nèi)核線程的棧空間),因此能創(chuàng)建的 Thread 是有限的,默認(rèn)一個(gè)線程的線程棧大小是 1 M,有圖有真相

圖中所示,在 Java 8 下,創(chuàng)建 19 個(gè)線程(thread #19)需要?jiǎng)?chuàng)建 19535 KB,即 1 M 左右,reserved 代表如果創(chuàng)建 19 個(gè)線程,操作系統(tǒng)保證會(huì)為其分配這么多空間(實(shí)際上并不一定分配),committed 則表示實(shí)際已分配的空間大小。

畫外音:注意,這是在 Java 8 下的線程占用空間情況,但在 Java 11 中,對(duì)線程作了很大的優(yōu)化,創(chuàng)建一個(gè)線程大概只需要 40 KB,空間消耗大大減少

3、線程多了,導(dǎo)致不可忽視的上下文切換開銷。

由此可見,線程的創(chuàng)建是昂貴的,所以必須以線程池的形式來(lái)管理這些線程,在線程池中合理設(shè)置線程大小和管理線程,以達(dá)到以合理的創(chuàng)建線程大小以達(dá)到最大化收益,最小化風(fēng)險(xiǎn)的目的,對(duì)于開發(fā)人員來(lái)說(shuō),要完成任務(wù)不用關(guān)心線程如何創(chuàng)建,如何銷毀,如何協(xié)作,只需要關(guān)心提交的任務(wù)何時(shí)完成即可,對(duì)線程的調(diào)優(yōu),監(jiān)控等這些細(xì)枝末節(jié)的工作通通交給線程池來(lái)實(shí)現(xiàn),所以也讓開發(fā)人員得到極大的解脫!

類似線程池的這種池化思想應(yīng)用在很多地方,比如數(shù)據(jù)庫(kù)連接池,Http 連接池等,避免了昂貴資源的創(chuàng)建,提升了性能,也解放了開發(fā)人員。

ThreadPoolExecutor 設(shè)計(jì)架構(gòu)圖

首先我們來(lái)看看 Executor 框架的設(shè)計(jì)圖

  • Executor: 最頂層的 Executor 接口只提供了一個(gè) execute 接口,實(shí)現(xiàn)了提交任務(wù)與執(zhí)行任務(wù)的解藕,這個(gè)方法是最核心的,也是我們?cè)创a剖析的重點(diǎn),此方法最終是由 ThreadPoolExecutor 實(shí)現(xiàn)的,
  • ExecutorService 擴(kuò)展了 Executor 接口,實(shí)現(xiàn)了終止執(zhí)行器,單個(gè)/批量提交任務(wù)等方法
  • AbstractExecutorService 實(shí)現(xiàn)了 ExecutorService 接口,實(shí)現(xiàn)了除 execute 以外的所有方法,只將一個(gè)最重要的 execute 方法交給 ThreadPoolExecutor 實(shí)現(xiàn)。

這樣的分層設(shè)計(jì)雖然層次看起來(lái)挺多,但每一層每司其職,邏輯清晰,值得借鑒。

線程池是如何工作的

首先我們來(lái)看下如何創(chuàng)建一個(gè)線程池

ThreadPoolExecutor?threadPool?=?new?ThreadPoolExecutor(10,?20,?600L,
????????????????????TimeUnit.SECONDS,?new?LinkedBlockingQueue<>(4096),
????????????????????new?NamedThreadFactory("common-work-thread"));
//?設(shè)置拒絕策略,默認(rèn)為?AbortPolicy
threadPool.setRejectedExecutionHandler(new?ThreadPoolExecutor.AbortPolicy());

看下其構(gòu)造方法簽名如下

public?ThreadPoolExecutor(int?corePoolSize,
??????????????????????????????int?maximumPoolSize,
??????????????????????????????long?keepAliveTime,
??????????????????????????????TimeUnit?unit,
??????????????????????????????BlockingQueue?workQueue,
??????????????????????????????ThreadFactory?threadFactory,
??????????????????????????????RejectedExecutionHandler?handler)
?
{
????????????//?省略代碼若干
}

要理解這些參數(shù)具體代表的意義,必須清楚線程池提交任務(wù)與執(zhí)行任務(wù)流程,如下

圖片來(lái)自美團(tuán)技術(shù)團(tuán)隊(duì)

步驟如下

1、corePoolSize:如果提交任務(wù)后線程還在運(yùn)行,當(dāng)線程數(shù)小于 corePoolSize 值時(shí),無(wú)論線程池中的線程是否忙碌,都會(huì)創(chuàng)建線程,并把任務(wù)交給此新創(chuàng)建的線程進(jìn)行處理,如果線程數(shù)少于等于 corePoolSize,那么這些線程不會(huì)回收,除非將 allowCoreThreadTimeOut 設(shè)置為 true,但一般不這么干,因?yàn)轭l繁地創(chuàng)建銷毀線程會(huì)極大地增加系統(tǒng)調(diào)用的開銷。

2、workQueue:如果線程數(shù)大于核心數(shù)(corePoolSize)且小于最大線程數(shù)(maximumPoolSize),則會(huì)將任務(wù)先丟到阻塞隊(duì)列里,然后線程自己去阻塞隊(duì)列中拉取任務(wù)執(zhí)行。

3、maximumPoolSize: 線程池中最大可創(chuàng)建的線程數(shù),如果提交任務(wù)時(shí)隊(duì)列滿了且線程數(shù)未到達(dá)這個(gè)設(shè)定值,則會(huì)創(chuàng)建線程并執(zhí)行此次提交的任務(wù),如果提交任務(wù)時(shí)隊(duì)列滿了但線池?cái)?shù)已經(jīng)到達(dá)了這個(gè)值,此時(shí)說(shuō)明已經(jīng)超出了線池程的負(fù)載能力,就會(huì)執(zhí)行拒絕策略,這也好理解,總不能讓源源不斷地任務(wù)進(jìn)來(lái)把線程池給壓垮了吧,我們首先要保證線程池能正常工作。

4、RejectedExecutionHandler:一共有以下四種拒絕策略

  • AbortPolicy:丟棄任務(wù)并拋出異常,這也是默認(rèn)策略;
  • CallerRunsPolicy:用調(diào)用者所在的線程來(lái)執(zhí)行任務(wù),所以開頭的問(wèn)題「線程把任務(wù)丟給線程池后肯定就馬上返回了?」我們可以回答了,如果用的是 CallerRunsPolicy 策略,提交任務(wù)的線程(比如主線程)提交任務(wù)后并不能保證馬上就返回,當(dāng)觸發(fā)了這個(gè) reject 策略不得不親自來(lái)處理這個(gè)任務(wù)。
  • DiscardOldestPolicy:丟棄阻塞隊(duì)列中靠最前的任務(wù),并執(zhí)行當(dāng)前任務(wù)。
  • DiscardPolicy:直接丟棄任務(wù),不拋出任何異常,這種策略只適用于不重要的任務(wù)。

5、keepAliveTime: 線程存活時(shí)間,如果在此時(shí)間內(nèi)超出 corePoolSize 大小的線程處于 idle 狀態(tài),這些線程會(huì)被回收

6、threadFactory:可以用此參數(shù)設(shè)置線程池的命名,指定 defaultUncaughtExceptionHandler(有啥用,后文闡述),甚至可以設(shè)定線程為守護(hù)線程。

現(xiàn)在問(wèn)題來(lái)了,該如何合理設(shè)置這些參數(shù)呢。

首先來(lái)看線程大小設(shè)置

<>告訴我們應(yīng)該分兩種情況

  1. 針對(duì) CPU 密集型的任務(wù),在有 N cpu個(gè)處理器的系統(tǒng)上,當(dāng)線程池的大小為 N cpu + 1 時(shí),通常能實(shí)現(xiàn)最優(yōu)的利用率,+1 是因?yàn)楫?dāng)計(jì)算密集型線程偶爾由于缺頁(yè)故障或其他原因而暫停工作時(shí),這個(gè)"額外"的線程也能確保 CPU 的時(shí)鐘周期不會(huì)被浪費(fèi),所謂 CPU 密集,就是線程一直在忙碌,這樣將線程池的大小設(shè)置為 N cpu + 1 避免了線程的上下文切換,讓線程時(shí)刻處于忙碌狀態(tài),將 CPU 的利用率最大化。
  2. 針對(duì) IO 密集型的任務(wù),它也給出了如下計(jì)算公式

這些公式看看就好,實(shí)際的業(yè)務(wù)場(chǎng)景中基本用不上,這些公式太過(guò)理論化了,脫離業(yè)務(wù)場(chǎng)景,僅可作個(gè)理論參考,舉個(gè)例子,你說(shuō) CPU 密集型任務(wù)設(shè)置線程池大小為 N + 1個(gè),但實(shí)際上在業(yè)務(wù)中往往不只設(shè)置一個(gè)線程池,這種情況套用的公式就懵逼了

再來(lái)看 workQueue 的大小設(shè)置

由上文可知,如果最大線程大于核心線程數(shù),當(dāng)且僅當(dāng)核心線程滿了且 workQueue 也滿的情況下,才會(huì)新增新的線程,也就是說(shuō)如果 workQueue 是無(wú)界隊(duì)列,那么當(dāng)線程數(shù)增加到 corePoolSize 后,永遠(yuǎn)不會(huì)再新增新的線程了,也就是說(shuō)此時(shí) maximumPoolSize 的設(shè)置就無(wú)效了,也無(wú)法觸發(fā) RejectedExecutionHandler 拒絕策略,任務(wù)只會(huì)源源不斷地填充到 workQueue,直到 OOM。

所以 workQueue 應(yīng)該為有界隊(duì)列,至少保證在任務(wù)過(guò)載的情況下線程池還能正常工作,那么哪些是有有界隊(duì)列,哪些是無(wú)界隊(duì)列呢。

有界隊(duì)列我們常用的以下兩個(gè)

  • LinkedBlockingQueue: 鏈表構(gòu)成的有界隊(duì)列,按先進(jìn)先出(FIFO)的順序?qū)υ剡M(jìn)行排列,但注意在創(chuàng)建時(shí)需指定其大小,否則其大小默認(rèn)為 Integer.MAX_VALUE,相當(dāng)于無(wú)界隊(duì)列了
  • ArrayBlockingQueue: 數(shù)組實(shí)現(xiàn)的有界隊(duì)列,按先進(jìn)先出(FIFO)的順序?qū)υ剡M(jìn)行排列。

無(wú)界隊(duì)列我們常用 PriorityBlockingQueue 這個(gè)優(yōu)先級(jí)隊(duì)列,任務(wù)插入的時(shí)候可以指定其權(quán)重以讓這些任務(wù)優(yōu)先執(zhí)行,但這個(gè)隊(duì)列很少用,原因很簡(jiǎn)單,線程池里的任務(wù)執(zhí)行順序一般是平等的,如果真有必須某些類型的任務(wù)需要優(yōu)先執(zhí)行,大不了再開個(gè)線程池好了,將不同的任務(wù)類型用不同的線程池隔離開來(lái),也是合理利用線程池的一種實(shí)踐。

說(shuō)到這我相信大家應(yīng)該能回答開頭的問(wèn)題「阿里 Java 代碼規(guī)范為什么不允許使用 Executors 快速創(chuàng)建線程池?」,最常見的是以下兩種創(chuàng)建方式

image-20201109002227476

newCachedThreadPool 方法的最大線程數(shù)設(shè)置成了 Integer.MAX_VALUE,而 newSingleThreadExecutor 方法創(chuàng)建 workQueue 時(shí) LinkedBlockingQueue 未聲明大小,相當(dāng)于創(chuàng)建了無(wú)界隊(duì)列,一不小心就會(huì)導(dǎo)致 OOM。

threadFactory 如何設(shè)置

一般業(yè)務(wù)中會(huì)有多個(gè)線程池,如果某個(gè)線程池出現(xiàn)了問(wèn)題,定位是哪一個(gè)線程出問(wèn)題很重要,所以為每個(gè)線程池取一個(gè)名字就很有必要了,我司用的 dubbo 的 NamedThreadFactory 來(lái)生成 threadFactory,創(chuàng)建很簡(jiǎn)單

new?NamedThreadFactory("demo-work")

它的實(shí)現(xiàn)還是很巧妙的,有興趣地可以看看它的源碼,每調(diào)用一次,底層有個(gè)計(jì)數(shù)器會(huì)加一,會(huì)依次命名為 「demo-work-thread-1」, 「demo-work-thread-2」, 「demo-work-thread-3」這樣遞增的字符串。

在實(shí)際的業(yè)務(wù)場(chǎng)景中,一般很難確定 corePoolSize, workQueue,maximumPoolSize 的大小,如果出問(wèn)題了,一般來(lái)說(shuō)只能重新設(shè)置一下這些參數(shù)再發(fā)布,這樣往往需要耗費(fèi)一些時(shí)間,美團(tuán)的這篇文章給出了讓人眼前一亮的解決方案,當(dāng)發(fā)現(xiàn)問(wèn)題(線程池監(jiān)控告警)時(shí),動(dòng)態(tài)調(diào)整這些參數(shù),可以讓這些參數(shù)實(shí)時(shí)生效,能在發(fā)現(xiàn)問(wèn)題時(shí)及時(shí)解決,確實(shí)是個(gè)很好的思路。

線程池提交任務(wù)的兩種方式

線程池創(chuàng)建好了,該怎么給它提交任務(wù),有兩種方式,調(diào)用 execute 和 submit 方法,來(lái)看下這兩個(gè)方法的方法簽名

//?方式一:execute 方法
public?void?execute(Runnable?command)?{
}

//?方式二:ExecutorService 中 submit 的三個(gè)方法
?Future?submit(Callable?task);
?Future?submit(Runnable?task,?T?result);
Future?submit(Runnable?task);

區(qū)別在于調(diào)用 execute 無(wú)返回值,而調(diào)用 ?submit 可以返回 Future,那么這個(gè) Future 能到底能干啥呢,看它的接口

public?interface?Future<V>?{

????/**
?????*?取消正在執(zhí)行的任務(wù),如果任務(wù)已執(zhí)行或已被取消,或者由于某些原因不能取消則返回?false
?????*?如果任務(wù)未開始或者任務(wù)已開始但可以中斷(mayInterruptIfRunning?為?true),則
?????*?可以取消/中斷此任務(wù)
?????*/

????boolean?cancel(boolean?mayInterruptIfRunning);

????/**
?????*?任務(wù)在完成前是否已被取消
?????*/

????boolean?isCancelled();

????/**
?????*?正常的執(zhí)行完流程流程,或拋出異常,或取消導(dǎo)致的任務(wù)完成都會(huì)返回?true
?????*/

????boolean?isDone();

????/**
?????*?阻塞等待任務(wù)的執(zhí)行結(jié)果
?????*/

????V?get()?throws?InterruptedException,?ExecutionException;

????/**
?????*?阻塞等待任務(wù)的執(zhí)行結(jié)果,不過(guò)這里指定了時(shí)間,如果在?timeout?時(shí)間內(nèi)任務(wù)還未執(zhí)行完成,
?????*?則拋出?TimeoutException?異常
?????*/

????V?get(long?timeout,?TimeUnit?unit)
????????throws?InterruptedException,?ExecutionException,?TimeoutException
;
}

可以用 Future 取消任務(wù),判斷任務(wù)是否已取消/完成,甚至可以阻塞等待結(jié)果。

submit 為啥能提交任務(wù)(Runnable)的同時(shí)也能返回任務(wù)(Future)的執(zhí)行結(jié)果呢

原來(lái)在最后執(zhí)行 execute 前用 newTaskFor 將 task 封裝成了 RunnableFuture,newTaskFor 返回了 FutureTask 這個(gè)類,結(jié)構(gòu)圖如下

可以看到 FutureTask 這個(gè)接口既實(shí)現(xiàn)了 Runnable 接口,也實(shí)現(xiàn) Future 接口,所以在提交任務(wù)的同時(shí)也能利用 Future 接口來(lái)執(zhí)行任務(wù)的取消,獲取任務(wù)的狀態(tài),等待執(zhí)行結(jié)果這些操作。

execute 與 submit 除了是否能返回執(zhí)行結(jié)果這一區(qū)別外,還有一個(gè)重要區(qū)別,那就是使用 execute 執(zhí)行如果發(fā)生了異常,是捕獲不到的,默認(rèn)會(huì)執(zhí)行 ThreadGroup 的 uncaughtException 方法(下圖數(shù)字 2 對(duì)應(yīng)的邏輯)

所以如果你想監(jiān)控執(zhí)行 execute 方法時(shí)發(fā)生的異常,需要通過(guò) threadFactory 來(lái)指定一個(gè) UncaughtExceptionHandler,這樣就會(huì)執(zhí)行上圖中的 1,進(jìn)而執(zhí)行 UncaughtExceptionHandler 中的邏輯,如下所示:

//1.實(shí)現(xiàn)一個(gè)自己的線程池工廠
ThreadFactory?factory?=?(Runnable?r)?->?{
????//創(chuàng)建一個(gè)線程
????Thread?t?=?new?Thread(r);
????//給創(chuàng)建的線程設(shè)置UncaughtExceptionHandler對(duì)象?里面實(shí)現(xiàn)異常的默認(rèn)邏輯
????t.setDefaultUncaughtExceptionHandler((Thread?thread1,?Throwable?e)?->?{
????????//?在此設(shè)置統(tǒng)計(jì)監(jiān)控邏輯
????????System.out.println("線程工廠設(shè)置的exceptionHandler"?+?e.getMessage());
????});
????return?t;
};

//?2.創(chuàng)建一個(gè)自己定義的線程池,使用自己定義的線程工廠
ExecutorService?service?=?new?ThreadPoolExecutor(1,?1,?0,?TimeUnit.MILLISECONDS,new?LinkedBlockingQueue(10),factory);

//3.提交任務(wù)
service.execute(()->{
????int?i=1/0;
});

執(zhí)行以上邏輯最終會(huì)輸出「線程工廠設(shè)置的exceptionHandler/ by zero」,通過(guò)這樣的方式就能通過(guò)設(shè)定的 defaultUncaughtExceptionHandler 來(lái)執(zhí)行我們的監(jiān)控邏輯了。

如果用 submit ,如何捕獲異常呢,當(dāng)我們調(diào)用 future.get 就可以捕獲

Callable?testCallable?=?xxx;
Future?future?=?executor.submit(myCallable);
try?{
????future1.get(3));
}?catch?(InterruptedException?e)?{
????e.printStackTrace();
}?catch?(ExecutionException?e)?{
????e.printStackTrace();
}

那么 future 為啥在 get 的時(shí)候才捕獲異步呢,因?yàn)樵趫?zhí)行 submit 時(shí)拋出異常后此異常被保存了起來(lái),而在 get 的時(shí)候才被拋出

關(guān)于 execute 和 submit 的執(zhí)行流程 why 神的這篇文章寫得非常透徹,我就不拾人牙慧了,建議大家好好品品,收獲會(huì)很大!

ThreadPoolExecutor 源碼剖析

前面鋪墊了這么多,終于到了最核心的源碼剖析環(huán)節(jié)了。

對(duì)于線程池來(lái)說(shuō),我們最關(guān)心的是它的「狀態(tài)」和「可運(yùn)行的線程數(shù)量」,一般來(lái)說(shuō)我們可以選擇用兩個(gè)變量來(lái)記錄,不過(guò) Doug Lea 只用了一個(gè)變量(ctl)就達(dá)成目的了,我們知道變量越多,代碼的可維護(hù)性就越差,也越容易出 bug, 所以只用一個(gè)變量就達(dá)成了兩個(gè)變量的效果,這讓代碼的可維護(hù)性大大提高,那么他是怎么設(shè)計(jì)的呢

//?ThreadPoolExecutor.java
public?class?ThreadPoolExecutor?extends?AbstractExecutorService?{
????private?final?AtomicInteger?ctl?=?new?AtomicInteger(ctlOf(RUNNING,?0));
????private?static?final?int?COUNT_BITS?=?Integer.SIZE?-?3;
????private?static?final?int?CAPACITY???=?(1?<1;

????//?結(jié)果:111 00000000000000000000000000000
????private?static?final?int?RUNNING????=?-1?<????//?結(jié)果:?000?00000000000000000000000000000
????private?static?final?int?SHUTDOWN???=??0?<????//?結(jié)果:?001 00000000000000000000000000000
????private?static?final?int?STOP???????=??1?<????//?結(jié)果:?010?00000000000000000000000000000
????private?static?final?int?TIDYING????=??2?<????//?結(jié)果:?011 00000000000000000000000000000
????private?static?final?int?TERMINATED?=??3?<
????//?獲取線程池的狀態(tài)
????private?static?int?runStateOf(int?c)?????{?return?c?&?~CAPACITY;?}
????//?獲取線程數(shù)量
????private?static?int?workerCountOf(int?c)??{?return?c?&?CAPACITY;?}
}

可以看到,ctl 是一個(gè) 原子類的 Integer 變量,有 32 位,低 29 位表示線程數(shù)量, 29 位最大可以表示 (2^29)-1 (大概 5 億多),足夠記錄線程大小了,如果未來(lái)還是不夠,可以把 ctl 聲明為 AtomicLong,高 3 位用來(lái)表示線程池的狀態(tài),3 位可以表示 8 個(gè)線程池的狀態(tài),由于線程池總共只有五個(gè)狀態(tài),所以 3 位也是足夠了,線程池的五個(gè)狀態(tài)如下

  • RUNNING: 接收新的任務(wù),并能繼續(xù)處理 workQueue 中的任務(wù)
  • SHUTDOWN: 不再接收新的任務(wù),不過(guò)能繼續(xù)處理 workQueue 中的任務(wù)
  • STOP: 不再接收新的任務(wù),也不再處理 workQueue 中的任務(wù),并且會(huì)中斷正在處理任務(wù)的線程
  • TIDYING: 所有的任務(wù)都完結(jié)了,并且線程數(shù)量(workCount)為 0 時(shí)即為此狀態(tài),進(jìn)入此狀態(tài)后會(huì)調(diào)用 terminated() 這個(gè)鉤子方法進(jìn)入 TERMINATED 狀態(tài)
  • TERMINATED: 調(diào)用 terminated() 方法后即為此狀態(tài)

線程池的狀態(tài)流轉(zhuǎn)及觸發(fā)條件如下

有了這些基礎(chǔ),我們來(lái)分析下 execute 的源碼


public?void?execute(Runnable?command)?{
????if?(command?==?null)
????????throw?new?NullPointerException();
????int?c?=?ctl.get();
????//?如果當(dāng)前線程數(shù)少于核心線程數(shù)(corePoolSize),無(wú)論核心線程是否忙碌,都創(chuàng)建線程,直到達(dá)到?corePoolSize?為止
????if?(workerCountOf(c)?????????//?創(chuàng)建線程并將此任務(wù)交給?worker?處理(此時(shí)此任務(wù)即?worker?中的?firstTask)
????????if?(addWorker(command,?true))
????????????return;
????????c?=?ctl.get();
????}

????//?如果線程池處于?RUNNING?狀態(tài),并且線程數(shù)大于?corePoolSize?或者?
????//?線程數(shù)少于?corePoolSize?但創(chuàng)建線程失敗了,則將任務(wù)丟進(jìn)?workQueue?中
????if?(isRunning(c)?&&?workQueue.offer(command))?{
????????int?recheck?=?ctl.get();
????????//?這里需要再次檢查線程池是否處于?RUNNING?狀態(tài),因?yàn)樵谌蝿?wù)入隊(duì)后可能線程池狀態(tài)會(huì)發(fā)生變化,(比如調(diào)用了?shutdown?方法等),如果線程狀態(tài)發(fā)生變化了,則移除此任務(wù),執(zhí)行拒絕策略
????????if?(!?isRunning(recheck)?&&?remove(command))
????????????reject(command);
????????//?如果線程池在?RUNNING?狀態(tài)下,線程數(shù)為?0,則新建線程加速處理?workQueue?中的任務(wù)
????????else?if?(workerCountOf(recheck)?==?0)
????????????addWorker(null,?false);
????}
????//?這段邏輯說(shuō)明線程數(shù)大于?corePoolSize?且任務(wù)入隊(duì)失敗了,此時(shí)會(huì)以最大線程數(shù)(maximumPoolSize)為界來(lái)創(chuàng)建線程,如果失敗,說(shuō)明線程數(shù)超過(guò)了?maximumPoolSize,則執(zhí)行拒絕策略
????else?if?(!addWorker(command,?false))
????????reject(command);
}

從這段代碼中可以看到,創(chuàng)建線程是調(diào)用 addWorker 實(shí)現(xiàn)的,在分析 addWorker 之前,有必要簡(jiǎn)單提一下 Worker,線程池把每一個(gè)執(zhí)行任務(wù)的線程都封裝為 Worker 的形式,取名為 Worker 很形象,線程池的本質(zhì)是生產(chǎn)者-消費(fèi)者模型,生產(chǎn)者不斷地往 workQueue 中丟 task, workQueue 就像流水線一樣不斷地輸送著任務(wù),而 worker(工人) 不斷地取任務(wù)來(lái)執(zhí)行

那么問(wèn)題來(lái)了,為啥要把線程封裝到 worker 中呢,線程池拿到 task 后直接丟給線程處理或者讓線程自己去 workQueue 中處理不就完了?

將線程封裝為 worker 主要是為了更好地管理線程的中斷

來(lái)看下 Worker 的定義

//?此處可以看出?worker?既是一個(gè)?Runnable?任務(wù),也實(shí)現(xiàn)了?AQS(實(shí)際上是用?AQS?實(shí)現(xiàn)了一個(gè)獨(dú)占鎖,這樣由于?worker?運(yùn)行時(shí)會(huì)上鎖,執(zhí)行?shutdown,setCorePoolSize,setMaximumPoolSize等方法時(shí)會(huì)試著中斷線程(interruptIdleWorkers)?,在這個(gè)方法中斷方法中會(huì)先嘗試獲取?worker?的鎖,如果不成功,說(shuō)明?worker?在運(yùn)行中,此時(shí)會(huì)先讓?worker?執(zhí)行完任務(wù)再關(guān)閉?worker?的線程,實(shí)現(xiàn)優(yōu)雅關(guān)閉線程的目的)
private?final?class?Worker
????extends?AbstractQueuedSynchronizer
????implements?Runnable
????
{
????????private?static?final?long?serialVersionUID?=?6138294804551838833L;

????????//?實(shí)際執(zhí)行任務(wù)的線程
????????final?Thread?thread;
????????//?上文提到,如果當(dāng)前線程數(shù)少于核心線程數(shù),創(chuàng)建線程并將提交的任務(wù)交給?worker?處理處理,此時(shí)?firstTask?即為此提交的任務(wù),如果?worker?從?workQueue?中獲取任務(wù),則?firstTask?為空
????????Runnable?firstTask;
????????//?統(tǒng)計(jì)完成的任務(wù)數(shù)
????????volatile?long?completedTasks;

????????Worker(Runnable?firstTask)?{
????????????//?初始化為?-1,這樣在線程運(yùn)行前(調(diào)用runWorker)禁止中斷,在?interruptIfStarted()?方法中會(huì)判斷?getState()>=0
????????????setState(-1);?
????????????this.firstTask?=?firstTask;

????????????//?根據(jù)線程池的?threadFactory?創(chuàng)建一個(gè)線程,將?worker?本身傳給線程(因?yàn)?worker?實(shí)現(xiàn)了?Runnable?接口)
????????????this.thread?=?getThreadFactory().newThread(this);
????????}

????????public?void?run()?{
????????????//?thread?啟動(dòng)后會(huì)調(diào)用此方法
????????????runWorker(this);
????????}

???????
????????//?1?代表被鎖住了,0?代表未鎖
????????protected?boolean?isHeldExclusively()?{
????????????return?getState()?!=?0;
????????}

????????//?嘗試獲取鎖
????????protected?boolean?tryAcquire(int?unused)?{
????????????//?從這里可以看出它是一個(gè)獨(dú)占鎖,因?yàn)楫?dāng)獲取鎖后,cas?設(shè)置?state?不可能成功,這里我們也能明白上文中將?state?設(shè)置為?-1?的作用,這種情況下永遠(yuǎn)不可能獲取得鎖,而?worker?要被中斷首先必須獲取鎖
????????????if?(compareAndSetState(0,?1))?{
????????????????setExclusiveOwnerThread(Thread.currentThread());
????????????????return?true;
????????????}
????????????return?false;
????????}

????????//?嘗試釋放鎖
????????protected?boolean?tryRelease(int?unused)?{
????????????setExclusiveOwnerThread(null);
????????????setState(0);
????????????return?true;
????????}????

????????public?void?lock()????????{?acquire(1);?}
????????public?boolean?tryLock()??{?return?tryAcquire(1);?}
????????public?void?unlock()??????{?release(1);?}
????????public?boolean?isLocked()?{?return?isHeldExclusively();?}
????????????
????????//?中斷線程,這個(gè)方法會(huì)被?shutdowNow?調(diào)用,從中可以看出?shutdownNow?要中斷線程不需要獲取鎖,也就是說(shuō)如果線程正在運(yùn)行,照樣會(huì)給你中斷掉,所以一般來(lái)說(shuō)我們不用?shutdowNow?來(lái)中斷線程,太粗暴了,中斷時(shí)線程很可能在執(zhí)行任務(wù),影響任務(wù)執(zhí)行
????????void?interruptIfStarted()?{
????????????Thread?t;
????????????//?中斷也是有條件的,必須是?state?>=?0?且?t?!=?null?且線程未被中斷
????????????//?如果?state?==?-1?,不執(zhí)行中斷,再次明白了為啥上文中?setState(-1)?的意義
????????????if?(getState()?>=?0?&&?(t?=?thread)?!=?null?&&?!t.isInterrupted())?{
????????????????try?{
????????????????????t.interrupt();
????????????????}?catch?(SecurityException?ignore)?{
????????????????}
????????????}
????????}
????}

通過(guò)上文對(duì) Worker 類的分析,相信大家不難理解 將線程封裝為 worker 主要是為了更好地管理線程的中斷 這句話。

理解了 Worker 的意義,我們?cè)賮?lái)看 addWorker 的方法

private?boolean?addWorker(Runnable?firstTask,?boolean?core)?{
????retry:
????for?(;;)?{
????????int?c?=?ctl.get();

????????//?獲取線程池的狀態(tài)
????????int?rs?=?runStateOf(c);

????????//?如果線程池的狀態(tài)?>=?SHUTDOWN,即為?SHUTDOWN,STOP,TIDYING,TERMINATED?這四個(gè)狀態(tài),只有一種情況有可能創(chuàng)建線程,即線程狀態(tài)為?SHUTDOWN,?且隊(duì)列非空時(shí),firstTask?==?null?代表創(chuàng)建一個(gè)不接收新任務(wù)的線程(此線程會(huì)從?workQueue?中獲取任務(wù)再執(zhí)行),這種情況下創(chuàng)建線程是為了加速處理完?workQueue?中的任務(wù)
????????if?(rs?>=?SHUTDOWN?&&
????????????!?(rs?==?SHUTDOWN?&&
???????????????firstTask?==?null?&&
???????????????!?workQueue.isEmpty()))
????????????return?false;

????????for?(;;)?{
????????????//?獲取線程數(shù)
????????????int?wc?=?workerCountOf(c);
????????????//?如果超過(guò)了線程池的最大?CAPACITY(5?億多,基本不可能)
????????????//?或者?超過(guò)了?corePoolSize(core?為?true)?或者?maximumPoolSize(core?為?false)?時(shí)
????????????//?則返回?false
????????????if?(wc?>=?CAPACITY?||
????????????????wc?>=?(core???corePoolSize?:?maximumPoolSize))
????????????????return?false;
????????????//?否則?CAS?增加線程的數(shù)量,如果成功跳出雙重循環(huán)
????????????if?(compareAndIncrementWorkerCount(c))
????????????????break?retry;
????????????c?=?ctl.get();??//?Re-read?ctl

????????????//?如果線程運(yùn)行狀態(tài)發(fā)生變化,跳到外層循環(huán)繼續(xù)執(zhí)行
????????????if?(runStateOf(c)?!=?rs)
????????????????continue?retry;
????????????//?說(shuō)明是因?yàn)?CAS?增加線程數(shù)量失敗所致,繼續(xù)執(zhí)行?retry?的內(nèi)層循環(huán)
????????}
????}

????boolean?workerStarted?=?false;
????boolean?workerAdded?=?false;
????Worker?w?=?null;
????try?{
????????//?能執(zhí)行到這里,說(shuō)明滿足增加?worker?的條件了,所以創(chuàng)建?worker,準(zhǔn)備添加進(jìn)線程池中執(zhí)行任務(wù)
????????w?=?new?Worker(firstTask);
????????final?Thread?t?=?w.thread;
????????if?(t?!=?null)?{
????????????//?加鎖,是因?yàn)橄挛囊?w?添加進(jìn)?workers?中,?workers?是?HashSet,不是線程安全的,所以需要加鎖予以保證
????????????final?ReentrantLock?mainLock?=?this.mainLock;
????????????mainLock.lock();
????????????try?{
????????????????//??再次?check?線程池的狀態(tài)以防執(zhí)行到此步時(shí)發(fā)生中斷等
????????????????int?rs?=?runStateOf(ctl.get());
????????????????//?如果線程池狀態(tài)小于?SHUTDOWN(即為?RUNNING),
????????????????//?或者狀態(tài)為?SHUTDOWN?但?firstTask?==?null(代表不接收任務(wù),只是創(chuàng)建線程處理?workQueue?中的任務(wù)),則滿足添加?worker?的條件
????????????????if?(rs?????????????????????(rs?==?SHUTDOWN?&&?firstTask?==?null))?{
????????????????????????????????????????//?如果線程已啟動(dòng),顯然有問(wèn)題(因?yàn)閯?chuàng)建?worker?后,還沒(méi)啟動(dòng)線程呢),拋出異常
????????????????????if?(t.isAlive())?
????????????????????????throw?new?IllegalThreadStateException();
????????????????????workers.add(w);
????????????????????int?s?=?workers.size();

????????????????????//?記錄最大的線程池大小以作監(jiān)控之用
????????????????????if?(s?>?largestPoolSize)
????????????????????????largestPoolSize?=?s;
????????????????????workerAdded?=?true;
????????????????}
????????????}?finally?{
????????????????mainLock.unlock();
????????????}

????????????//?說(shuō)明往?workers?中添加?worker?成功,此時(shí)啟動(dòng)線程
????????????if?(workerAdded)?{
????????????????t.start();
????????????????workerStarted?=?true;
????????????}
????????}
????}?finally?{
????????//?添加線程失敗,執(zhí)行?addWorkerFailed?方法,主要做了將?worker?從?workers?中移除,減少線程數(shù),并嘗試著關(guān)閉線程池這樣的操作
????????if?(!?workerStarted)
????????????addWorkerFailed(w);
????}
????return?workerStarted;
}

從這段代碼我們可以看到多線程下情況的不可預(yù)料性,我們發(fā)現(xiàn)在滿足條件情況下,又對(duì)線程狀態(tài)重新進(jìn)行了 check,以防期間出現(xiàn)中斷等線程池狀態(tài)發(fā)生變更的操作,這也給我們以啟發(fā):多線程環(huán)境下的各種臨界條件一定要考慮到位。

執(zhí)行 addWorker 創(chuàng)建 worker 成功后,線程開始執(zhí)行了(t.start()),由于在創(chuàng)建 Worker 時(shí),將 Worker ?自己傳給了此線程,所以啟動(dòng)線程后,會(huì)調(diào)用 ?Worker 的 run 方法

public?void?run()?{
????runWorker(this);
}

可以看到最終會(huì)調(diào)用 ?runWorker 方法,接下來(lái)我們來(lái)分析下 runWorker 方法

final?void?runWorker(Worker?w)?{
????Thread?wt?=?Thread.currentThread();
????Runnable?task?=?w.firstTask;
????w.firstTask?=?null;
????//?unlock?會(huì)調(diào)用?tryRelease?方法將?state?設(shè)置成?0,代表允許中斷,允許中斷的條件上文我們?cè)?interruptIfStarted()?中有提過(guò),即?state?>=?0
????w.unlock();
????boolean?completedAbruptly?=?true;
????try?{
????????//?如果在提交任務(wù)時(shí)創(chuàng)建了線程,并把任務(wù)丟給此線程,則會(huì)先執(zhí)行此?task
????????//?否則從任務(wù)隊(duì)列中獲取?task?來(lái)執(zhí)行(即?getTask()?方法)
????????while?(task?!=?null?||?(task?=?getTask())?!=?null)?{
????????????w.lock();
????????????
????????????//?如果線程池狀態(tài)為?>=?STOP(即?STOP,TIDYING,TERMINATED?)時(shí),則線程應(yīng)該中斷
????????????//?如果線程池狀態(tài)?
????????????if?((runStateAtLeast(ctl.get(),?STOP)?||
?????????????????(Thread.interrupted()?&&
??????????????????runStateAtLeast(ctl.get(),?STOP)))?&&
????????????????!wt.isInterrupted())
????????????????wt.interrupt();
????????????try?{
????????????????//?執(zhí)行任務(wù)前,子類可實(shí)現(xiàn)此鉤子方法作為統(tǒng)計(jì)之用
????????????????beforeExecute(wt,?task);
????????????????Throwable?thrown?=?null;
????????????????try?{
????????????????????task.run();
????????????????}?catch?(RuntimeException?x)?{
????????????????????thrown?=?x;?throw?x;
????????????????}?catch?(Error?x)?{
????????????????????thrown?=?x;?throw?x;
????????????????}?catch?(Throwable?x)?{
????????????????????thrown?=?x;?throw?new?Error(x);
????????????????}?finally?{
????????????????????//?執(zhí)行任務(wù)后,子類可實(shí)現(xiàn)此鉤子方法作為統(tǒng)計(jì)之用
????????????????????afterExecute(task,?thrown);
????????????????}
????????????}?finally?{
????????????????task?=?null;
????????????????w.completedTasks++;
????????????????w.unlock();
????????????}
????????}
????????completedAbruptly?=?false;
????}?finally?{
????????//?如果執(zhí)行到這只有兩種可能,一種是執(zhí)行過(guò)程中異常中斷了,一種是隊(duì)列里沒(méi)有任務(wù)了,從這里可以看出線程沒(méi)有核心線程與非核心線程之分,哪個(gè)任務(wù)異常了或者正常退出了都會(huì)執(zhí)行此方法,此方法會(huì)根據(jù)情況將線程數(shù)-1
????????processWorkerExit(w,?completedAbruptly);
????}
}

來(lái)看看 processWorkerExit 方法是咋樣的

private?void?processWorkerExit(Worker?w,?boolean?completedAbruptly)?{
????????//?如果異常退出,cas?執(zhí)行線程池減?1?操作
????if?(completedAbruptly)?
????????decrementWorkerCount();

????final?ReentrantLock?mainLock?=?this.mainLock;
????mainLock.lock();
????try?{
????????completedTaskCount?+=?w.completedTasks;
????????//?加鎖確保線程安全地移除?worker?
????????workers.remove(w);
????}?finally?{
????????mainLock.unlock();
????}

????//?woker?既然異常退出,可能線程池狀態(tài)變了(如執(zhí)行?shutdown?等),嘗試著關(guān)閉線程池
????tryTerminate();

????int?c?=?ctl.get();

????//??如果線程池處于?STOP?狀態(tài),則如果?woker?是異常退出的,重新新增一個(gè)?woker,如果是正常退出的,在?wokerQueue?為非空的條件下,確保至少有一個(gè)線程在運(yùn)行以執(zhí)行?wokerQueue?中的任務(wù)
????if?(runStateLessThan(c,?STOP))?{
????????if?(!completedAbruptly)?{
????????????int?min?=?allowCoreThreadTimeOut???0?:?corePoolSize;
????????????if?(min?==?0?&&?!?workQueue.isEmpty())
????????????????min?=?1;
????????????if?(workerCountOf(c)?>=?min)
????????????????return;?//?replacement?not?needed
????????}
????????addWorker(null,?false);
????}
}

接下來(lái)我們分析 woker 從 workQueue 中取任務(wù)的方法 getTask

private?Runnable?getTask()?{
????boolean?timedOut?=?false;?//?Did?the?last?poll()?time?out?

????for?(;;)?{
????????int?c?=?ctl.get();
????????int?rs?=?runStateOf(c);

????????//?如果線程池狀態(tài)至少為?STOP?或者
????????//?線程池狀態(tài)?==?SHUTDOWN?并且任務(wù)隊(duì)列是空的
????????//?則減少線程數(shù)量,返回?null,這種情況下上文分析的?runWorker?會(huì)執(zhí)行?processWorkerExit?從而讓獲取此?Task?的?woker?退出
????????if?(rs?>=?SHUTDOWN?&&?(rs?>=?STOP?||?workQueue.isEmpty()))?{
????????????decrementWorkerCount();
????????????return?null;
????????}

????????int?wc?=?workerCountOf(c);

????????//?如果?allowCoreThreadTimeOut?為?true,代表任何線程在?keepAliveTime?時(shí)間內(nèi)處于?idle?狀態(tài)都會(huì)被回收,如果線程數(shù)大于?corePoolSize,本身在?keepAliveTime?時(shí)間內(nèi)處于?idle?狀態(tài)就會(huì)被回收
????????boolean?timed?=?allowCoreThreadTimeOut?||?wc?>?corePoolSize;

????????//?worker?應(yīng)該被回收的幾個(gè)條件,這個(gè)比較簡(jiǎn)單,就此略過(guò)
????????if?((wc?>?maximumPoolSize?||?(timed?&&?timedOut))
????????????&&?(wc?>?1?||?workQueue.isEmpty()))?{
????????????if?(compareAndDecrementWorkerCount(c))
????????????????return?null;
????????????continue;
????????}

????????try?{
???????????//?阻塞獲取?task,如果在?keepAliveTime?時(shí)間內(nèi)未獲取任務(wù),說(shuō)明超時(shí)了,此時(shí)?timedOut?為?true
????????????Runnable?r?=?timed??
????????????????workQueue.poll(keepAliveTime,?TimeUnit.NANOSECONDS)?:
????????????????workQueue.take();
????????????if?(r?!=?null)
????????????????return?r;
????????????timedOut?=?true;
????????}?catch?(InterruptedException?retry)?{
????????????timedOut?=?false;
????????}
????}
}

經(jīng)過(guò)以上源碼剖析,相信我們對(duì)線程池的工作原理了解得八九不離十了,再來(lái)簡(jiǎn)單過(guò)一下其他一些比較有用的方法,開頭我們提到線程池的監(jiān)控問(wèn)題,我們看一下可以監(jiān)控哪些指標(biāo)

  • int getCorePoolSize():獲取核心線程數(shù)。
  • int getLargestPoolSize():歷史峰值線程數(shù)。
  • int getMaximumPoolSize():最大線程數(shù)(線程池線程容量)。
  • int getActiveCount():當(dāng)前活躍線程數(shù)
  • int getPoolSize():當(dāng)前線程池中的線程總數(shù)
  • BlockingQueue getQueue() 當(dāng)前線程池的任務(wù)隊(duì)列,據(jù)此可以獲取積壓任務(wù)的總數(shù),getQueue.size()

監(jiān)控思路也很簡(jiǎn)單,開啟一個(gè)定時(shí)線程 ScheduledThreadPoolExecutor,定期對(duì)這些線程池指標(biāo)進(jìn)行采集,一般會(huì)采用一些開源工具如 Grafana + Prometheus + MicroMeter 來(lái)實(shí)現(xiàn)。

如何實(shí)現(xiàn)核心線程池的預(yù)熱

使用 ?prestartAllCoreThreads() 方法,這個(gè)方法會(huì)一次性創(chuàng)建 corePoolSize 個(gè)線程,無(wú)需等到提交任務(wù)時(shí)才創(chuàng)建,提交創(chuàng)建好線程的話,一有任務(wù)提交過(guò)來(lái),這些線程就可以立即處理。

如何實(shí)現(xiàn)動(dòng)態(tài)調(diào)整線程池參數(shù)

  • setCorePoolSize(int corePoolSize) 調(diào)整核心線程池大小
  • setMaximumPoolSize(int maximumPoolSize)
  • setKeepAliveTime() 設(shè)置線程的存活時(shí)間

解答開篇的問(wèn)題

其它問(wèn)題基本都在源碼剖析環(huán)節(jié)回答了,這里簡(jiǎn)單說(shuō)下其他問(wèn)題

1、Tomcat 的線程池和 JDK 的線程池實(shí)現(xiàn)有啥區(qū)別, Dubbo 中有類似 Tomcat 的線程池實(shí)現(xiàn)嗎? Dubbo 中一個(gè)叫 EagerThreadPool 的東西,可以看看它的使用說(shuō)明

從注釋里可以看出,如果核心線程都處于 busy 狀態(tài),如果有新的請(qǐng)求進(jìn)來(lái),EagerThreadPool 會(huì)選擇先創(chuàng)建線程,而不是將其放入任務(wù)隊(duì)列中,這樣可以更快地響應(yīng)這些請(qǐng)求。

Tomcat 實(shí)現(xiàn)也是與此類似的,只不過(guò)稍微有所不同,當(dāng) Tomcat 啟動(dòng)時(shí),會(huì)先創(chuàng)建 minSpareThreads 個(gè)線程,如果經(jīng)過(guò)一段時(shí)間收到請(qǐng)求時(shí)這些線程都處于忙碌狀態(tài),每次都會(huì)以 minSpareThreads 的步長(zhǎng)創(chuàng)建線程,本質(zhì)上也是為了更快地響應(yīng)處理請(qǐng)求。具體的源碼可以看它的 ThreadPool 實(shí)現(xiàn),這里就不展開了。

2、我司網(wǎng)關(guān) dubbo 調(diào)用線程池曾經(jīng)出現(xiàn)過(guò)這樣的一個(gè)問(wèn)題:壓測(cè)時(shí)接口可以正常返回,但接口 RT 很高,假設(shè)設(shè)置的核心線程大小為 500,最大線程為 800,緩沖隊(duì)列為 5000,你能從這個(gè)設(shè)置中發(fā)現(xiàn)出一些問(wèn)題并對(duì)這些參數(shù)進(jìn)行調(diào)優(yōu)嗎?這個(gè)參數(shù)明顯能看出問(wèn)題來(lái),首先任務(wù)隊(duì)列設(shè)置過(guò)大,任務(wù)達(dá)到核心線程后,如果再有請(qǐng)求進(jìn)來(lái)會(huì)先進(jìn)入任務(wù)隊(duì)列,隊(duì)列滿了之后才創(chuàng)建線程,創(chuàng)建線程也是需要不少開銷的,所以我們后來(lái)把核心線程設(shè)置成了與最大線程一樣,并且調(diào)用 prestartAllCoreThreads() 來(lái)預(yù)熱核心線程,就不用等請(qǐng)求來(lái)時(shí)再創(chuàng)建線程了。

線程池的幾個(gè)最佳實(shí)踐

1、線程池執(zhí)行的任務(wù)應(yīng)該是互相獨(dú)立的,如果互相依賴的話,可能導(dǎo)致死鎖,比如下面這樣的代碼

ExecutorService?pool?=?Executors
??.newSingleThreadExecutor();
pool.submit(()?->?{
??try?{
????String?qq=pool.submit(()->"QQ").get();
????System.out.println(qq);
??}?catch?(Exception?e)?{
??}
});

2、核心任務(wù)與非核心任務(wù)最好能用多個(gè)線程池隔離開來(lái)

曾經(jīng)我們業(yè)務(wù)上就出現(xiàn)這樣的一個(gè)故障:突然很多用戶反饋短信收不到了,排查才發(fā)現(xiàn)發(fā)短信是在一個(gè)線程池里,而另外的定時(shí)腳本也是用的這個(gè)線程池來(lái)執(zhí)行任務(wù),這個(gè)腳本一分鐘可能產(chǎn)生幾百上千條任務(wù),導(dǎo)致發(fā)短信的方法在線程池里基本沒(méi)機(jī)會(huì)執(zhí)行,后來(lái)我們用了兩個(gè)線程池把發(fā)短信和執(zhí)行腳本隔離開來(lái)解決了問(wèn)題。

3、添加線程池監(jiān)控,動(dòng)態(tài)設(shè)置線程池

如前文所述,線程池的各個(gè)參數(shù)很難一次性確定,既然難以確定,又要保證發(fā)現(xiàn)問(wèn)題后及時(shí)解決,我們就需要為線程池增加監(jiān)控,監(jiān)控隊(duì)列大小,線程數(shù)量等,我們可以設(shè)置 3 分鐘內(nèi)比如隊(duì)列任務(wù)一直都是滿了的話,就觸發(fā)告警,這樣可以提前預(yù)警,如果線上因?yàn)榫€程池參數(shù)設(shè)置不合理而觸發(fā)了降級(jí)等操作,可以通過(guò)動(dòng)態(tài)設(shè)置線程池的方式來(lái)實(shí)時(shí)修改核心線程數(shù),最大線程數(shù)等,將問(wèn)題及時(shí)修復(fù)。

總結(jié)

本文詳細(xì)剖析了線程池的工作原理,相信大家對(duì)其工作機(jī)制應(yīng)該有了較深入的了解,也對(duì)開頭的幾個(gè)問(wèn)題有了較清楚的認(rèn)識(shí),本質(zhì)上設(shè)置線程池的目的是為了利用有效的資源最大化性能,最小化風(fēng)險(xiǎn),同時(shí)線程池的使用本質(zhì)上是為了更好地為用戶服務(wù),據(jù)此也不難明白 Tomcat, Dubbo 要另起爐灶來(lái)設(shè)置自己的線程池了。

? 巨人的肩膀

  • https://dzone.com/articles/how-much-memory-does-a-java-thread-take
  • https://segmentfault.com/a/1190000021047279
  • https://www.cnblogs.com/trust-freedom/p/6681948.html
  • 深入理解線程池 https://tinyurl.com/y675j928
  • 有的線程它死了,于是它變成一道面試題 https://mp.weixin.qq.com/s/wrTVGLDvhE-eb5lhygWEqQ
  • Java 并發(fā)編程實(shí)戰(zhàn)
  • Java線程池實(shí)現(xiàn)原理及其在美團(tuán)業(yè)務(wù)中的實(shí)踐: https://mp.weixin.qq.com/s/baYuX8aCwQ9PP6k7TDl2Ww
  • 線程池異常處理詳解,一文搞懂!https://www.cnblogs.com/ncy1/articles/11629933.html

推薦閱讀
點(diǎn)個(gè)外賣,我把「軟中斷」搞懂了
讀者問(wèn):小林怎么學(xué)操作系統(tǒng)和計(jì)算機(jī)網(wǎng)絡(luò)呀?
讀者問(wèn):小林你的 500 張圖是怎么畫的?
讀者問(wèn):小林你能分享做公眾號(hào)的經(jīng)驗(yàn)嗎?

免責(zé)聲明:本文內(nèi)容由21ic獲得授權(quán)后發(fā)布,版權(quán)歸原作者所有,本平臺(tái)僅提供信息存儲(chǔ)服務(wù)。文章僅代表作者個(gè)人觀點(diǎn),不代表本平臺(tái)立場(chǎng),如有問(wèn)題,請(qǐng)聯(lián)系我們,謝謝!

本站聲明: 本文章由作者或相關(guān)機(jī)構(gòu)授權(quán)發(fā)布,目的在于傳遞更多信息,并不代表本站贊同其觀點(diǎn),本站亦不保證或承諾內(nèi)容真實(shí)性等。需要轉(zhuǎn)載請(qǐng)聯(lián)系該專欄作者,如若文章內(nèi)容侵犯您的權(quán)益,請(qǐng)及時(shí)聯(lián)系本站刪除。
換一批
延伸閱讀

9月2日消息,不造車的華為或?qū)⒋呱龈蟮莫?dú)角獸公司,隨著阿維塔和賽力斯的入局,華為引望愈發(fā)顯得引人矚目。

關(guān)鍵字: 阿維塔 塞力斯 華為

倫敦2024年8月29日 /美通社/ -- 英國(guó)汽車技術(shù)公司SODA.Auto推出其旗艦產(chǎn)品SODA V,這是全球首款涵蓋汽車工程師從創(chuàng)意到認(rèn)證的所有需求的工具,可用于創(chuàng)建軟件定義汽車。 SODA V工具的開發(fā)耗時(shí)1.5...

關(guān)鍵字: 汽車 人工智能 智能驅(qū)動(dòng) BSP

北京2024年8月28日 /美通社/ -- 越來(lái)越多用戶希望企業(yè)業(yè)務(wù)能7×24不間斷運(yùn)行,同時(shí)企業(yè)卻面臨越來(lái)越多業(yè)務(wù)中斷的風(fēng)險(xiǎn),如企業(yè)系統(tǒng)復(fù)雜性的增加,頻繁的功能更新和發(fā)布等。如何確保業(yè)務(wù)連續(xù)性,提升韌性,成...

關(guān)鍵字: 亞馬遜 解密 控制平面 BSP

8月30日消息,據(jù)媒體報(bào)道,騰訊和網(wǎng)易近期正在縮減他們對(duì)日本游戲市場(chǎng)的投資。

關(guān)鍵字: 騰訊 編碼器 CPU

8月28日消息,今天上午,2024中國(guó)國(guó)際大數(shù)據(jù)產(chǎn)業(yè)博覽會(huì)開幕式在貴陽(yáng)舉行,華為董事、質(zhì)量流程IT總裁陶景文發(fā)表了演講。

關(guān)鍵字: 華為 12nm EDA 半導(dǎo)體

8月28日消息,在2024中國(guó)國(guó)際大數(shù)據(jù)產(chǎn)業(yè)博覽會(huì)上,華為常務(wù)董事、華為云CEO張平安發(fā)表演講稱,數(shù)字世界的話語(yǔ)權(quán)最終是由生態(tài)的繁榮決定的。

關(guān)鍵字: 華為 12nm 手機(jī) 衛(wèi)星通信

要點(diǎn): 有效應(yīng)對(duì)環(huán)境變化,經(jīng)營(yíng)業(yè)績(jī)穩(wěn)中有升 落實(shí)提質(zhì)增效舉措,毛利潤(rùn)率延續(xù)升勢(shì) 戰(zhàn)略布局成效顯著,戰(zhàn)新業(yè)務(wù)引領(lǐng)增長(zhǎng) 以科技創(chuàng)新為引領(lǐng),提升企業(yè)核心競(jìng)爭(zhēng)力 堅(jiān)持高質(zhì)量發(fā)展策略,塑強(qiáng)核心競(jìng)爭(zhēng)優(yōu)勢(shì)...

關(guān)鍵字: 通信 BSP 電信運(yùn)營(yíng)商 數(shù)字經(jīng)濟(jì)

北京2024年8月27日 /美通社/ -- 8月21日,由中央廣播電視總臺(tái)與中國(guó)電影電視技術(shù)學(xué)會(huì)聯(lián)合牽頭組建的NVI技術(shù)創(chuàng)新聯(lián)盟在BIRTV2024超高清全產(chǎn)業(yè)鏈發(fā)展研討會(huì)上宣布正式成立。 活動(dòng)現(xiàn)場(chǎng) NVI技術(shù)創(chuàng)新聯(lián)...

關(guān)鍵字: VI 傳輸協(xié)議 音頻 BSP

北京2024年8月27日 /美通社/ -- 在8月23日舉辦的2024年長(zhǎng)三角生態(tài)綠色一體化發(fā)展示范區(qū)聯(lián)合招商會(huì)上,軟通動(dòng)力信息技術(shù)(集團(tuán))股份有限公司(以下簡(jiǎn)稱"軟通動(dòng)力")與長(zhǎng)三角投資(上海)有限...

關(guān)鍵字: BSP 信息技術(shù)
關(guān)閉
關(guān)閉