日本不卡不码高清免费观看,久久国产精品久久w女人spa,黄色aa久久,三上悠亚国产精品一区二区三区

您的位置:首頁技術文章
文章詳情頁

Java 并發編程ArrayBlockingQueue的實現

瀏覽:32日期:2022-08-16 16:54:07
一、簡介

ArrayBlockingQueue 顧名思義:基于數組的阻塞隊列。數組是要指定長度的,所以使用 ArrayBlockingQueue 時必須指定長度,也就是它是一個有界隊列。它實現了 BlockingQueue 接口,有著隊列、集合以及阻塞隊列的所有方法。

Java 并發編程ArrayBlockingQueue的實現

ArrayBlockingQueue 是線程安全的,內部使用 ReentrantLock 來保證。ArrayBlockingQueue 支持對生產者線程和消費者線程進行公平的調度。當然默認情況下是不保證公平性的,因為公平性通常會降低吞吐量,但是可以減少可變性和避免線程饑餓問題。

二、數據結構

通常,隊列的實現方式有數組和鏈表兩種方式。對于數組這種實現方式來說,我們可以通過維護一個隊尾指針,使得在入隊的時候可以在 O(1)O(1) 的時間內完成;但是對于出隊操作,在刪除隊頭元素之后,必須將數組中的所有元素都往前移動一個位置,這個操作的復雜度達到了 O(n)O(n),效果并不是很好。如下圖所示:

Java 并發編程ArrayBlockingQueue的實現

為了解決這個問題,我們可以使用另外一種邏輯結構來處理數組中各個位置之間的關系。假設現在我們有一個數組 A[1…n],我們可以把它想象成一個環型結構,即 A[n] 之后是 A[1],相信了解過一致性 Hash 算法的童鞋應該很容易能夠理解。

如下圖所示:我們可以使用兩個指針,分別維護隊頭和隊尾兩個位置,使入隊和出隊操作都可以在 O(1O(1 )的時間內完成。當然,這個環形結構只是邏輯上的結構,實際的物理結構還是一個普通的數組。

Java 并發編程ArrayBlockingQueue的實現

講完 ArrayBlockingQueue 的數據結構,接下來我們從源碼層面看看它是如何實現阻塞的。

三、源碼分析3.1 屬性

// 隊列的底層結構final Object[] items;// 隊頭指針int takeIndex;// 隊尾指針int putIndex;// 隊列中的元素個數int count;final ReentrantLock lock;// 并發時的兩種狀態private final Condition notEmpty;private final Condition notFull;

items 是一個數組,用來存放入隊的數據;count 表示隊列中元素的個數;takeIndex 和 putIndex 分別代表隊頭和隊尾指針。

3.2 構造方法

public ArrayBlockingQueue(int capacity) { this(capacity, false);}public ArrayBlockingQueue(int capacity, boolean fair) { if (capacity <= 0) throw new IllegalArgumentException(); this.items = new Object[capacity]; lock = new ReentrantLock(fair); notEmpty = lock.newCondition(); notFull = lock.newCondition();}public ArrayBlockingQueue(int capacity, boolean fair, Collection<? extends E> c) { this(capacity, fair); final ReentrantLock lock = this.lock; lock.lock(); // Lock only for visibility, not mutual exclusion try { int i = 0; try { for (E e : c) {checkNotNull(e);items[i++] = e; } } catch (ArrayIndexOutOfBoundsException ex) { throw new IllegalArgumentException(); } count = i; putIndex = (i == capacity) ? 0 : i; } finally { lock.unlock(); }}

第一個構造函數只需要指定隊列大小,默認為非公平鎖;第二個構造函數可以手動指定公平性和隊列大小;第三個構造函數里面使用了 ReentrantLock 來加鎖,然后把傳入的集合元素按順序一個個放入 items 中。這里加鎖目的不是使用它的互斥性,而是讓 items 中的元素對其他線程可見(參考 AQS 里的 state 的 volatile 可見性)。

3.3 方法

3.3.1 入隊

ArrayBlockingQueue 提供了多種入隊操作的實現來滿足不同情況下的需求,入隊操作有如下幾種:

boolean add(E e) void put(E e) boolean offer(E e) boolean offer(E e, long timeout, TimeUnit unit)

(1)add(E e)

public boolean add(E e) { return super.add(e);}//super.add(e)public boolean add(E e) { if (offer(e)) return true; else throw new IllegalStateException('Queue full');}

可以看到 add 方法調用的是父類,也就是 AbstractQueue 的 add 方法,它實際上調用的就是 offer 方法。

(2)offer(E e)

我們接著上面的 add 方法來看 offer 方法:

public boolean offer(E e) { checkNotNull(e); final ReentrantLock lock = this.lock; lock.lock(); try { if (count == items.length) return false; else { enqueue(e); return true; } } finally { lock.unlock(); }}

offer 方法在隊列滿了的時候返回 false,否則調用 enqueue 方法插入元素,并返回 true。

private void enqueue(E x) { final Object[] items = this.items; items[putIndex] = x; // 圓環的index操作 if (++putIndex == items.length) putIndex = 0; count++; notEmpty.signal();}

enqueue 方法首先把元素放在 items 的 putIndex 位置,接著判斷在 putIndex+1 等于隊列的長度時把 putIndex 設置為0,也就是上面提到的圓環的 index 操作。最后喚醒等待獲取元素的線程。

(3)offer(E e, long timeout, TimeUnit unit)

該方法在 offer(E e) 的基礎上增加了超時的概念。

public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException { checkNotNull(e); // 把超時時間轉換成納秒 long nanos = unit.toNanos(timeout); final ReentrantLock lock = this.lock; // 獲取一個可中斷的互斥鎖 lock.lockInterruptibly(); try { // while循環的目的是防止在中斷后沒有到達傳入的timeout時間,繼續重試 while (count == items.length) { if (nanos <= 0)return false; // 等待nanos納秒,返回剩余的等待時間(可被中斷) nanos = notFull.awaitNanos(nanos); } enqueue(e); return true; } finally { lock.unlock(); }}

利用了 Condition 的 awaitNanos 方法,等待指定時間,因為該方法可中斷,所以這里利用 while 循環來處理中斷后還有剩余時間的問題,等待時間到了以后調用 enqueue 方法放入隊列。

(4)put(E e)

public void put(E e) throws InterruptedException { checkNotNull(e); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == items.length) notFull.await(); enqueue(e); } finally { lock.unlock(); }}

put 方法在 count 等于 items 長度時,一直等待,直到被其他線程喚醒。喚醒后調用 enqueue 方法放入隊列。

3.3.2 出隊

入隊列的方法說完后,我們來說說出隊列的方法。ArrayBlockingQueue 提供了多種出隊操作的實現來滿足不同情況下的需求,如下:

E poll() E poll(long timeout, TimeUnit unit) E take() drainTo(Collection<? super E> c, int maxElements)

(1)poll()

public E poll() { final ReentrantLock lock = this.lock; lock.lock(); try { return (count == 0) ? null : dequeue(); } finally { lock.unlock(); }}

poll 方法是非阻塞方法,如果隊列沒有元素返回 null,否則調用 dequeue 把隊首的元素出隊列。

private E dequeue() { final Object[] items = this.items; @SuppressWarnings('unchecked') E x = (E) items[takeIndex]; items[takeIndex] = null; if (++takeIndex == items.length) takeIndex = 0; count--; if (itrs != null) itrs.elementDequeued(); notFull.signal(); return x;}

dequeue 會根據 takeIndex 獲取到該位置的元素,并把該位置置為 null,接著利用圓環原理,在 takeIndex 到達列表長度時設置為0,最后喚醒等待元素放入隊列的線程。

(2)poll(long timeout, TimeUnit unit)

該方法是 poll() 的可配置超時等待方法,和上面的 offer 一樣,使用 while 循環配合 Condition 的 awaitNanos 來進行等待,等待時間到后執行 dequeue 獲取元素。

public E poll(long timeout, TimeUnit unit) throws InterruptedException { long nanos = unit.toNanos(timeout); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == 0) { if (nanos <= 0)return null; nanos = notEmpty.awaitNanos(nanos); } return dequeue(); } finally { lock.unlock(); }}

(3)take()

public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == 0) notEmpty.await(); return dequeue(); } finally { lock.unlock(); }}

取走隊列里排在首位的對象,不同于 poll() 方法,若BlockingQueue為空,就阻塞等待直到有新的數據被加入。(4)drainTo()

public int drainTo(Collection<? super E> c) { return drainTo(c, Integer.MAX_VALUE);}public int drainTo(Collection<? super E> c, int maxElements) { checkNotNull(c); if (c == this) throw new IllegalArgumentException(); if (maxElements <= 0) return 0; final Object[] items = this.items; final ReentrantLock lock = this.lock; lock.lock(); try { int n = Math.min(maxElements, count); int take = takeIndex; int i = 0; try { while (i < n) {@SuppressWarnings('unchecked')E x = (E) items[take];c.add(x);items[take] = null;if (++take == items.length) take = 0;i++; } return n; } finally { // Restore invariants even if c.add() threw if (i > 0) {count -= i;takeIndex = take;if (itrs != null) { if (count == 0) itrs.queueIsEmpty(); else if (i > take) itrs.takeIndexWrapped();}for (; i > 0 && lock.hasWaiters(notFull); i--) notFull.signal(); } } } finally { lock.unlock(); }}

drainTo 相比于其他獲取方法,能夠一次性從隊列中獲取所有可用的數據對象(還可以指定獲取數據的個數)。通過該方法,可以提升獲取數據效率,不需要多次分批加鎖或釋放鎖。

3.3.3 獲取元素

public E peek() { final ReentrantLock lock = this.lock; lock.lock(); try { return itemAt(takeIndex); // null when queue is empty } finally { lock.unlock(); }}final E itemAt(int i) { return (E) items[i];}

這里獲取元素時上鎖是為了避免臟數據的產生。

3.3.4 刪除元素

我們可以想象一下,隊列中刪除某一個元素時,是不是要遍歷整個數據找到該元素,并把該元素后的所有元素往前移一位,也就是說,該方法的時間復雜度為 O(n)O(n)。

public boolean remove(Object o) { if (o == null) return false; final Object[] items = this.items; final ReentrantLock lock = this.lock; lock.lock(); try { if (count > 0) { final int putIndex = this.putIndex; int i = takeIndex; // 從takeIndex一直遍歷到putIndex,直到找到和元素o相同的元素,調用removeAt進行刪除 do {if (o.equals(items[i])) { removeAt(i); return true;}if (++i == items.length) i = 0; } while (i != putIndex); } return false; } finally { lock.unlock(); }}

remove 方法比較簡單,它從 takeIndex 一直遍歷到 putIndex,直到找到和元素 o 相同的元素,調用 removeAt 進行刪除。我們重點來看一下 removeAt 方法。

void removeAt(final int removeIndex) { final Object[] items = this.items; if (removeIndex == takeIndex) { // removing front item; just advance items[takeIndex] = null; if (++takeIndex == items.length) takeIndex = 0; count--; if (itrs != null) itrs.elementDequeued(); } else { // an 'interior' remove // slide over all others up through putIndex. final int putIndex = this.putIndex; for (int i = removeIndex;;) { int next = i + 1; if (next == items.length)next = 0; if (next != putIndex) {items[i] = items[next];i = next; } else {items[i] = null;this.putIndex = i;break; } } count--; if (itrs != null) itrs.removedAt(removeIndex); } notFull.signal();}

removeAt 的處理方式和我想的稍微有一點出入,它內部分為兩種情況來考慮:

removeIndex == takeIndex removeIndex != takeIndex

也就是我考慮的時候沒有考慮邊界問題。當 removeIndex == takeIndex 時就不需要后面的元素整體往前移了,而只需要把 takeIndex的指向下一個元素即可(類比圓環);當 removeIndex != takeIndex 時,通過 putIndex 將 removeIndex 后的元素往前移一位。

四、總結

ArrayBlockingQueue 是一個阻塞隊列,內部由 ReentrantLock 來實現線程安全,由 Condition 的 await 和 signal 來實現等待喚醒的功能。它的數據結構是數組,準確的說是一個循環數組(可以類比一個圓環),所有的下標在到達最大長度時自動從 0 繼續開始。

到此這篇關于Java 并發編程ArrayBlockingQueue的實現的文章就介紹到這了,更多相關Java 并發編程ArrayBlockingQueue內容請搜索好吧啦網以前的文章或繼續瀏覽下面的相關文章希望大家以后多多支持好吧啦網!

標簽: Java
相關文章:
日本不卡不码高清免费观看,久久国产精品久久w女人spa,黄色aa久久,三上悠亚国产精品一区二区三区
美国三级日本三级久久99| 日韩高清电影一区| 精品高清久久| 里番精品3d一二三区| 日本免费一区二区视频| 日韩不卡在线观看日韩不卡视频| 日韩欧美另类中文字幕| 日本一区二区三区视频在线看 | 国产午夜久久| 亚洲欧美日本日韩| 中文字幕中文字幕精品| 日产欧产美韩系列久久99| 日韩国产在线不卡视频| 国产剧情在线观看一区| 卡一卡二国产精品| 日韩精品久久理论片| 在线观看免费一区二区| 日本va欧美va欧美va精品| 欧美xxxx中国| 99成人在线| 欧美激情网址| 国产精品试看| 精品视频91| 亚洲综合精品四区| 精品国产亚洲日本| 老鸭窝亚洲一区二区三区| 麻豆91精品视频| 亚洲欧美日韩国产综合精品二区| 国产乱码精品一区二区三区四区| 午夜免费一区| 精品久久久网| 日韩在线观看一区二区| 精品国产乱码| 综合激情在线| 91精品精品| 国产精品亚洲人成在99www| 亚洲二区视频| 欧美极品中文字幕| 亚洲欧美视频一区二区三区| 日本欧美国产| 91嫩草精品| 精品在线91| 久久精品免视看国产成人| 香蕉久久夜色精品国产| 久久毛片亚洲| 国产videos久久| 精品视频97| 免费看的黄色欧美网站| 欧美激情五月| 亚洲激情精品| 麻豆国产欧美日韩综合精品二区| 三级小说欧洲区亚洲区| 日韩三级精品| av免费不卡国产观看| 国产精品女主播一区二区三区| 国产精品久久久久久妇女| 999久久久国产精品| 日本亚州欧洲精品不卡| 蜜臀国产一区| 日韩中文av| 久久麻豆精品| 欧美影院视频| 久久九九电影| 欧美日韩亚洲三区| 亚洲性视频h| 国产精品一国产精品| japanese国产精品| 欧美aⅴ一区二区三区视频| 免费日韩精品中文字幕视频在线| 精品国产日韩欧美精品国产欧美日韩一区二区三区 | 欧美一区自拍| 欧美日韩精品免费观看视频完整| 国产亚洲精品美女久久| 国产亚洲高清视频| 成人午夜毛片| 欧美一级久久| 免费日韩一区二区| 日本欧美不卡| 麻豆国产精品777777在线| 一区二区不卡| 午夜精品一区二区三区国产| 国产成人黄色| 国产亚洲精品美女久久| 美日韩精品视频| 神马午夜久久| 精品久久国产一区| 日本不卡一区二区三区| av亚洲免费| 久久黄色影院| 精品72久久久久中文字幕| 欧美黄色一区二区| 日韩精品1区2区3区| 中文日韩欧美| 久久精品动漫| 国产欧洲在线| 国产麻豆一区二区三区精品视频| 在线精品亚洲| 影院欧美亚洲| 99久久久久久中文字幕一区| 精品亚洲免a| 国产精品成人自拍| 欧美精品观看| 日韩av一区二| 日韩精品久久久久久| 在线综合欧美| 一区福利视频| av亚洲免费| 黄色不卡一区| 蜜臀av免费一区二区三区| 精品捆绑调教一区二区三区| 麻豆视频在线看| av日韩中文| 日韩成人亚洲| 国产精品日本一区二区三区在线| 中文字幕av一区二区三区四区| 亚洲精品在线观看91| 99视频精品全部免费在线视频| 精品国产鲁一鲁****| 国产成人精品亚洲线观看| 欧美国产极品| 久久免费视频66| 精品国产亚洲一区二区三区大结局| 欧美精品不卡| 精品一区二区三区中文字幕| 麻豆国产精品| 久久久久久色 | 欧美女激情福利| 99久久99久久精品国产片果冰| av高清不卡| 久久伊人久久| 麻豆91精品视频| 精品视频免费| 在线天堂中文资源最新版| 成人精品国产亚洲| 97欧美在线视频| 国内不卡的一区二区三区中文字幕| 欧美国产三级| 精品99在线| 黄毛片在线观看| 日韩精品dvd| 欧美日韩在线播放视频| 久久久久一区| 亚洲精品99| 综合激情在线| 日本午夜精品一区二区三区电影| 亚洲精品成a人ⅴ香蕉片| 7777精品| 日韩激情一区二区| 国产乱子精品一区二区在线观看| 国产美女视频一区二区| 国产中文字幕一区二区三区| 日韩免费小视频| 亚洲专区欧美专区| 日韩av午夜在线观看| 国产精品jk白丝蜜臀av小说| 国产一区二区三区久久 | 伊人久久av| 精品欧美久久| 日韩一区二区三区免费视频| 免费一级欧美在线观看视频| 人人草在线视频| 免费精品国产的网站免费观看| 久久99伊人| 国产精品久久久亚洲一区| 国产成人精品福利| 激情综合在线| 日韩av中文在线观看| 国产成年精品| 午夜一级久久| 精品久久免费| 亚洲激情精品| 国产伦精品一区二区三区在线播放 | 国产精品尤物| 亚洲综合在线电影| 一区二区三区网站| 国产精品超碰| 日本不卡免费高清视频在线| 欧美日韩国产一区二区三区不卡 | 亚洲午夜视频| 欧美一级网站| 99久久精品国产亚洲精品| 日韩在线网址| 亚洲精品88| 日韩国产成人精品| 久久久久久久久久久妇女| 偷拍亚洲精品| 成人自拍av| 欧美偷窥清纯综合图区| 天堂网av成人| **爰片久久毛片| 香蕉国产精品| 久久亚洲黄色| 亚洲一区欧美| 超碰在线99| 中文不卡在线| 黄色在线网站噜噜噜| 偷拍亚洲精品| 日韩精品网站| 欧美啪啪一区| 影音先锋久久|