高并發(fā)之——創(chuàng)建線程池居然有這么多方式...
來自:冰河技術(shù)
前言
在Java的高并發(fā)領(lǐng)域,線程池一直是一個繞不開的話題。有些童鞋一直在使用線程池,但是,對于如何創(chuàng)建線程池僅僅停留在使用Executors工具類的方式,那么,創(chuàng)建線程池究竟存在哪幾種方式呢?就讓我們一起從創(chuàng)建線程池的源碼來深入分析究竟有哪些方式可以創(chuàng)建線程池。
使用Executors工具類創(chuàng)建線程池
在創(chuàng)建線程池時,初學(xué)者用的最多的就是Executors 這個工具類,而使用這個工具類創(chuàng)建線程池時非常簡單的,不需要關(guān)注太多的線程池細節(jié),只需要傳入必要的參數(shù)即可。Executors 工具類提供了幾種創(chuàng)建線程池的方法,如下所示。
Executors.newCachedThreadPool:創(chuàng)建一個可緩存的線程池,如果線程池的大小超過了需要,可以靈活回收空閑線程,如果沒有可回收線程,則新建線程
Executors.newFixedThreadPool:創(chuàng)建一個定長的線程池,可以控制線程的最大并發(fā)數(shù),超出的線程會在隊列中等待
Executors.newScheduledThreadPool:創(chuàng)建一個定長的線程池,支持定時、周期性的任務(wù)執(zhí)行
Executors.newSingleThreadExecutor: 創(chuàng)建一個單線程化的線程池,使用一個唯一的工作線程執(zhí)行任務(wù),保證所有任務(wù)按照指定順序(先入先出或者優(yōu)先級)執(zhí)行
Executors.newSingleThreadScheduledExecutor:創(chuàng)建一個單線程化的線程池,支持定時、周期性的任務(wù)執(zhí)行
Executors.newWorkStealingPool:創(chuàng)建一個具有并行級別的work-stealing線程池
其中,Executors.newWorkStealingPool方法是Java 8中新增的創(chuàng)建線程池的方法,它能夠為線程池設(shè)置并行級別,具有更高的并發(fā)度和性能。除了此方法外,其他創(chuàng)建線程池的方法本質(zhì)上調(diào)用的是ThreadPoolExecutor類的構(gòu)造方法。
例如,我們可以使用如下代碼創(chuàng)建線程池。
Executors.newWorkStealingPool();
Executors.newCachedThreadPool();
Executors.newScheduledThreadPool(3);
使用ThreadPoolExecutor類創(chuàng)建線程池
從代碼結(jié)構(gòu)上看ThreadPoolExecutor類繼承自AbstractExecutorService,也就是說,ThreadPoolExecutor類具有AbstractExecutorService類的全部功能。
既然Executors工具類中創(chuàng)建線程池大部分調(diào)用的都是ThreadPoolExecutor類的構(gòu)造方法,所以,我們也可以直接調(diào)用ThreadPoolExecutor類的構(gòu)造方法來創(chuàng)建線程池,而不再使用Executors工具類。接下來,我們一起看下ThreadPoolExecutor類的構(gòu)造方法。
ThreadPoolExecutor類中的所有構(gòu)造方法如下所示。
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
threadFactory, defaultHandler);
}
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
RejectedExecutionHandler handler) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), handler);
}
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
由ThreadPoolExecutor類的構(gòu)造方法的源代碼可知,創(chuàng)建線程池最終調(diào)用的構(gòu)造方法如下。
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
long keepAliveTime, TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
關(guān)于此構(gòu)造方法中各參數(shù)的含義和作用,各位可以移步《高并發(fā)之——不得不說的線程池與ThreadPoolExecutor類淺析》進行查閱。
大家可以自行調(diào)用ThreadPoolExecutor類的構(gòu)造方法來創(chuàng)建線程池。例如,我們可以使用如下形式創(chuàng)建線程池。
new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
使用ForkJoinPool類創(chuàng)建線程池
在Java8的Executors工具類中,新增了如下創(chuàng)建線程池的方式。
public static ExecutorService newWorkStealingPool(int parallelism) {
return new ForkJoinPool
(parallelism,
ForkJoinPool.defaultForkJoinWorkerThreadFactory,
null, true);
}
public static ExecutorService newWorkStealingPool() {
return new ForkJoinPool
(Runtime.getRuntime().availableProcessors(),
ForkJoinPool.defaultForkJoinWorkerThreadFactory,
null, true);
}
從源代碼可以可以,本質(zhì)上調(diào)用的是ForkJoinPool類的構(gòu)造方法類創(chuàng)建線程池,而從代碼結(jié)構(gòu)上來看ForkJoinPool類繼承自AbstractExecutorService抽象類。接下來,我們看下ForkJoinPool類的構(gòu)造方法。
public ForkJoinPool() {
this(Math.min(MAX_CAP, Runtime.getRuntime().availableProcessors()),
defaultForkJoinWorkerThreadFactory, null, false);
}
public ForkJoinPool(int parallelism) {
this(parallelism, defaultForkJoinWorkerThreadFactory, null, false);
}
public ForkJoinPool(int parallelism,
ForkJoinWorkerThreadFactory factory,
UncaughtExceptionHandler handler,
boolean asyncMode) {
this(checkParallelism(parallelism),
checkFactory(factory),
handler,
asyncMode ? FIFO_QUEUE : LIFO_QUEUE,
"ForkJoinPool-" + nextPoolId() + "-worker-");
checkPermission();
}
private ForkJoinPool(int parallelism,
ForkJoinWorkerThreadFactory factory,
UncaughtExceptionHandler handler,
int mode,
String workerNamePrefix) {
this.workerNamePrefix = workerNamePrefix;
this.factory = factory;
this.ueh = handler;
this.config = (parallelism & SMASK) | mode;
long np = (long)(-parallelism); // offset ctl counts
this.ctl = ((np << AC_SHIFT) & AC_MASK) | ((np << TC_SHIFT) & TC_MASK);
}
通過查看源代碼得知,F(xiàn)orkJoinPool的構(gòu)造方法,最終調(diào)用的是如下私有構(gòu)造方法。
private ForkJoinPool(int parallelism,
ForkJoinWorkerThreadFactory factory,
UncaughtExceptionHandler handler,
int mode,
String workerNamePrefix) {
this.workerNamePrefix = workerNamePrefix;
this.factory = factory;
this.ueh = handler;
this.config = (parallelism & SMASK) | mode;
long np = (long)(-parallelism); // offset ctl counts
this.ctl = ((np << AC_SHIFT) & AC_MASK) | ((np << TC_SHIFT) & TC_MASK);
}
其中,各參數(shù)的含義如下所示。
parallelism:并發(fā)級別。
factory:創(chuàng)建線程的工廠類對象。
handler:當線程池中的線程拋出未捕獲的異常時,統(tǒng)一使用UncaughtExceptionHandler對象處理。
mode:取值為FIFO_QUEUE或者LIFO_QUEUE。
workerNamePrefix:執(zhí)行任務(wù)的線程名稱的前綴。
當然,私有構(gòu)造方法雖然是參數(shù)最多的一個方法,但是其不會直接對外方法,我們可以使用如下方式創(chuàng)建線程池。
new ForkJoinPool();
new ForkJoinPool(Runtime.getRuntime().availableProcessors());
new ForkJoinPool(Runtime.getRuntime().availableProcessors(),
ForkJoinPool.defaultForkJoinWorkerThreadFactory,
null, true);
使用ScheduledThreadPoolExecutor類創(chuàng)建線程池
在Executors工具類中存在如下方法類創(chuàng)建線程池。
public static ScheduledExecutorService newSingleThreadScheduledExecutor() {
return new DelegatedScheduledExecutorService
(new ScheduledThreadPoolExecutor(1));
}
public static ScheduledExecutorService newSingleThreadScheduledExecutor(ThreadFactory threadFactory) {
return new DelegatedScheduledExecutorService
(new ScheduledThreadPoolExecutor(1, threadFactory));
}
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
public static ScheduledExecutorService newScheduledThreadPool(
int corePoolSize, ThreadFactory threadFactory) {
return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
}
從源碼來看,這幾個方法本質(zhì)上調(diào)用的都是ScheduledThreadPoolExecutor類的構(gòu)造方法,ScheduledThreadPoolExecutor中存在的構(gòu)造方法如下所示。
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}
public ScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue(), threadFactory);
}
public ScheduledThreadPoolExecutor(int corePoolSize, RejectedExecutionHandler handler) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue(), handler);
}
public ScheduledThreadPoolExecutor(int corePoolSize,ThreadFactory threadFactory, RejectedExecutionHandler handler) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue(), threadFactory, handler);
}
而從代碼結(jié)構(gòu)上看,ScheduledThreadPoolExecutor類繼承自ThreadPoolExecutor類,本質(zhì)上還是調(diào)用ThreadPoolExecutor類的構(gòu)造方法,只不過此時傳遞的隊列為DelayedWorkQueue。我們可以直接調(diào)用ScheduledThreadPoolExecutor類的構(gòu)造方法來創(chuàng)建線程池,例如以如下形式創(chuàng)建線程池。
new ScheduledThreadPoolExecutor(3)
最后,需要注意的是:ScheduledThreadPoolExecutor主要用來創(chuàng)建執(zhí)行定時任務(wù)的線程池。
后記:
記?。?/span>你比別人強的地方,不是你做過多少年的CRUD工作,而是你比別人掌握了更多深入的技能。不要總停留在CRUD的表面工作,理解并掌握底層原理并熟悉源碼實現(xiàn),并形成自己的抽象思維能力,做到靈活運用,才是你突破瓶頸,脫穎而出的重要方向!
最后,作為一名合格(發(fā)際線比較高)的開發(fā)人員或者資深(禿頂)的工程師和架構(gòu)師來說,理解原理和掌握源碼,并形成自己的抽象思維能力,靈活運用是你必須掌握的技能。
特別推薦一個分享架構(gòu)+算法的優(yōu)質(zhì)內(nèi)容,還沒關(guān)注的小伙伴,可以長按關(guān)注一下:
長按訂閱更多精彩▼
如有收獲,點個在看,誠摯感謝
免責聲明:本文內(nèi)容由21ic獲得授權(quán)后發(fā)布,版權(quán)歸原作者所有,本平臺僅提供信息存儲服務(wù)。文章僅代表作者個人觀點,不代表本平臺立場,如有問題,請聯(lián)系我們,謝謝!