十年網(wǎng)站開發(fā)經(jīng)驗(yàn) + 多家企業(yè)客戶 + 靠譜的建站團(tuán)隊(duì)
量身定制 + 運(yùn)營維護(hù)+專業(yè)推廣+無憂售后,網(wǎng)站問題一站解決
這篇文章主要講解了“Java線程池的原理和作用是什么”,文中的講解內(nèi)容簡單清晰,易于學(xué)習(xí)與理解,下面請大家跟著小編的思路慢慢深入,一起來研究和學(xué)習(xí)“Java線程池的原理和作用是什么”吧!
十載的公安網(wǎng)站建設(shè)經(jīng)驗(yàn),針對設(shè)計(jì)、前端、開發(fā)、售后、文案、推廣等六對一服務(wù),響應(yīng)快,48小時(shí)及時(shí)工作處理。網(wǎng)絡(luò)營銷推廣的優(yōu)勢是能夠根據(jù)用戶設(shè)備顯示端的尺寸不同,自動(dòng)調(diào)整公安建站的顯示方式,使網(wǎng)站能夠適用不同顯示終端,在瀏覽器中調(diào)整網(wǎng)站的寬度,無論在任何一種瀏覽器上瀏覽網(wǎng)站,都能展現(xiàn)優(yōu)雅布局與設(shè)計(jì),從而大程度地提升瀏覽體驗(yàn)。創(chuàng)新互聯(lián)建站從事“公安網(wǎng)站設(shè)計(jì)”,“公安網(wǎng)站推廣”以來,每個(gè)客戶項(xiàng)目都認(rèn)真落實(shí)執(zhí)行。
“首當(dāng)其沖的問題就是,如何復(fù)用線程?!?/p>
/**
* Created by Anur IjuoKaruKas on 2019/7/16
*/
public class ThreadPoolExecutor {
private final BlockingQueue workQueue = new LinkedBlockingQueue<>();
private final Runnable runnable = () -> {
try {
while (true) {
Runnable take = workQueue.poll();
if (take == null) {
Thread.sleep(200);
} else {
take.run();
}
}
} catch (InterruptedException e) {
e.printStackTrace();
}
};
public ThreadPoolExecutor() {
new Thread(runnable).start();
}
public void execute(Runnable command) {
workQueue.offer(command);
}
} 肥宅埋過了一眼,很快就發(fā)現(xiàn)其中玄妙之處:在小奈的 ThreadPoolExecutor 中,定制了一套 runnable 流程,負(fù)責(zé)不斷從 workQueue 這個(gè)隊(duì)列中拉取由 #execute 方法提交過來的任務(wù),并執(zhí)行其 run() 方法。這樣,無論提交過來多少個(gè)任務(wù),始終都是這個(gè)線程池內(nèi)置的線程在執(zhí)行任務(wù)。當(dāng)獲取不到任務(wù)的時(shí)候,線程池會(huì)自己進(jìn)入休眠狀態(tài)。
“雖然這達(dá)到了線程復(fù)用,但是你的這個(gè)線程完全沒辦法自動(dòng)創(chuàng)建和銷毀啊?甚至它的線程池?cái)?shù)量都是不可控制的?!狈收耠m然感嘆于對方可以這么快實(shí)現(xiàn)線程復(fù)用,但還是持續(xù)展開攻勢。
“既然要實(shí)現(xiàn)線程池可控,最直截了當(dāng)?shù)南敕ū闶菍⒎讲诺哪翘?runnable 流程封裝成一個(gè)對象,我們只需控制這個(gè)對象的創(chuàng)建、銷毀、以及復(fù)用即可。”作為一只長期浸泡在 OOP 思維中的程序媛,這種問題難不倒小奈。她很快就寫出了一個(gè)內(nèi)部類,叫做 Worker,其中 #runWorker(this); 就是剛才那個(gè) runnable 流程,負(fù)責(zé)不斷從隊(duì)列中獲取任務(wù),并調(diào)用它的 #run() 方法。
private final class Worker implements Runnable {
final Thread thread;
Runnable firstTask;
Worker(Runnable firstTask) {
this.firstTask = firstTask;
this.thread = threadFactory.newThread(this);
}
@Override
public void run() {
runWorker(this);
}
}小奈為后續(xù)將要完成的 worker 線程數(shù)量控制打下了基石:ThreadPoolExecutor 中增加了一個(gè)散列集,用于存放 worker,增加了一個(gè) ThreadFactory,供使用者定制化 worker 線程的創(chuàng)建。
其中比較核心的方法叫做 #addWorker(),負(fù)責(zé)創(chuàng)建并初始化 worker 線程,并將其納入散列集中管理。當(dāng)然,這個(gè)線程池還無法自動(dòng)創(chuàng)建,不過已經(jīng)可以自動(dòng)銷毀了。可以看到,在拉取不到任務(wù)時(shí),#getTask() 則返回空,會(huì)跳出 #runWorker() 的 while 循環(huán),之后調(diào)用 #processWorkerExit();,將 worker 線程從散列集中移除。
/**
* Created by Anur IjuoKaruKas on 2019/7/16
*/
public class ThreadPoolExecutor {
private final HashSet workers = new HashSet<>();
private volatile ThreadFactory threadFactory;
private final BlockingQueue workQueue;
public ThreadPoolExecutor(BlockingQueue workQueue, ThreadFactory threadFactory) {
this.threadFactory = threadFactory;
this.workQueue = workQueue;
}
public void execute(Runnable command) {
workQueue.offer(command);
}
/**
* 新建一個(gè) worker 線程、啟動(dòng)并納入 workers
*/
private boolean addWorker(Runnable firstTask) {
Worker w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
workers.add(w);
t.start();
}
return true;
}
/**
* worker 線程池不斷從 workQueue 中拉取 task 進(jìn)行消費(fèi)
*/
private void runWorker(Worker w) {
Runnable task = w.firstTask;
w.firstTask = null;
while (task != null || (task = getTask()) != null) {
task.run();
}
processWorkerExit(w);
}
/**
* 當(dāng)線程執(zhí)行完畢之前,將其從 workers 中移除
*/
private void processWorkerExit(Worker w) {
workers.remove(w);
}
private Runnable getTask() {
return workQueue.poll();
}
} 看到這里,肥宅埋已經(jīng)能預(yù)測到接下來的思路了。

線程池需要加入一個(gè)變量 maximumPoolSize,以防無限創(chuàng)建線程,每次進(jìn)行 #addWorker() 時(shí),需要判斷一下是否可以繼續(xù)添加 worker,如果可以,則添加新的 worker,否則將任務(wù)丟入隊(duì)列:
#addWorker() 中加入拒絕的邏輯,確保不能無限創(chuàng)建 worker。
再修改一下 #execute() 方法,優(yōu)先創(chuàng)建 worker,如果創(chuàng)建 worker 失敗( workers.size() >= maximumPoolSize),則直接將任務(wù)丟入隊(duì)列。
public void execute(Runnable command) {
if (addWorker(command)) {
return;
}
workQueue.offer(command);
}
/**
* 新建一個(gè) worker 線程、啟動(dòng)并納入 workers
*/
private boolean addWorker(Runnable firstTask) {
int ws = workers.size();
if (ws >= maximumPoolSize) {
return false;
}
Worker w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
workers.add(w);
t.start();
}
return true;
}已經(jīng)寫到這里小奈可謂是趾高氣揚(yáng),仿佛實(shí)現(xiàn)一個(gè)線程池已經(jīng)不在話下。

“這樣貌似有點(diǎn)問題???雖然說你已經(jīng)實(shí)現(xiàn)了線程的動(dòng)態(tài)創(chuàng)建與銷毀,但在任務(wù)沒有那么緊湊的情況下,基本是每個(gè)任務(wù)都進(jìn)來都需要?jiǎng)?chuàng)建一次線程,再銷毀一次線程,說好的復(fù)用到哪里去了?”肥宅埋給了膨脹的小奈當(dāng)頭一棒。
“咳咳......嘛,銷毀的時(shí)候做一下判斷就可以了,我們加入一個(gè)新的變量,叫做 keepAliveTime,當(dāng)拿不到任務(wù)的時(shí)候,就進(jìn)行阻塞休眠,比如 20ms,每次對 keepAliveTime 減 20ms,直到小于等于 0 ,再銷毀線程。”小奈反應(yīng)迅速,很快給出了答案,并準(zhǔn)備動(dòng)手對線程池進(jìn)行改動(dòng)。
肥宅埋嘆了一口氣,“我看你是被膨脹蒙蔽了雙眼,既然我們已經(jīng)使用了阻塞隊(duì)列,那么就可以充分利用阻塞隊(duì)列的特性!阻塞隊(duì)列中內(nèi)置了一個(gè)顯式鎖,利用鎖的 condition 對象,使用它的 #awaitNanos() 與 #notify() 方法,就可以直接精準(zhǔn)地實(shí)現(xiàn)線程調(diào)度了。”畢竟肥宅埋也是一只學(xué)霸,聽到小奈的想法后提出了更具有建設(shè)性的設(shè)計(jì)。
小奈也很快反應(yīng)過來,阻塞隊(duì)列有一個(gè) #poll() 方法,底層是借助 condition 對象封裝的 LockSupport.parkNanos(this, nanosTimeout); 來實(shí)現(xiàn)的,會(huì)阻塞直到有新的元素加入,當(dāng)有新的元素加入,這個(gè) condition 就會(huì)被喚醒,來實(shí)現(xiàn) 當(dāng)調(diào)用阻塞隊(duì)列的 #poll() 時(shí),如果阻塞隊(duì)列為空,會(huì)進(jìn)行一段時(shí)間的休眠,直到被喚醒,或者休眠超時(shí)。
肥宅埋一手接管了改造線程池的大權(quán),馬上大刀闊斧地改了起來。
改動(dòng)十分簡單,原先的 #getTask() 是直接調(diào)用阻塞隊(duì)列的 #take() 方法,如果隊(duì)列為空,則直接返回,只要將其改為 #poll 方法即可。
/**
* 當(dāng) runWorker 一定時(shí)間內(nèi)獲取不到任務(wù)時(shí),就會(huì) processWorkerExit 銷毀
*/
private Runnable getTask() {
boolean timedOut = false;
while (true) {
try {
if (timedOut) {
return null;
}
Runnable r = workQueue.poll(keepAliveTime, unit);
if (r != null) {
return r;
} else {
timedOut = true;
}
} catch (InterruptedException e) {
timedOut = false;
}
}
}“一般來說,我們的任務(wù)提交都不會(huì)太過于均勻,如果我們平常不需要那么多線程來消費(fèi),但又想避免任務(wù)一直被堆積導(dǎo)致某些任務(wù)遲遲不被消費(fèi),就需要引入**核心線程 corePoolSize ** 與 **最大線程 maximumPoolSize ** 的概念。”肥宅埋想到了一個(gè)簡單的可以優(yōu)化的點(diǎn),頭頭是道地分析道:“我們可以不用做那么復(fù)雜的動(dòng)態(tài) worker 消費(fèi)池,最簡單的,如果我們的阻塞隊(duì)列滿了,就繼續(xù)創(chuàng)建更多的線程池,這樣,堆積的任務(wù)能比以往更快速的降下來?!?/p>
說起來好像復(fù)雜,實(shí)際上代碼十分簡單。小奈看見肥宅埋修改了 #addWorker() 方法,增加了一個(gè)參數(shù) core,其作用只有一個(gè),如果是核心線程,則創(chuàng)建時(shí),數(shù)量必須小于等于 corePoolSize,否則數(shù)量必須小于等于 maximumPoolSize。
另外, #execute() 方法的改動(dòng)也十分簡單,前面的改動(dòng)不大,主要是,當(dāng)任務(wù) #offer() 失敗后,創(chuàng)建非核心 worker 線程。
/**
* 優(yōu)先創(chuàng)建核心線程,核心線程滿了以后,則優(yōu)先將任務(wù)放入隊(duì)列
*
* 隊(duì)列滿了以后,則啟用非核心線程池,以防任務(wù)堆積
*/
public void execute(Runnable command) {
if (getPoolSize() < corePoolSize) {
if (addWorker(command, true)) {
return;
}
}
if (!workQueue.offer(command)) {
addWorker(command, false);
}
}
/**
* 新建一個(gè) worker 線程、啟動(dòng)并納入 workers
*/
private boolean addWorker(Runnable firstTask, boolean core) {
int ws = workers.size();
if (ws >= (core ? corePoolSize : maximumPoolSize)) {
return false;
}
Worker w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
workers.add(w);
t.start();
}
return true;
}“現(xiàn)在這個(gè)版本的線程池看起來真是有模有樣呢 ~ 可以動(dòng)態(tài)創(chuàng)建與銷毀線程,線程也能復(fù)用,還可以動(dòng)態(tài)增加更多的線程來消費(fèi)堆積的線程!” 肥宅埋滿意地看著兩人的杰作,“其實(shí)我還發(fā)現(xiàn)有個(gè)地方不太友好,在推送任務(wù)時(shí),調(diào)用方可能并不知道自己的任務(wù)是否失敗?!?/p>
“這個(gè)簡單鴨,只需要在調(diào)用 #execute() 時(shí)返回 flase 來代表添加失敗,或者拋出對應(yīng)的異常即可?!毙∧谓o出了很直觀的設(shè)計(jì)。
“這確實(shí)不失為一個(gè)好方法,但是對于調(diào)用方來說,如果所有使用線程池的地方都需要去做這個(gè)判斷,那豈不是太麻煩了!”肥宅埋對方案進(jìn)行了補(bǔ)充:“這個(gè)是面向切面編程的一種思想,我們可以提供一個(gè)如何處理這些隊(duì)列已經(jīng)放不下,且無法創(chuàng)建更多消費(fèi)線程的切面入口,就叫它 AbortPolicy 吧!”
肥宅埋修改了一下 #execute() 方法,如果在創(chuàng)建非核心線程池的時(shí)候失敗,就直接將任務(wù)拒絕掉。
/**
* 優(yōu)先創(chuàng)建核心線程,核心線程滿了以后,則優(yōu)先將任務(wù)放入隊(duì)列
*
* 隊(duì)列滿了以后,則啟用非核心線程池,以防任務(wù)堆積
*
* 如果非核心線程池創(chuàng)建失敗,則拒絕這個(gè)任務(wù)
*/
public void execute(Runnable command) {
if (getPoolSize() < corePoolSize) {
if (addWorker(command, true)) {
return;
}
}
if (!workQueue.offer(command)) {
if (!addWorker(command, false)) {
reject(command);
}
}
}如何去拒絕任務(wù),交給調(diào)用者去實(shí)現(xiàn),#reject() 的實(shí)現(xiàn)非常簡單,就是調(diào)用一下 BiConsumer,這個(gè)可以供調(diào)用方自由定制。
private void reject(Runnable command) {
abortPolicy.accept(command, this);
}小奈與肥宅埋已經(jīng)完成了她們的線程池,現(xiàn)在需要測試一下線程池是否可以正常使用,比較細(xì)心的肥宅埋寫了測試用例如下:
核心線程數(shù)為5,最大線程數(shù)為10,緊接著每個(gè)線程在拉取不到任務(wù)時(shí)會(huì)存活一分鐘,有一個(gè)長度為 5 的并發(fā)阻塞隊(duì)列,采用默認(rèn)的 ThreadFactory,最后,使用了 DiscardPolicy,當(dāng)任務(wù)被拒絕后,直接丟棄任務(wù),并打印日志。

她們運(yùn)行了代碼,日志打印如下。完全符合預(yù)期,在阻塞隊(duì)列還未裝滿之前,只有 5 個(gè)核心線程在消費(fèi)任務(wù),當(dāng)阻塞隊(duì)列滿了以后,會(huì)逐步創(chuàng)建更多的線程,而當(dāng)無法創(chuàng)建更多線程后,則觸發(fā)丟棄策略。

感謝各位的閱讀,以上就是“Java線程池的原理和作用是什么”的內(nèi)容了,經(jīng)過本文的學(xué)習(xí)后,相信大家對Java線程池的原理和作用是什么這一問題有了更深刻的體會(huì),具體使用情況還需要大家實(shí)踐驗(yàn)證。這里是創(chuàng)新互聯(lián),小編將為大家推送更多相關(guān)知識(shí)點(diǎn)的文章,歡迎關(guān)注!