Java如何優(yōu)雅關(guān)閉異步中的ExecutorService
1.ExecutorService的核心價(jià)值
在并發(fā)編程領(lǐng)域,Java的ExecutorService(位于java.util.concurrent包)是線程池管理的關(guān)鍵接口。作為Executor框架的核心組件,它通過(guò)解耦任務(wù)提交與執(zhí)行機(jī)制,為開(kāi)發(fā)者提供了:
- 線程生命周期管理自動(dòng)化
- 任務(wù)隊(duì)列智能調(diào)度
- 資源復(fù)用優(yōu)化機(jī)制
- 異步執(zhí)行結(jié)果追蹤能力
2.關(guān)閉機(jī)制的必要性
不正確的線程池關(guān)閉會(huì)導(dǎo)致:
- 內(nèi)存泄漏(滯留線程無(wú)法回收)
- 應(yīng)用無(wú)法正常終止(非守護(hù)線程保持活躍)
- 任務(wù)狀態(tài)不一致(突然中斷導(dǎo)致數(shù)據(jù)問(wèn)題)
- 系統(tǒng)資源耗盡(無(wú)限制線程創(chuàng)建)
3.shutdown()方法詳解
3.1 方法特性
void shutdown()
狀態(tài)轉(zhuǎn)換
將線程池狀態(tài)設(shè)置為SHUTDOWN,觸發(fā)以下行為:
- 拒絕新任務(wù)提交(觸發(fā)RejectedExecutionHandler)
- 繼續(xù)執(zhí)行已存在的任務(wù):
- 正在執(zhí)行的任務(wù)(running tasks)
- 等待隊(duì)列中的任務(wù)(queued tasks)
典型應(yīng)用場(chǎng)景
ExecutorService executor = Executors.newFixedThreadPool(4);
// 提交多個(gè)任務(wù)...
executor.shutdown();
try {
if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
System.err.println("仍有任務(wù)未在時(shí)限內(nèi)完成");
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
3.2 內(nèi)部運(yùn)作機(jī)制
- 原子性狀態(tài)更新:CAS操作修改線程池控制狀態(tài)
- 中斷空閑線程:僅中斷等待任務(wù)的Worker線程
- 隊(duì)列消費(fèi)保證:完全處理BlockingQueue中的剩余任務(wù)
4.shutdownNow()方法剖析
4.1 方法定義
List<Runnable> shutdownNow()
狀態(tài)轉(zhuǎn)換
將線程池狀態(tài)設(shè)置為STOP,觸發(fā):
- 立即拒絕新任務(wù)
- 中斷所有工作線程(無(wú)論是否在執(zhí)行任務(wù))
- 清空任務(wù)隊(duì)列,返回未執(zhí)行任務(wù)列表
4.2 中斷處理要點(diǎn)
executor.shutdownNow();
// 典型返回值處理
List<Runnable> unprocessed = executor.shutdownNow();
if (!unprocessed.isEmpty()) {
logger.warn("丟棄{}個(gè)未執(zhí)行任務(wù)", unprocessed.size());
}
任務(wù)中斷條件
只有當(dāng)任務(wù)代碼正確處理中斷時(shí)才能被終止:
class InterruptibleTask implements Runnable {
public void run() {
while (!Thread.currentThread().isInterrupted()) {
// 執(zhí)行可中斷的操作
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt(); // 重置中斷狀態(tài)
break;
}
}
}
}
5.對(duì)比分析
| 特性 | shutdown() | shutdownNow() |
|---|---|---|
| 新任務(wù)接受 | 立即拒絕 | 立即拒絕 |
| 運(yùn)行中任務(wù)處理 | 等待完成 | 嘗試中斷 |
| 隊(duì)列任務(wù)處理 | 全部執(zhí)行 | 清除并返回 |
| 返回值 | void | 未執(zhí)行任務(wù)列表 |
| 適用場(chǎng)景 | 優(yōu)雅關(guān)閉 | 緊急終止 |
| 線程中斷策略 | 僅中斷空閑線程 | 強(qiáng)制中斷所有線程 |
6.最佳實(shí)踐代碼示例
6.1 標(biāo)準(zhǔn)關(guān)閉模板
public class GracefulShutdownExample {
// 定義超時(shí)時(shí)間和時(shí)間單位(30秒)
private static final int TIMEOUT = 30;
private static final TimeUnit UNIT = TimeUnit.SECONDS;
// 執(zhí)行任務(wù)的方法,接收一個(gè)任務(wù)列表并將其提交給線程池執(zhí)行
public void executeTasks(List<Runnable> tasks) {
// 創(chuàng)建一個(gè)固定大小的線程池,大小為系統(tǒng)可用處理器核心數(shù)
ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
try {
// 將任務(wù)列表中的每個(gè)任務(wù)提交到線程池
tasks.forEach(executor::submit);
} finally {
// 在所有任務(wù)提交完后,禁用線程池接收新任務(wù),開(kāi)始優(yōu)雅關(guān)閉線程池
executor.shutdown(); // 禁止再提交新任務(wù)
try {
// 等待線程池中的任務(wù)在指定超時(shí)內(nèi)完成,如果超時(shí)未完成,則強(qiáng)制關(guān)閉線程池
if (!executor.awaitTermination(TIMEOUT, UNIT)) {
// 如果未能在超時(shí)內(nèi)完成,則調(diào)用 shutdownNow() 強(qiáng)制終止所有活動(dòng)任務(wù)
List<Runnable> unfinished = executor.shutdownNow();
// 處理未完成的任務(wù),例如記錄日志或重新提交
handleUnfinishedTasks(unfinished);
}
} catch (InterruptedException e) {
// 如果在等待終止時(shí)被中斷,恢復(fù)中斷狀態(tài)并強(qiáng)制關(guān)閉線程池
Thread.currentThread().interrupt();
executor.shutdownNow();
}
}
}
// 處理未完成任務(wù)的方法,這里我們打印未完成任務(wù)的數(shù)量
private void handleUnfinishedTasks(List<Runnable> tasks) {
// 如果有未完成的任務(wù),打印任務(wù)數(shù)量并執(zhí)行額外的處理
if (!tasks.isEmpty()) {
System.out.println("未完成任務(wù)數(shù): " + tasks.size());
// 可在此處記錄日志、重新排隊(duì)未完成的任務(wù)等
}
}
}構(gòu)造線程池: Executors.newFixedThreadPool() 創(chuàng)建一個(gè)固定大小的線程池,大小為系統(tǒng)可用的處理器核心數(shù),這樣可以更高效地利用 CPU 資源。
提交任務(wù): 使用 tasks.forEach(executor::submit) 提交每個(gè)任務(wù)到線程池中執(zhí)行。
優(yōu)雅關(guān)閉線程池:
executor.shutdown()禁用線程池接收新任務(wù),但仍會(huì)執(zhí)行已經(jīng)提交的任務(wù)。awaitTermination()方法用于等待所有任務(wù)執(zhí)行完成。如果超時(shí)后任務(wù)未完成,則調(diào)用shutdownNow()強(qiáng)制關(guān)閉線程池,停止所有正在運(yùn)行的任務(wù),并返回未完成的任務(wù)。
處理中斷: 如果在等待終止過(guò)程中發(fā)生 InterruptedException,線程會(huì)恢復(fù)中斷狀態(tài),并且強(qiáng)制關(guān)閉線程池。
處理未完成任務(wù): handleUnfinishedTasks() 方法會(huì)處理未完成的任務(wù),比如記錄日志或者重新排隊(duì)未完成的任務(wù)。
6.2 帶回調(diào)的增強(qiáng)實(shí)現(xiàn)
public class EnhancedExecutorManager {
// 定義線程池對(duì)象
private final ExecutorService executor;
// 定義超時(shí)時(shí)間及單位
private final long timeout;
private final TimeUnit unit;
// 構(gòu)造函數(shù),初始化線程池并設(shè)置超時(shí)時(shí)間和單位
public EnhancedExecutorManager(int corePoolSize, long timeout, TimeUnit unit) {
// 創(chuàng)建一個(gè)核心池大小為 corePoolSize,最大池大小為 corePoolSize * 2,最大空閑時(shí)間 60秒的線程池
this.executor = new ThreadPoolExecutor(
corePoolSize, // 核心線程池大小
corePoolSize * 2, // 最大線程池大小
60L, TimeUnit.SECONDS, // 空閑線程的存活時(shí)間
new LinkedBlockingQueue<>(1000), // 使用容量為 1000 的隊(duì)列來(lái)緩存任務(wù)
new CustomThreadFactory(), // 自定義線程工廠
new ThreadPoolExecutor.CallerRunsPolicy() // 當(dāng)任務(wù)無(wú)法提交時(shí),調(diào)用者線程執(zhí)行該任務(wù)
);
this.timeout = timeout; // 設(shè)置超時(shí)時(shí)間
this.unit = unit; // 設(shè)置超時(shí)時(shí)間單位
}
// 優(yōu)雅關(guān)閉線程池的方法
public void shutdown() {
executor.shutdown(); // 首先嘗試正常關(guān)閉線程池,不再接收新的任務(wù)
try {
// 如果線程池未能在指定的超時(shí)時(shí)間內(nèi)終止,則強(qiáng)制關(guān)閉
if (!executor.awaitTermination(timeout, unit)) {
System.out.println("強(qiáng)制終止線程池...");
// 強(qiáng)制停止所有正在執(zhí)行的任務(wù)并返回丟棄的任務(wù)列表
List<Runnable> droppedTasks = executor.shutdownNow();
System.out.println("丟棄任務(wù)數(shù): " + droppedTasks.size());
}
} catch (InterruptedException e) {
// 如果在等待過(guò)程中線程池關(guān)閉操作被中斷,立即強(qiáng)制關(guān)閉并恢復(fù)中斷狀態(tài)
executor.shutdownNow();
Thread.currentThread().interrupt();
}
}
// 自定義線程工廠類(lèi),用于創(chuàng)建線程
private static class CustomThreadFactory implements ThreadFactory {
private static final AtomicInteger poolNumber = new AtomicInteger(1); // 線程池編號(hào),用于生成線程名
private final ThreadGroup group; // 線程組
private final AtomicInteger threadNumber = new AtomicInteger(1); // 線程編號(hào)
private final String namePrefix; // 線程名稱(chēng)前綴
CustomThreadFactory() {
// 獲取當(dāng)前系統(tǒng)的安全管理器,如果沒(méi)有,則使用當(dāng)前線程的線程組
SecurityManager s = System.getSecurityManager();
group = (s != null) ? s.getThreadGroup() :
Thread.currentThread().getThreadGroup();
// 設(shè)置線程池的名稱(chēng)前綴
namePrefix = "pool-" +
poolNumber.getAndIncrement() + // 線程池編號(hào)遞增
"-thread-";
}
// 創(chuàng)建新線程的方法
public Thread newThread(Runnable r) {
// 創(chuàng)建新的線程,線程組、名稱(chēng)及優(yōu)先級(jí)均已設(shè)置
Thread t = new Thread(group, r,
namePrefix + threadNumber.getAndIncrement(),
0); // 默認(rèn)優(yōu)先級(jí)和daemon設(shè)置
// 如果線程是守護(hù)線程,則將其設(shè)置為非守護(hù)線程
if (t.isDaemon())
t.setDaemon(false);
// 設(shè)置線程優(yōu)先級(jí)為默認(rèn)
if (t.getPriority() != Thread.NORM_PRIORITY)
t.setPriority(Thread.NORM_PRIORITY);
return t; // 返回新創(chuàng)建的線程
}
}
}線程池初始化:
EnhancedExecutorManager的構(gòu)造方法使用ThreadPoolExecutor創(chuàng)建一個(gè)線程池,線程池大小通過(guò)corePoolSize參數(shù)傳遞。線程池的最大線程數(shù)是核心線程數(shù)的兩倍。LinkedBlockingQueue用作任務(wù)隊(duì)列,大小為 1000。若任務(wù)量超過(guò)隊(duì)列容量,則使用CallerRunsPolicy策略,即由提交任務(wù)的線程執(zhí)行該任務(wù)。- 使用自定義的
CustomThreadFactory來(lái)創(chuàng)建線程。
優(yōu)雅關(guān)閉線程池:
shutdown()方法首先調(diào)用executor.shutdown()來(lái)拒絕接受新的任務(wù),然后等待線程池在指定的超時(shí)時(shí)間內(nèi)關(guān)閉。- 如果線程池在超時(shí)時(shí)間內(nèi)未能正常關(guān)閉,則調(diào)用
shutdownNow()強(qiáng)制關(guān)閉并丟棄未執(zhí)行的任務(wù),同時(shí)輸出丟棄任務(wù)的數(shù)量。 - 如果在等待關(guān)閉過(guò)程中發(fā)生
InterruptedException,會(huì)強(qiáng)制關(guān)閉線程池,并恢復(fù)中斷狀態(tài)。
自定義線程工廠:
CustomThreadFactory通過(guò)實(shí)現(xiàn)ThreadFactory接口來(lái)定義創(chuàng)建線程的行為,主要包括線程組、線程名稱(chēng)、守護(hù)線程狀態(tài)和線程優(yōu)先級(jí)的配置。- 每個(gè)線程的名稱(chēng)遵循
pool-編號(hào)-thread-編號(hào)的格式。線程池的編號(hào)是遞增的,每個(gè)線程有自己的編號(hào)。
7.關(guān)鍵注意事項(xiàng)
- 守護(hù)線程問(wèn)題:默認(rèn)創(chuàng)建的是非守護(hù)線程,需顯式關(guān)閉
- 中斷策略一致性:任務(wù)必須實(shí)現(xiàn)正確的中斷處理邏輯
- 拒絕策略配合:合理配置RejectedExecutionHandler
- 資源釋放順序:數(shù)據(jù)庫(kù)連接等資源應(yīng)先于線程池關(guān)閉
- 監(jiān)控機(jī)制:建議集成線程池監(jiān)控(如JMX)
8.高級(jí)應(yīng)用場(chǎng)景
分級(jí)關(guān)閉策略
public class TieredShutdownManager {
// 定義三個(gè)優(yōu)先級(jí)的線程池列表:高優(yōu)先級(jí)、中優(yōu)先級(jí)、低優(yōu)先級(jí)
private final List<ExecutorService> highPriority;
private final List<ExecutorService> normalPriority;
private final List<ExecutorService> lowPriority;
// 公共方法用于優(yōu)雅關(guān)閉所有線程池
public void gracefulShutdown() {
// 依次關(guān)閉高、中、低優(yōu)先級(jí)的線程池
shutdownTier(highPriority, 10, TimeUnit.SECONDS);
shutdownTier(normalPriority, 30, TimeUnit.SECONDS);
shutdownTier(lowPriority, 60, TimeUnit.SECONDS);
}
// 私有方法,用于優(yōu)雅關(guān)閉指定優(yōu)先級(jí)的線程池
private void shutdownTier(List<ExecutorService> tier, long timeout, TimeUnit unit) {
// 對(duì)指定的線程池列表執(zhí)行關(guān)閉操作
tier.forEach(ExecutorService::shutdown);
// 對(duì)每個(gè)線程池執(zhí)行等待終止的操作,指定超時(shí)時(shí)間
tier.forEach(executor -> {
try {
// 如果線程池未在超時(shí)時(shí)間內(nèi)終止,則調(diào)用 shutdownNow 強(qiáng)制關(guān)閉
if (!executor.awaitTermination(timeout, unit)) {
executor.shutdownNow();
}
} catch (InterruptedException e) {
// 如果在等待終止過(guò)程中線程被中斷,恢復(fù)中斷狀態(tài)并強(qiáng)制關(guān)閉線程池
Thread.currentThread().interrupt();
executor.shutdownNow();
}
});
}
}gracefulShutdown 方法按照優(yōu)先級(jí)順序依次關(guān)閉高、中、低優(yōu)先級(jí)的線程池。
shutdownTier 方法首先嘗試正常關(guān)閉每個(gè)線程池(調(diào)用 shutdown),然后通過(guò) awaitTermination 方法等待線程池在指定的時(shí)間內(nèi)結(jié)束,如果未成功結(jié)束,則調(diào)用 shutdownNow 強(qiáng)制關(guān)閉。
在關(guān)閉過(guò)程中,如果發(fā)生中斷,則會(huì)捕獲 InterruptedException 異常,并且中斷當(dāng)前線程,同時(shí)強(qiáng)制關(guān)閉線程池。
9.性能優(yōu)化建議
根據(jù)任務(wù)類(lèi)型選擇隊(duì)列策略:
- CPU密集型:有界隊(duì)列(ArrayBlockingQueue)
- IO密集型:無(wú)界隊(duì)列(LinkedBlockingQueue)
監(jiān)控關(guān)鍵指標(biāo):
ThreadPoolExecutor executor = (ThreadPoolExecutor) service;
System.out.println("活躍線程數(shù): " + executor.getActiveCount());
System.out.println("完成任務(wù)數(shù): " + executor.getCompletedTaskCount());
System.out.println("隊(duì)列大小: " + executor.getQueue().size());
動(dòng)態(tài)調(diào)整參數(shù):
executor.setCorePoolSize(newSize); executor.setMaximumPoolSize(newMaxSize);
10.總結(jié)建議
根據(jù)Oracle官方文檔建議,在大多數(shù)生產(chǎn)場(chǎng)景中推薦以下關(guān)閉流程:
- 優(yōu)先調(diào)用shutdown()
- 設(shè)置合理的awaitTermination超時(shí)
- 必要時(shí)調(diào)用shutdownNow()
- 始終處理返回的未完成任務(wù)
- 記錄完整的關(guān)閉日志
正確選擇關(guān)閉策略需要綜合考量:
- 任務(wù)重要性等級(jí)
- 系統(tǒng)資源限制
- 業(yè)務(wù)連續(xù)性需求
- 數(shù)據(jù)一致性要求
到此這篇關(guān)于Java如何優(yōu)雅關(guān)閉異步中的ExecutorService的文章就介紹到這了,更多相關(guān)Java關(guān)閉ExecutorService內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
java科學(xué)計(jì)數(shù)法轉(zhuǎn)換成數(shù)字的幾種方法
我們?cè)谔幚泶髷?shù)值的時(shí)候,常常會(huì)遇到使用科學(xué)計(jì)數(shù)法表示的數(shù)字,科學(xué)計(jì)數(shù)法是一種表示大數(shù)值或小數(shù)值的方式,下面這篇文章主要給大家介紹了關(guān)于java科學(xué)計(jì)數(shù)法轉(zhuǎn)換成數(shù)字的幾種方法,需要的朋友可以參考下2024-03-03
SpringBoot熱部署Springloaded實(shí)現(xiàn)過(guò)程解析
這篇文章主要介紹了SpringBoot熱部署Springloaded實(shí)現(xiàn)過(guò)程解析,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2020-03-03
關(guān)于springboot集成swagger3時(shí)spring-plugin-core報(bào)錯(cuò)的問(wèn)題
這篇文章主要介紹了關(guān)于springboot集成swagger3時(shí)spring-plugin-core報(bào)錯(cuò)的問(wèn)題,本文給大家分享解決方法,對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2021-09-09
java使用Nagao算法實(shí)現(xiàn)新詞發(fā)現(xiàn)、熱門(mén)詞的挖掘
這篇文章主要介紹了java使用Nagao算法實(shí)現(xiàn)新詞發(fā)現(xiàn)、熱門(mén)詞的挖掘的思路和詳細(xì)代碼,需要的朋友可以參考下2015-07-07
解決SpringBoot中MultipartResolver和ServletFileUpload的沖突問(wèn)題
這篇文章主要介紹了解決SpringBoot中MultipartResolver和ServletFileUpload的沖突問(wèn)題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-10-10
IDEA卡在”正在解析Maven依賴(lài)項(xiàng)“的解決方法
在創(chuàng)建新的SpringBoot項(xiàng)目時(shí),始終卡在"正在解析Maven依賴(lài)項(xiàng)…",本文小編給大家介紹了幾種相關(guān)的解決方案,具有一定的參考價(jià)值,需要的朋友可以參考下2023-11-11
最常用的1000個(gè)Java類(lèi)(附代碼示例)
這篇文章主要介紹了最常用的1000個(gè)Java類(lèi)(附代碼示例),需要的朋友可以參考下2015-04-04

