Java中的DelayQueue源碼解析
介紹
一個(gè)實(shí)現(xiàn)PriorityBlockingQueue實(shí)現(xiàn)延遲獲取的無(wú)界隊(duì)列,在創(chuàng)建元素時(shí),可以指定多久才能從隊(duì)列中獲取當(dāng)前元素。只有延時(shí)期滿后才能從隊(duì)列中獲取元素。
DelayQueue可以運(yùn)用在以下應(yīng)用場(chǎng)景:
1.緩存系統(tǒng)的設(shè)計(jì):可以用DelayQueue保存緩存元素的有效期,使用一個(gè)線程循環(huán)查詢DelayQueue,一旦能從DelayQueue中獲取元素時(shí),表示緩存有效期到了。
2.定時(shí)任務(wù)調(diào)度。使用DelayQueue保存當(dāng)天將會(huì)執(zhí)行的任務(wù)和執(zhí)行時(shí)間,一旦從DelayQueue中獲取到任務(wù)就開(kāi)始執(zhí)行,從比如TimerQueue就是使用DelayQueue實(shí)現(xiàn)的。
數(shù)據(jù)結(jié)構(gòu)
public interface Delayed extends Comparable<Delayed> {
/**
* 返回與此對(duì)象相關(guān)的剩余延遲時(shí)間,以給定的時(shí)間單位表示
*/
long getDelay(TimeUnit unit);
}getDelay方法一般用內(nèi)部存儲(chǔ)的事件,減去當(dāng)前事件,即為剩余延遲事件
屬性
private final transient ReentrantLock lock = new ReentrantLock();
private final PriorityQueue<E> q = new PriorityQueue<E>();
/**
*用于優(yōu)化內(nèi)部阻塞通知的線程
*/
private Thread leader = null;
private final Condition available = lock.newCondition();以支持優(yōu)先級(jí)的PriorityQueue無(wú)界隊(duì)列作為一個(gè)容器,因?yàn)樵囟急仨殞?shí)現(xiàn)Delayed接口,可以根據(jù)元素的過(guò)期時(shí)間來(lái)對(duì)元素進(jìn)行排列,因此,先過(guò)期的元素會(huì)在隊(duì)首,每次從隊(duì)列里取出來(lái)都是最先要過(guò)期的元素。
leader是一個(gè)Thread元素,它在offer和take中都有使用,它代表當(dāng)前獲取到鎖的消費(fèi)者線程,
DelayQueue實(shí)現(xiàn)Leader-Folloer pattern
1、當(dāng)存在多個(gè)take線程時(shí),同時(shí)只生效一個(gè),即,leader線程
2、當(dāng)leader存在時(shí),其它的take線程均為follower,其等待是通過(guò)condition實(shí)現(xiàn)的
3、當(dāng)leader不存在時(shí),當(dāng)前線程即成為leader,在delay之后,將leader角色釋放還原
4、最后如果隊(duì)列還有內(nèi)容,且leader空缺,則調(diào)用一次condition的signal,喚醒掛起的take線程,其中之一將成為新的leader
5、最后在finally中釋放鎖
方法實(shí)現(xiàn)
offer,poll,peek
public boolean offer(E e) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
q.offer(e);
//如果插入元素是第一個(gè)元素
if (q.peek() == e) {
//leader設(shè)置為null
leader = null;
//喚醒
available.signal();
}
return true;
} finally {
lock.unlock();
}
}
public boolean offer(E e, long timeout, TimeUnit unit) {
return offer(e);
}
public E poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
E first = q.peek();
//如果未到期,則返回null,否則刪除
if (first == null || first.getDelay(NANOSECONDS) > 0)
return null;
else
return q.poll();
} finally {
lock.unlock();
}
}
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
E first = q.peek();
if (first == null) {
if (nanos <= 0)
return null;
else
nanos = available.awaitNanos(nanos);
} else {
long delay = first.getDelay(NANOSECONDS);
//到期,則poll
if (delay <= 0)
return q.poll();
if (nanos <= 0)
return null;
first = null; // don't retain ref while waiting
if (nanos < delay || leader != null)//nanos<delay,表示超時(shí)剩余時(shí)間小于到期時(shí)間,
nanos = available.awaitNanos(nanos);
else {
Thread thisThread = Thread.currentThread();
//設(shè)置當(dāng)前線程為leader
leader = thisThread;
try {
//等待條件
long timeLeft = available.awaitNanos(delay);
//剩余超時(shí)時(shí)間
nanos -= delay - timeLeft;
} finally {
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
if (leader == null && q.peek() != null)
available.signal();
lock.unlock();
}
}
public E peek() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return q.peek();
} finally {
lock.unlock();
}
}put,take
/**
* Retrieves and removes the head of this queue, waiting if necessary
* until an element with an expired delay is available on this queue.
*
* @return the head of this queue
* @throws InterruptedException {@inheritDoc}
*/
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
// 獲取可中斷鎖。
lock.lockInterruptibly();
try {
for (;;) {
// 從優(yōu)先級(jí)隊(duì)列中獲取隊(duì)列頭元素
E first = q.peek();
if (first == null)
// 無(wú)元素,當(dāng)前線程加入等待隊(duì)列,并阻塞
available.await();
else {
// 通過(guò)getDelay 方法獲取延遲時(shí)間
long delay = first.getDelay(NANOSECONDS);
if (delay <= 0)
// 延遲時(shí)間到期,獲取并刪除頭部元素。
return q.poll();
first = null; // don't retain ref while waiting
if (leader != null)
available.await();
else {
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
// 線程節(jié)點(diǎn)進(jìn)入等待隊(duì)列 x 納秒。
available.awaitNanos(delay);
} finally {
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
// leader == null且還存在元素的話,喚醒一個(gè)消費(fèi)線程。
if (leader == null && q.peek() != null)
available.signal();
lock.unlock();
}
}
public void put(E e) {
offer(e);
}take()方法邏輯:
1.獲取鎖
2.取出優(yōu)先級(jí)隊(duì)列q的首元素
3.如果元素q的隊(duì)首/隊(duì)列為空,阻塞
4.如果元素q的隊(duì)首(first)不為空,獲得這個(gè)元素的delay時(shí)間值,如果first的延遲delay時(shí)間值為0的話,說(shuō)明該元素已經(jīng)到了可以使用的時(shí)間,調(diào)用poll方法彈出該元素,跳出方法
5.如果first的延遲delay時(shí)間值不為0的話,釋放元素first的引用,避免內(nèi)存泄露
6.循環(huán)以上操作,直到return
leader作用
如果leader不為null,說(shuō)明已經(jīng)有消費(fèi)者線程拿到鎖,直接阻塞當(dāng)前線程,如果leader為null,把當(dāng)前線程賦值給leader,并等待剩余的到期時(shí)間,最后釋放leader,這里我們想象著我們有個(gè)多個(gè)消費(fèi)者線程用take方法去取,如果沒(méi)有l(wèi)eader!=null的判斷,這些線程都會(huì)無(wú)限循環(huán),直到返回第一個(gè)元素,很顯然很浪費(fèi)資源。所以leader的作用是設(shè)置一個(gè)標(biāo)記,來(lái)避免消費(fèi)者的無(wú)腦競(jìng)爭(zhēng)。
到此這篇關(guān)于Java中的DelayQueue源碼解析的文章就介紹到這了,更多相關(guān)DelayQueue源碼解析內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Spring為何需要三級(jí)緩存解決循環(huán)依賴詳解
這篇文章主要給大家介紹了關(guān)于Spring為何需要三級(jí)緩存解決循環(huán)依賴,而不是二級(jí)緩存的相關(guān)資料,這個(gè)也是一個(gè)Spring的高頻面試題,文中通過(guò)圖文介紹的非常詳細(xì),需要的朋友可以參考下2022-02-02
Spring整合quartz做定時(shí)任務(wù)的示例代碼
這篇文章主要介紹了在spring項(xiàng)目使用quartz做定時(shí)任務(wù),首先我這里的項(xiàng)目已經(jīng)是一個(gè)可以跑起來(lái)的完整項(xiàng)目,web.xml里面的配置我就不貼出來(lái)了,具體實(shí)例代碼跟隨小編一起看看吧2022-01-01
Maven中兩個(gè)命令clean 和 install的使用
Maven是一個(gè)項(xiàng)目管理和自動(dòng)構(gòu)建工具,clean命令用于刪除項(xiàng)目中由先前構(gòu)建生成的target目錄,install命令用于將打包好的jar包安裝到本地倉(cāng)庫(kù)中,供其他項(xiàng)目依賴使用,下面就來(lái)詳細(xì)的介紹一下這兩個(gè)命令2024-09-09
JAVA CountDownLatch(倒計(jì)時(shí)計(jì)數(shù)器)用法實(shí)例
這篇文章主要介紹了JAVA CountDownLatch(倒計(jì)時(shí)計(jì)數(shù)器)用法實(shí)例,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2019-10-10
詳解Java語(yǔ)言中一個(gè)字符占幾個(gè)字節(jié)?
這篇文章主要介紹了Java語(yǔ)言中一個(gè)字符占幾個(gè)字節(jié),文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2019-04-04

