View Javadoc
1   /*
2    * Copyright (C) 2010 The Guava Authors
3    *
4    * Licensed under the Apache License, Version 2.0 (the "License");
5    * you may not use this file except in compliance with the License.
6    * You may obtain a copy of the License at
7    *
8    * http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS,
12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   * See the License for the specific language governing permissions and
14   * limitations under the License.
15   */
16  
17  package com.google.common.util.concurrent;
18  
19  import com.google.common.collect.ObjectArrays;
20  
21  import java.util.AbstractQueue;
22  import java.util.Collection;
23  import java.util.ConcurrentModificationException;
24  import java.util.Iterator;
25  import java.util.NoSuchElementException;
26  import java.util.concurrent.BlockingQueue;
27  import java.util.concurrent.TimeUnit;
28  
29  import javax.annotation.Nullable;
30  
31  /**
32   * A bounded {@linkplain BlockingQueue blocking queue} backed by an
33   * array.  This queue orders elements FIFO (first-in-first-out).  The
34   * <em>head</em> of the queue is that element that has been on the
35   * queue the longest time.  The <em>tail</em> of the queue is that
36   * element that has been on the queue the shortest time. New elements
37   * are inserted at the tail of the queue, and the queue retrieval
38   * operations obtain elements at the head of the queue.
39   *
40   * <p>This is a classic &quot;bounded buffer&quot;, in which a
41   * fixed-sized array holds elements inserted by producers and
42   * extracted by consumers.  Once created, the capacity cannot be
43   * increased.  Attempts to <tt>put</tt> an element into a full queue
44   * will result in the operation blocking; attempts to <tt>take</tt> an
45   * element from an empty queue will similarly block.
46   *
47   * <p> This class supports an optional fairness policy for ordering
48   * waiting producer and consumer threads.  By default, this ordering
49   * is not guaranteed. However, a queue constructed with fairness set
50   * to <tt>true</tt> grants threads access in FIFO order. Fairness
51   * generally decreases throughput but reduces variability and avoids
52   * starvation.
53   *
54   * <p>This class and its iterator implement all of the
55   * <em>optional</em> methods of the {@link Collection} and {@link
56   * Iterator} interfaces.
57   *
58   * @author Doug Lea
59   * @author Justin T. Sampson
60   * @param <E> the type of elements held in this collection
61   */
62  public class MonitorBasedArrayBlockingQueue<E> extends AbstractQueue<E>
63          implements BlockingQueue<E> {
64  
65      // Based on revision 1.58 of ArrayBlockingQueue by Doug Lea, from
66      // http://gee.cs.oswego.edu/cgi-bin/viewcvs.cgi/jsr166/src/main/java/util/concurrent/
67  
68      /** The queued items  */
69      final E[] items;
70      /** items index for next take, poll or remove */
71      int takeIndex;
72      /** items index for next put, offer, or add. */
73      int putIndex;
74      /** Number of items in the queue */
75      private int count;
76  
77      /*
78       * Concurrency control uses the classic two-condition algorithm
79       * found in any textbook.
80       */
81  
82      /** Monitor guarding all access */
83      final Monitor monitor;
84  
85      /** Guard for waiting takes */
86      private final Monitor.Guard notEmpty;
87  
88      /** Guard for waiting puts */
89      private final Monitor.Guard notFull;
90  
91      // Internal helper methods
92  
93      /**
94       * Circularly increment i.
95       */
96      final int inc(int i) {
97          return (++i == items.length) ? 0 : i;
98      }
99  
100     /**
101      * Inserts element at current put position, advances, and signals.
102      * Call only when occupying monitor.
103      */
104     private void insert(E x) {
105         items[putIndex] = x;
106         putIndex = inc(putIndex);
107         ++count;
108     }
109 
110     /**
111      * Extracts element at current take position, advances, and signals.
112      * Call only when occupying monitor.
113      */
114     private E extract() {
115         final E[] items = this.items;
116         E x = items[takeIndex];
117         items[takeIndex] = null;
118         takeIndex = inc(takeIndex);
119         --count;
120         return x;
121     }
122 
123     /**
124      * Utility for remove and iterator.remove: Delete item at position i.
125      * Call only when occupying monitor.
126      */
127     void removeAt(int i) {
128         final E[] items = this.items;
129         // if removing front item, just advance
130         if (i == takeIndex) {
131             items[takeIndex] = null;
132             takeIndex = inc(takeIndex);
133         } else {
134             // slide over all others up through putIndex.
135             for (;;) {
136                 int nexti = inc(i);
137                 if (nexti != putIndex) {
138                     items[i] = items[nexti];
139                     i = nexti;
140                 } else {
141                     items[i] = null;
142                     putIndex = i;
143                     break;
144                 }
145             }
146         }
147         --count;
148     }
149 
150     /**
151      * Creates an <tt>MonitorBasedArrayBlockingQueue</tt> with the given (fixed)
152      * capacity and default access policy.
153      *
154      * @param capacity the capacity of this queue
155      * @throws IllegalArgumentException if <tt>capacity</tt> is less than 1
156      */
157     public MonitorBasedArrayBlockingQueue(int capacity) {
158         this(capacity, false);
159     }
160 
161     /**
162      * Creates an <tt>MonitorBasedArrayBlockingQueue</tt> with the given (fixed)
163      * capacity and the specified access policy.
164      *
165      * @param capacity the capacity of this queue
166      * @param fair if <tt>true</tt> then queue accesses for threads blocked
167      *        on insertion or removal, are processed in FIFO order;
168      *        if <tt>false</tt> the access order is unspecified.
169      * @throws IllegalArgumentException if <tt>capacity</tt> is less than 1
170      */
171     public MonitorBasedArrayBlockingQueue(int capacity, boolean fair) {
172         if (capacity <= 0)
173             throw new IllegalArgumentException();
174         this.items = newEArray(capacity);
175         monitor = new Monitor(fair);
176         notEmpty = new Monitor.Guard(monitor) {
177             @Override public boolean isSatisfied() {
178                 return count > 0;
179             }
180         };
181         notFull = new Monitor.Guard(monitor) {
182             @Override public boolean isSatisfied() {
183                 return count < items.length;
184             }
185         };
186     }
187 
188     @SuppressWarnings("unchecked") // please don't try this home, kids
189     private static <E> E[] newEArray(int capacity) {
190         return (E[]) new Object[capacity];
191     }
192 
193     /**
194      * Creates an <tt>MonitorBasedArrayBlockingQueue</tt> with the given (fixed)
195      * capacity, the specified access policy and initially containing the
196      * elements of the given collection,
197      * added in traversal order of the collection's iterator.
198      *
199      * @param capacity the capacity of this queue
200      * @param fair if <tt>true</tt> then queue accesses for threads blocked
201      *        on insertion or removal, are processed in FIFO order;
202      *        if <tt>false</tt> the access order is unspecified.
203      * @param c the collection of elements to initially contain
204      * @throws IllegalArgumentException if <tt>capacity</tt> is less than
205      *         <tt>c.size()</tt>, or less than 1.
206      * @throws NullPointerException if the specified collection or any
207      *         of its elements are null
208      */
209     public MonitorBasedArrayBlockingQueue(int capacity, boolean fair,
210                               Collection<? extends E> c) {
211         this(capacity, fair);
212         if (capacity < c.size())
213             throw new IllegalArgumentException();
214 
215         for (E e : c)
216             add(e);
217     }
218 
219     /**
220      * Inserts the specified element at the tail of this queue if it is
221      * possible to do so immediately without exceeding the queue's capacity,
222      * returning <tt>true</tt> upon success and throwing an
223      * <tt>IllegalStateException</tt> if this queue is full.
224      *
225      * @param e the element to add
226      * @return <tt>true</tt> (as specified by {@link Collection#add})
227      * @throws IllegalStateException if this queue is full
228      * @throws NullPointerException if the specified element is null
229      */
230     @Override public boolean add(E e) {
231         return super.add(e);
232     }
233 
234     /**
235      * Inserts the specified element at the tail of this queue if it is
236      * possible to do so immediately without exceeding the queue's capacity,
237      * returning <tt>true</tt> upon success and <tt>false</tt> if this queue
238      * is full.  This method is generally preferable to method {@link #add},
239      * which can fail to insert an element only by throwing an exception.
240      *
241      * @throws NullPointerException if the specified element is null
242      */
243     @Override
244     public boolean offer(E e) {
245         if (e == null) throw new NullPointerException();
246         final Monitor monitor = this.monitor;
247         if (monitor.enterIf(notFull)) {
248             try {
249                 insert(e);
250                 return true;
251             } finally {
252                 monitor.leave();
253             }
254         } else {
255           return false;
256         }
257     }
258 
259     /**
260      * Inserts the specified element at the tail of this queue, waiting
261      * for space to become available if the queue is full.
262      *
263      * @throws InterruptedException {@inheritDoc}
264      * @throws NullPointerException {@inheritDoc}
265      */
266     @Override
267     public void put(E e) throws InterruptedException {
268         if (e == null) throw new NullPointerException();
269         final Monitor monitor = this.monitor;
270         monitor.enterWhen(notFull);
271         try {
272             insert(e);
273         } finally {
274             monitor.leave();
275         }
276     }
277 
278     /**
279      * Inserts the specified element at the tail of this queue, waiting
280      * up to the specified wait time for space to become available if
281      * the queue is full.
282      *
283      * @throws InterruptedException {@inheritDoc}
284      * @throws NullPointerException {@inheritDoc}
285      */
286     @Override
287     public boolean offer(E e, long timeout, TimeUnit unit)
288         throws InterruptedException {
289 
290         if (e == null) throw new NullPointerException();
291         final Monitor monitor = this.monitor;
292         if (monitor.enterWhen(notFull, timeout, unit)) {
293             try {
294                 insert(e);
295                 return true;
296             } finally {
297                 monitor.leave();
298             }
299         } else {
300           return false;
301         }
302     }
303 
304     @Override
305     public E poll() {
306         final Monitor monitor = this.monitor;
307         if (monitor.enterIf(notEmpty)) {
308             try {
309                 return extract();
310             } finally {
311                 monitor.leave();
312             }
313         } else {
314           return null;
315         }
316     }
317 
318     @Override
319     public E take() throws InterruptedException {
320         final Monitor monitor = this.monitor;
321         monitor.enterWhen(notEmpty);
322         try {
323             return extract();
324         } finally {
325             monitor.leave();
326         }
327     }
328 
329     @Override
330     public E poll(long timeout, TimeUnit unit) throws InterruptedException {
331         final Monitor monitor = this.monitor;
332         if (monitor.enterWhen(notEmpty, timeout, unit)) {
333             try {
334                 return extract();
335             } finally {
336                 monitor.leave();
337             }
338         } else {
339           return null;
340         }
341     }
342 
343     @Override
344     public E peek() {
345         final Monitor monitor = this.monitor;
346         if (monitor.enterIf(notEmpty)) {
347             try {
348                 return items[takeIndex];
349             } finally {
350                 monitor.leave();
351             }
352         } else {
353             return null;
354         }
355     }
356 
357     // this doc comment is overridden to remove the reference to collections
358     // greater in size than Integer.MAX_VALUE
359     /**
360      * Returns the number of elements in this queue.
361      *
362      * @return the number of elements in this queue
363      */
364     @Override public int size() {
365         final Monitor monitor = this.monitor;
366         monitor.enter();
367         try {
368             return count;
369         } finally {
370             monitor.leave();
371         }
372     }
373 
374     // this doc comment is a modified copy of the inherited doc comment,
375     // without the reference to unlimited queues.
376     /**
377      * Returns the number of additional elements that this queue can ideally
378      * (in the absence of memory or resource constraints) accept without
379      * blocking. This is always equal to the initial capacity of this queue
380      * less the current <tt>size</tt> of this queue.
381      *
382      * <p>Note that you <em>cannot</em> always tell if an attempt to insert
383      * an element will succeed by inspecting <tt>remainingCapacity</tt>
384      * because it may be the case that another thread is about to
385      * insert or remove an element.
386      */
387     @Override
388     public int remainingCapacity() {
389         final Monitor monitor = this.monitor;
390         monitor.enter();
391         try {
392             return items.length - count;
393         } finally {
394             monitor.leave();
395         }
396     }
397 
398     /**
399      * Removes a single instance of the specified element from this queue,
400      * if it is present.  More formally, removes an element <tt>e</tt> such
401      * that <tt>o.equals(e)</tt>, if this queue contains one or more such
402      * elements.
403      * Returns <tt>true</tt> if this queue contained the specified element
404      * (or equivalently, if this queue changed as a result of the call).
405      *
406      * @param o element to be removed from this queue, if present
407      * @return <tt>true</tt> if this queue changed as a result of the call
408      */
409     @Override public boolean remove(@Nullable Object o) {
410         if (o == null) return false;
411         final E[] items = this.items;
412         final Monitor monitor = this.monitor;
413         monitor.enter();
414         try {
415             int i = takeIndex;
416             int k = 0;
417             for (;;) {
418                 if (k++ >= count)
419                     return false;
420                 if (o.equals(items[i])) {
421                     removeAt(i);
422                     return true;
423                 }
424                 i = inc(i);
425             }
426         } finally {
427             monitor.leave();
428         }
429     }
430 
431     /**
432      * Returns <tt>true</tt> if this queue contains the specified element.
433      * More formally, returns <tt>true</tt> if and only if this queue contains
434      * at least one element <tt>e</tt> such that <tt>o.equals(e)</tt>.
435      *
436      * @param o object to be checked for containment in this queue
437      * @return <tt>true</tt> if this queue contains the specified element
438      */
439     @Override public boolean contains(@Nullable Object o) {
440         if (o == null) return false;
441         final E[] items = this.items;
442         final Monitor monitor = this.monitor;
443         monitor.enter();
444         try {
445             int i = takeIndex;
446             int k = 0;
447             while (k++ < count) {
448                 if (o.equals(items[i]))
449                     return true;
450                 i = inc(i);
451             }
452             return false;
453         } finally {
454             monitor.leave();
455         }
456     }
457 
458     /**
459      * Returns an array containing all of the elements in this queue, in
460      * proper sequence.
461      *
462      * <p>The returned array will be "safe" in that no references to it are
463      * maintained by this queue.  (In other words, this method must allocate
464      * a new array).  The caller is thus free to modify the returned array.
465      *
466      * <p>This method acts as bridge between array-based and collection-based
467      * APIs.
468      *
469      * @return an array containing all of the elements in this queue
470      */
471     @Override public Object[] toArray() {
472         final E[] items = this.items;
473         final Monitor monitor = this.monitor;
474         monitor.enter();
475         try {
476             Object[] a = new Object[count];
477             int k = 0;
478             int i = takeIndex;
479             while (k < count) {
480                 a[k++] = items[i];
481                 i = inc(i);
482             }
483             return a;
484         } finally {
485             monitor.leave();
486         }
487     }
488 
489     /**
490      * Returns an array containing all of the elements in this queue, in
491      * proper sequence; the runtime type of the returned array is that of
492      * the specified array.  If the queue fits in the specified array, it
493      * is returned therein.  Otherwise, a new array is allocated with the
494      * runtime type of the specified array and the size of this queue.
495      *
496      * <p>If this queue fits in the specified array with room to spare
497      * (i.e., the array has more elements than this queue), the element in
498      * the array immediately following the end of the queue is set to
499      * <tt>null</tt>.
500      *
501      * <p>Like the {@link #toArray()} method, this method acts as bridge between
502      * array-based and collection-based APIs.  Further, this method allows
503      * precise control over the runtime type of the output array, and may,
504      * under certain circumstances, be used to save allocation costs.
505      *
506      * <p>Suppose <tt>x</tt> is a queue known to contain only strings.
507      * The following code can be used to dump the queue into a newly
508      * allocated array of <tt>String</tt>:
509      *
510      * <pre>
511      *     String[] y = x.toArray(new String[0]);</pre>
512      *
513      * <p>Note that <tt>toArray(new Object[0])</tt> is identical in function to
514      * <tt>toArray()</tt>.
515      *
516      * @param a the array into which the elements of the queue are to
517      *          be stored, if it is big enough; otherwise, a new array of the
518      *          same runtime type is allocated for this purpose
519      * @return an array containing all of the elements in this queue
520      * @throws ArrayStoreException if the runtime type of the specified array
521      *         is not a supertype of the runtime type of every element in
522      *         this queue
523      * @throws NullPointerException if the specified array is null
524      */
525     @Override public <T> T[] toArray(T[] a) {
526         final E[] items = this.items;
527         final Monitor monitor = this.monitor;
528         monitor.enter();
529         try {
530             if (a.length < count)
531                 a = ObjectArrays.newArray(a, count);
532 
533             int k = 0;
534             int i = takeIndex;
535             while (k < count) {
536                 // This cast is not itself safe, but the following statement
537                 // will fail if the runtime type of items[i] is not assignable
538                 // to the runtime type of a[k++], which is all that the method
539                 // contract requires (see @throws ArrayStoreException above).
540                 @SuppressWarnings("unchecked")
541                 T t = (T) items[i];
542                 a[k++] = t;
543                 i = inc(i);
544             }
545             if (a.length > count)
546                 a[count] = null;
547             return a;
548         } finally {
549             monitor.leave();
550         }
551     }
552 
553     @Override public String toString() {
554         final Monitor monitor = this.monitor;
555         monitor.enter();
556         try {
557             return super.toString();
558         } finally {
559             monitor.leave();
560         }
561     }
562 
563     /**
564      * Atomically removes all of the elements from this queue.
565      * The queue will be empty after this call returns.
566      */
567     @Override public void clear() {
568         final E[] items = this.items;
569         final Monitor monitor = this.monitor;
570         monitor.enter();
571         try {
572             int i = takeIndex;
573             int k = count;
574             while (k-- > 0) {
575                 items[i] = null;
576                 i = inc(i);
577             }
578             count = 0;
579             putIndex = 0;
580             takeIndex = 0;
581         } finally {
582             monitor.leave();
583         }
584     }
585 
586     /**
587      * @throws UnsupportedOperationException {@inheritDoc}
588      * @throws ClassCastException            {@inheritDoc}
589      * @throws NullPointerException          {@inheritDoc}
590      * @throws IllegalArgumentException      {@inheritDoc}
591      */
592     @Override
593     public int drainTo(Collection<? super E> c) {
594         if (c == null)
595             throw new NullPointerException();
596         if (c == this)
597             throw new IllegalArgumentException();
598         final E[] items = this.items;
599         final Monitor monitor = this.monitor;
600         monitor.enter();
601         try {
602             int i = takeIndex;
603             int n = 0;
604             int max = count;
605             while (n < max) {
606                 c.add(items[i]);
607                 items[i] = null;
608                 i = inc(i);
609                 ++n;
610             }
611             if (n > 0) {
612                 count = 0;
613                 putIndex = 0;
614                 takeIndex = 0;
615             }
616             return n;
617         } finally {
618             monitor.leave();
619         }
620     }
621 
622     /**
623      * @throws UnsupportedOperationException {@inheritDoc}
624      * @throws ClassCastException            {@inheritDoc}
625      * @throws NullPointerException          {@inheritDoc}
626      * @throws IllegalArgumentException      {@inheritDoc}
627      */
628     @Override
629     public int drainTo(Collection<? super E> c, int maxElements) {
630         if (c == null)
631             throw new NullPointerException();
632         if (c == this)
633             throw new IllegalArgumentException();
634         if (maxElements <= 0)
635             return 0;
636         final E[] items = this.items;
637         final Monitor monitor = this.monitor;
638         monitor.enter();
639         try {
640             int i = takeIndex;
641             int n = 0;
642             int max = (maxElements < count) ? maxElements : count;
643             while (n < max) {
644                 c.add(items[i]);
645                 items[i] = null;
646                 i = inc(i);
647                 ++n;
648             }
649             if (n > 0) {
650                 count -= n;
651                 takeIndex = i;
652             }
653             return n;
654         } finally {
655             monitor.leave();
656         }
657     }
658 
659     /**
660      * Returns an iterator over the elements in this queue in proper sequence.
661      * The returned <tt>Iterator</tt> is a "weakly consistent" iterator that
662      * will never throw {@link ConcurrentModificationException},
663      * and guarantees to traverse elements as they existed upon
664      * construction of the iterator, and may (but is not guaranteed to)
665      * reflect any modifications subsequent to construction.
666      *
667      * @return an iterator over the elements in this queue in proper sequence
668      */
669     @Override public Iterator<E> iterator() {
670         final Monitor monitor = this.monitor;
671         monitor.enter();
672         try {
673             return new Itr();
674         } finally {
675             monitor.leave();
676         }
677     }
678 
679     /**
680      * Iterator for MonitorBasedArrayBlockingQueue
681      */
682     private class Itr implements Iterator<E> {
683         /**
684          * Index of element to be returned by next,
685          * or a negative number if no such.
686          */
687         private int nextIndex;
688 
689         /**
690          * nextItem holds on to item fields because once we claim
691          * that an element exists in hasNext(), we must return it in
692          * the following next() call even if it was in the process of
693          * being removed when hasNext() was called.
694          */
695         private E nextItem;
696 
697         /**
698          * Index of element returned by most recent call to next.
699          * Reset to -1 if this element is deleted by a call to remove.
700          */
701         private int lastRet;
702 
703         Itr() {
704             lastRet = -1;
705             if (count == 0)
706                 nextIndex = -1;
707             else {
708                 nextIndex = takeIndex;
709                 nextItem = items[takeIndex];
710             }
711         }
712 
713         @Override
714         public boolean hasNext() {
715             /*
716              * No sync. We can return true by mistake here
717              * only if this iterator passed across threads,
718              * which we don't support anyway.
719              */
720             return nextIndex >= 0;
721         }
722 
723         /**
724          * Checks whether nextIndex is valid; if so setting nextItem.
725          * Stops iterator when either hits putIndex or sees null item.
726          */
727         private void checkNext() {
728             if (nextIndex == putIndex) {
729                 nextIndex = -1;
730                 nextItem = null;
731             } else {
732                 nextItem = items[nextIndex];
733                 if (nextItem == null)
734                     nextIndex = -1;
735             }
736         }
737 
738         @Override
739         public E next() {
740             final Monitor monitor = MonitorBasedArrayBlockingQueue.this.monitor;
741             monitor.enter();
742             try {
743                 if (nextIndex < 0)
744                     throw new NoSuchElementException();
745                 lastRet = nextIndex;
746                 E x = nextItem;
747                 nextIndex = inc(nextIndex);
748                 checkNext();
749                 return x;
750             } finally {
751                 monitor.leave();
752             }
753         }
754 
755         @Override
756         public void remove() {
757             final Monitor monitor = MonitorBasedArrayBlockingQueue.this.monitor;
758             monitor.enter();
759             try {
760                 int i = lastRet;
761                 if (i == -1)
762                     throw new IllegalStateException();
763                 lastRet = -1;
764 
765                 int ti = takeIndex;
766                 removeAt(i);
767                 // back up cursor (reset to front if was first element)
768                 nextIndex = (i == ti) ? takeIndex : i;
769                 checkNext();
770             } finally {
771                 monitor.leave();
772             }
773         }
774     }
775 }