十年網(wǎng)站開(kāi)發(fā)經(jīng)驗(yàn) + 多家企業(yè)客戶 + 靠譜的建站團(tuán)隊(duì)
量身定制 + 運(yùn)營(yíng)維護(hù)+專(zhuān)業(yè)推廣+無(wú)憂售后,網(wǎng)站問(wèn)題一站解決
線程池是一種多線程處理形式,處理過(guò)程中將任務(wù)添加到隊(duì)列,然后在創(chuàng)建線程后自動(dòng)啟動(dòng)這些任務(wù)。線程池線程都是后臺(tái)線程。每個(gè)線程都使用默認(rèn)的堆棧大小,以默認(rèn)的優(yōu)先級(jí)運(yùn)行,并處于多線程單元中。如果某個(gè)線程在托管代碼中空閑(如正在等待某個(gè)事件),則線程池將插入另一個(gè)輔助線程來(lái)使所有處理器保持繁忙。如果所有線程池線程都始終保持繁忙,但隊(duì)列中包含掛起的工作,則線程池將在一段時(shí)間后創(chuàng)建另一個(gè)輔助線程但線程的數(shù)目永遠(yuǎn)不會(huì)超過(guò)大值。超過(guò)大值的線程可以排隊(duì),但他們要等到其他線程完成后才啟動(dòng)。
應(yīng)用程序可以有多個(gè)線程,這些線程在休眠狀態(tài)中需要耗費(fèi)大量時(shí)間來(lái)等待事件發(fā)生。其他線程可能進(jìn)入睡眠狀態(tài),并且僅定期被喚醒以輪循更改或更新?tīng)顟B(tài)信息,然后再次進(jìn)入休眠狀態(tài)。為了簡(jiǎn)化對(duì)這些線程的管理,.NET框架為每個(gè)進(jìn)程提供了一個(gè)線程池,一個(gè)線程池有若干個(gè)等待操作狀態(tài),當(dāng)一個(gè)等待操作完成時(shí),線程池中的輔助線程會(huì)執(zhí)行回調(diào)函數(shù)。線程池中的線程由系統(tǒng)管理,程序員不需要費(fèi)力于線程管理,可以集中精力處理應(yīng)用程序任務(wù)。
Executor 框架是一個(gè)根據(jù)一組執(zhí)行策略調(diào)用,調(diào)度,執(zhí)行和控制的異步任務(wù)的框架,目的是提供一種將”任務(wù)提交”與”任務(wù)如何運(yùn)行”分離開(kāi)來(lái)的機(jī)制。
Executor 框架核心 API 如下:
Executor
- 運(yùn)行任務(wù)的簡(jiǎn)單接口。ExecutorService
- 擴(kuò)展了 Executor
接口。擴(kuò)展能力:ScheduledExecutorService
- 擴(kuò)展了 ExecutorService
接口。擴(kuò)展能力:支持定期執(zhí)行任務(wù)。AbstractExecutorService
- ExecutorService
接口的默認(rèn)實(shí)現(xiàn)。ThreadPoolExecutor
- Executor 框架最核心的類(lèi),它繼承了 AbstractExecutorService
類(lèi)。ScheduledThreadPoolExecutor
- ScheduledExecutorService
接口的實(shí)現(xiàn),一個(gè)可定時(shí)調(diào)度任務(wù)的線程池。Executors
- 可以通過(guò)調(diào)用 Executors
的靜態(tài)工廠方法來(lái)創(chuàng)建線程池并返回一個(gè) ExecutorService
對(duì)象。Executor
接口中只定義了一個(gè) execute
方法,用于接收一個(gè) Runnable
對(duì)象。
public interface Executor {
void execute(Runnable command);
}
ExecutorService
接口繼承了 Executor
接口,它還提供了 invokeAll
、invokeAny
、shutdown
、submit
等方法。
public interface ExecutorService extends Executor {
void shutdown();
List shutdownNow();
boolean isShutdown();
boolean isTerminated();
boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException;
Future submit(Callable task);
Future submit(Runnable task, T result);
Future> submit(Runnable task);
List> invokeAll(Collection extends Callable> tasks)
throws InterruptedException;
List> invokeAll(Collection extends Callable> tasks,
long timeout, TimeUnit unit)
throws InterruptedException;
T invokeAny(Collection extends Callable> tasks)
throws InterruptedException, ExecutionException;
T invokeAny(Collection extends Callable> tasks,
long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
從其支持的方法定義,不難看出:相比于 Executor
接口,ExecutorService
接口主要的擴(kuò)展是:
sumbit
、invokeAll
、invokeAny
方法中都支持傳入Callable
對(duì)象。shutdown
、shutdownNow
、isShutdown
等方法。ScheduledExecutorService
接口擴(kuò)展了 ExecutorService
接口。
它除了支持前面兩個(gè)接口的所有能力以外,還支持定時(shí)調(diào)度線程。
public interface ScheduledExecutorService extends ExecutorService {
public ScheduledFuture> schedule(Runnable command,
long delay, TimeUnit unit);
public ScheduledFuture schedule(Callable callable,
long delay, TimeUnit unit);
public ScheduledFuture> scheduleAtFixedRate(Runnable command,
long initialDelay,
long period,
TimeUnit unit);
public ScheduledFuture> scheduleWithFixedDelay(Runnable command,
long initialDelay,
long delay,
TimeUnit unit);
}
其擴(kuò)展的接口提供以下能力:
schedule
方法可以在指定的延時(shí)后執(zhí)行一個(gè) Runnable
或者 Callable
任務(wù)。scheduleAtFixedRate
方法和 scheduleWithFixedDelay
方法可以按照指定時(shí)間間隔,定期執(zhí)行任務(wù)。java.uitl.concurrent.ThreadPoolExecutor
類(lèi)是 Executor
框架中最核心的類(lèi)。所以,本文將著重講述一下這個(gè)類(lèi)。
ThreadPoolExecutor
有以下重要字段:
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
參數(shù)說(shuō)明:
ctl
- 用于控制線程池的運(yùn)行狀態(tài)和線程池中的有效線程數(shù)量。它包含兩部分的信息:runState
)workerCount
)ctl
使用了 Integer
類(lèi)型來(lái)保存,高 3 位保存 runState
,低 29 位保存 workerCount
。COUNT_BITS
就是 29,CAPACITY
就是 1 左移 29 位減 1(29 個(gè) 1),這個(gè)常量表示 workerCount
的上限值,大約是 5 億。RUNNING
- 運(yùn)行狀態(tài)。接受新任務(wù),并且也能處理阻塞隊(duì)列中的任務(wù)。SHUTDOWN
- 關(guān)閉狀態(tài)。不接受新任務(wù),但可以處理阻塞隊(duì)列中的任務(wù)。RUNNING
狀態(tài)時(shí),調(diào)用 shutdown
方法會(huì)使線程池進(jìn)入到該狀態(tài)。finalize
方法在執(zhí)行過(guò)程中也會(huì)調(diào)用 shutdown
方法進(jìn)入該狀態(tài)。STOP
- 停止?fàn)顟B(tài)。不接受新任務(wù),也不處理隊(duì)列中的任務(wù)。會(huì)中斷正在處理任務(wù)的線程。在線程池處于 RUNNING
或 SHUTDOWN
狀態(tài)時(shí),調(diào)用 shutdownNow
方法會(huì)使線程池進(jìn)入到該狀態(tài)。TIDYING
- 整理狀態(tài)。如果所有的任務(wù)都已終止了,workerCount
(有效線程數(shù)) 為 0,線程池進(jìn)入該狀態(tài)后會(huì)調(diào)用 terminated
方法進(jìn)入 TERMINATED
狀態(tài)。TERMINATED
- 已終止?fàn)顟B(tài)。在 terminated
方法執(zhí)行完后進(jìn)入該狀態(tài)。默認(rèn) terminated
方法中什么也沒(méi)有做。進(jìn)入 TERMINATED
的條件如下:RUNNING
狀態(tài);TIDYING
狀態(tài)或 TERMINATED
狀態(tài);SHUTDOWN
并且 workerQueue
為空;workerCount
為 0;TIDYING
狀態(tài)成功。ThreadPoolExecutor
有四個(gè)構(gòu)造方法,前三個(gè)都是基于第四個(gè)實(shí)現(xiàn)。第四個(gè)構(gòu)造方法定義如下:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
參數(shù)說(shuō)明:
corePoolSize
- 核心線程數(shù)量。當(dāng)有新任務(wù)通過(guò) execute
方法提交時(shí) ,線程池會(huì)執(zhí)行以下判斷:corePoolSize
,則創(chuàng)建新線程來(lái)處理任務(wù),即使線程池中的其他線程是空閑的。corePoolSize
且小于 maximumPoolSize
,則只有當(dāng) workQueue
滿時(shí)才創(chuàng)建新的線程去處理任務(wù);corePoolSize
和 maximumPoolSize
相同,則創(chuàng)建的線程池的大小是固定的。這時(shí)如果有新任務(wù)提交,若 workQueue
未滿,則將請(qǐng)求放入 workQueue
中,等待有空閑的線程去從 workQueue
中取任務(wù)并處理;maximumPoolSize
,這時(shí)如果 workQueue
已經(jīng)滿了,則使用 handler
所指定的策略來(lái)處理任務(wù);corePoolSize
=> workQueue
=> maximumPoolSize
。maximumPoolSize
- 大線程數(shù)量。keepAliveTime
:線程保持活動(dòng)的時(shí)間。corePoolSize
的時(shí)候,如果這時(shí)沒(méi)有新的任務(wù)提交,核心線程外的線程不會(huì)立即銷(xiāo)毀,而是會(huì)等待,直到等待的時(shí)間超過(guò)了 keepAliveTime
。unit
- keepAliveTime
的時(shí)間單位。有 7 種取值。可選的單位有天(DAYS),小時(shí)(HOURS),分鐘(MINUTES),毫秒(MILLISECONDS),微秒(MICROSECONDS, 千分之一毫秒)和毫微秒(NANOSECONDS, 千分之一微秒)。workQueue
- 等待執(zhí)行的任務(wù)隊(duì)列。用于保存等待執(zhí)行的任務(wù)的阻塞隊(duì)列。 可以選擇以下幾個(gè)阻塞隊(duì)列。ArrayBlockingQueue
- 有界阻塞隊(duì)列。LinkedBlockingQueue
- ***阻塞隊(duì)列。Integer.MAX_VALUE
。ArrayBlockingQueue
。LinkedBlockingQueue
意味著: maximumPoolSize
將不起作用,線程池能創(chuàng)建的大線程數(shù)為 corePoolSize
,因?yàn)槿蝿?wù)等待隊(duì)列是***隊(duì)列。Executors.newFixedThreadPool
使用了這個(gè)隊(duì)列。SynchronousQueue
- 不會(huì)保存提交的任務(wù),而是將直接新建一個(gè)線程來(lái)執(zhí)行新來(lái)的任務(wù)。LinkedBlockingQueue
。Executors.newCachedThreadPool
使用了這個(gè)隊(duì)列。PriorityBlockingQueue
- 具有優(yōu)先級(jí)的***阻塞隊(duì)列。threadFactory
- 線程工廠??梢酝ㄟ^(guò)線程工廠給每個(gè)創(chuàng)建出來(lái)的線程設(shè)置更有意義的名字。handler
- 飽和策略。它是 RejectedExecutionHandler
類(lèi)型的變量。當(dāng)隊(duì)列和線程池都滿了,說(shuō)明線程池處于飽和狀態(tài),那么必須采取一種策略處理提交的新任務(wù)。線程池支持以下策略:AbortPolicy
- 丟棄任務(wù)并拋出異常。這也是默認(rèn)策略。DiscardPolicy
- 丟棄任務(wù),但不拋出異常。DiscardOldestPolicy
- 丟棄隊(duì)列最前面的任務(wù),然后重新嘗試執(zhí)行任務(wù)(重復(fù)此過(guò)程)。CallerRunsPolicy
- 只用調(diào)用者所在的線程來(lái)運(yùn)行任務(wù)。RejectedExecutionHandler
接口來(lái)定制處理策略。如記錄日志或持久化不能處理的任務(wù)。默認(rèn)情況下,創(chuàng)建線程池之后,線程池中是沒(méi)有線程的,需要提交任務(wù)之后才會(huì)創(chuàng)建線程。
提交任務(wù)可以使用 execute
方法,它是 ThreadPoolExecutor
的核心方法,通過(guò)這個(gè)方法可以向線程池提交一個(gè)任務(wù),交由線程池去執(zhí)行。
execute
方法工作流程如下:
workerCount < corePoolSize
,則創(chuàng)建并啟動(dòng)一個(gè)線程來(lái)執(zhí)行新提交的任務(wù);workerCount >= corePoolSize
,且線程池內(nèi)的阻塞隊(duì)列未滿,則將任務(wù)添加到該阻塞隊(duì)列中;workerCount >= corePoolSize && workerCount < maximumPoolSize
,且線程池內(nèi)的阻塞隊(duì)列已滿,則創(chuàng)建并啟動(dòng)一個(gè)線程來(lái)執(zhí)行新提交的任務(wù);workerCount >= maximumPoolSize
,并且線程池內(nèi)的阻塞隊(duì)列已滿,則根據(jù)拒絕策略來(lái)處理該任務(wù), 默認(rèn)的處理方式是直接拋異常。在 ThreadPoolExecutor
類(lèi)中還有一些重要的方法:
submit
- 類(lèi)似于 execute
,但是針對(duì)的是有返回值的線程。submit
方法是在 ExecutorService
中聲明的方法,在 AbstractExecutorService
就已經(jīng)有了具體的實(shí)現(xiàn)。ThreadPoolExecutor
直接復(fù)用 AbstractExecutorService
的 submit
方法。shutdown
- 不會(huì)立即終止線程池,而是要等所有任務(wù)緩存隊(duì)列中的任務(wù)都執(zhí)行完后才終止,但再也不會(huì)接受新的任務(wù)。SHUTDOWN
狀態(tài);interruptIdleWorkers
方法請(qǐng)求中斷所有空閑的 worker;tryTerminate
嘗試結(jié)束線程池。shutdownNow
- 立即終止線程池,并嘗試打斷正在執(zhí)行的任務(wù),并且清空任務(wù)緩存隊(duì)列,返回尚未執(zhí)行的任務(wù)。與 shutdown
方法類(lèi)似,不同的地方在于:STOP
;isShutdown
- 調(diào)用了 shutdown
或 shutdownNow
方法后,isShutdown
方法就會(huì)返回 true。isTerminaed
- 當(dāng)所有的任務(wù)都已關(guān)閉后,才表示線程池關(guān)閉成功,這時(shí)調(diào)用 isTerminaed
方法會(huì)返回 true。setCorePoolSize
- 設(shè)置核心線程數(shù)大小。setMaximumPoolSize
- 設(shè)置大線程數(shù)大小。getTaskCount
- 線程池已經(jīng)執(zhí)行的和未執(zhí)行的任務(wù)總數(shù);getCompletedTaskCount
- 線程池已完成的任務(wù)數(shù)量,該值小于等于 taskCount
;getLargestPoolSize
- 線程池曾經(jīng)創(chuàng)建過(guò)的大線程數(shù)量。通過(guò)這個(gè)數(shù)據(jù)可以知道線程池是否滿過(guò),也就是達(dá)到了 maximumPoolSize
;getPoolSize
- 線程池當(dāng)前的線程數(shù)量;getActiveCount
- 當(dāng)前線程池中正在執(zhí)行任務(wù)的線程數(shù)量。public class ThreadPoolExecutorDemo {
public static void main(String[] args) {
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(5, 10, 500, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue(),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.AbortPolicy());
for (int i = 0; i < 100; i++) {
threadPoolExecutor.execute(new MyThread());
String info = String.format("線程池中線程數(shù)目:%s,隊(duì)列中等待執(zhí)行的任務(wù)數(shù)目:%s,已執(zhí)行玩別的任務(wù)數(shù)目:%s",
threadPoolExecutor.getPoolSize(),
threadPoolExecutor.getQueue().size(),
threadPoolExecutor.getCompletedTaskCount());
System.out.println(info);
}
threadPoolExecutor.shutdown();
}
static class MyThread implements Runnable {
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + " 執(zhí)行");
}
}
}
JDK 的 Executors
類(lèi)中提供了幾種具有代表性的線程池,這些線程池 都是基于 ThreadPoolExecutor
的定制化實(shí)現(xiàn)。
在實(shí)際使用線程池的場(chǎng)景中,我們往往不是直接使用 ThreadPoolExecutor
,而是使用 JDK 中提供的具有代表性的線程池實(shí)例。
創(chuàng)建一個(gè)單線程的線程池。
只會(huì)創(chuàng)建唯一的工作線程來(lái)執(zhí)行任務(wù),保證所有任務(wù)按照指定順序(FIFO, LIFO, 優(yōu)先級(jí))執(zhí)行。 如果這個(gè)唯一的線程因?yàn)楫惓=Y(jié)束,那么會(huì)有一個(gè)新的線程來(lái)替代它 。
單工作線程大的特點(diǎn)是:可保證順序地執(zhí)行各個(gè)任務(wù)。
示例:
public class SingleThreadExecutorDemo {
public static void main(String[] args) {
ExecutorService executorService = Executors.newSingleThreadExecutor();
for (int i = 0; i < 100; i++) {
executorService.execute(new Runnable() {
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + " 執(zhí)行");
}
});
}
executorService.shutdown();
}
}
創(chuàng)建一個(gè)固定大小的線程池。
每次提交一個(gè)任務(wù)就會(huì)新創(chuàng)建一個(gè)工作線程,如果工作線程數(shù)量達(dá)到線程池大線程數(shù),則將提交的任務(wù)存入到阻塞隊(duì)列中。
FixedThreadPool
是一個(gè)典型且優(yōu)秀的線程池,它具有線程池提高程序效率和節(jié)省創(chuàng)建線程時(shí)所耗的開(kāi)銷(xiāo)的優(yōu)點(diǎn)。但是,在線程池空閑時(shí),即線程池中沒(méi)有可運(yùn)行任務(wù)時(shí),它不會(huì)釋放工作線程,還會(huì)占用一定的系統(tǒng)資源。
示例:
public class FixedThreadPoolDemo {
public static void main(String[] args) {
ExecutorService executorService = Executors.newFixedThreadPool(3);
for (int i = 0; i < 100; i++) {
executorService.execute(new Runnable() {
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + " 執(zhí)行");
}
});
}
executorService.shutdown();
}
}
創(chuàng)建一個(gè)可緩存的線程池。
CachedThreadPool
時(shí),一定要注意控制任務(wù)的數(shù)量,否則,由于大量線程同時(shí)運(yùn)行,很有會(huì)造成系統(tǒng)癱瘓。示例:
public class CachedThreadPoolDemo {
public static void main(String[] args) {
ExecutorService executorService = Executors.newCachedThreadPool();
for (int i = 0; i < 100; i++) {
executorService.execute(new Runnable() {
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + " 執(zhí)行");
}
});
}
executorService.shutdown();
}
}
創(chuàng)建一個(gè)大小無(wú)限的線程池。此線程池支持定時(shí)以及周期性執(zhí)行任務(wù)的需求。
public class ScheduledThreadPoolDemo {
public static void main(String[] args) {
schedule();
scheduleAtFixedRate();
}
private static void schedule() {
ScheduledExecutorService executorService = Executors.newScheduledThreadPool(5);
for (int i = 0; i < 100; i++) {
executorService.schedule(new Runnable() {
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + " 執(zhí)行");
}
}, 1, TimeUnit.SECONDS);
}
executorService.shutdown();
}
private static void scheduleAtFixedRate() {
ScheduledExecutorService executorService = Executors.newScheduledThreadPool(5);
for (int i = 0; i < 100; i++) {
executorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + " 執(zhí)行");
}
}, 1, 1, TimeUnit.SECONDS);
}
executorService.shutdown();
}
}
創(chuàng)新互聯(lián)www.cdcxhl.cn,專(zhuān)業(yè)提供香港、美國(guó)云服務(wù)器,動(dòng)態(tài)BGP最優(yōu)骨干路由自動(dòng)選擇,持續(xù)穩(wěn)定高效的網(wǎng)絡(luò)助力業(yè)務(wù)部署。公司持有工信部辦法的idc、isp許可證, 機(jī)房獨(dú)有T級(jí)流量清洗系統(tǒng)配攻擊溯源,準(zhǔn)確進(jìn)行流量調(diào)度,確保服務(wù)器高可用性。佳節(jié)活動(dòng)現(xiàn)已開(kāi)啟,新人活動(dòng)云服務(wù)器買(mǎi)多久送多久。