View Javadoc
1   /*
2    * Copyright (C) 2010 The Guava Authors
3    *
4    * Licensed under the Apache License, Version 2.0 (the "License");
5    * you may not use this file except in compliance with the License.
6    * You may obtain a copy of the License at
7    *
8    * http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS,
12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   * See the License for the specific language governing permissions and
14   * limitations under the License.
15   */
16  
17  package com.google.common.util.concurrent;
18  
19  import static com.google.common.base.Preconditions.checkNotNull;
20  
21  import java.util.AbstractQueue;
22  import java.util.Collection;
23  import java.util.Comparator;
24  import java.util.ConcurrentModificationException;
25  import java.util.Iterator;
26  import java.util.NoSuchElementException;
27  import java.util.PriorityQueue;
28  import java.util.Queue;
29  import java.util.SortedSet;
30  import java.util.concurrent.BlockingQueue;
31  import java.util.concurrent.TimeUnit;
32  
33  import javax.annotation.Nullable;
34  
35  /**
36   * An unbounded {@linkplain BlockingQueue blocking queue} that uses
37   * the same ordering rules as class {@link PriorityQueue} and supplies
38   * blocking retrieval operations.  While this queue is logically
39   * unbounded, attempted additions may fail due to resource exhaustion
40   * (causing <tt>OutOfMemoryError</tt>). This class does not permit
41   * <tt>null</tt> elements.  A priority queue relying on {@linkplain
42   * Comparable natural ordering} also does not permit insertion of
43   * non-comparable objects (doing so results in
44   * <tt>ClassCastException</tt>).
45   *
46   * <p>This class and its iterator implement all of the
47   * <em>optional</em> methods of the {@link Collection} and {@link
48   * Iterator} interfaces.  The Iterator provided in method {@link
49   * #iterator()} is <em>not</em> guaranteed to traverse the elements of
50   * the MonitorBasedPriorityBlockingQueue in any particular order. If you need
51   * ordered traversal, consider using
52   * <tt>Arrays.sort(pq.toArray())</tt>.  Also, method <tt>drainTo</tt>
53   * can be used to <em>remove</em> some or all elements in priority
54   * order and place them in another collection.
55   *
56   * <p>Operations on this class make no guarantees about the ordering
57   * of elements with equal priority. If you need to enforce an
58   * ordering, you can define custom classes or comparators that use a
59   * secondary key to break ties in primary priority values.  For
60   * example, here is a class that applies first-in-first-out
61   * tie-breaking to comparable elements. To use it, you would insert a
62   * <tt>new FIFOEntry(anEntry)</tt> instead of a plain entry object.
63   *
64   * <pre>
65   * class FIFOEntry&lt;E extends Comparable&lt;? super E&gt;&gt;
66   *     implements Comparable&lt;FIFOEntry&lt;E&gt;&gt; {
67   *   final static AtomicLong seq = new AtomicLong();
68   *   final long seqNum;
69   *   final E entry;
70   *   public FIFOEntry(E entry) {
71   *     seqNum = seq.getAndIncrement();
72   *     this.entry = entry;
73   *   }
74   *   public E getEntry() { return entry; }
75   *   public int compareTo(FIFOEntry&lt;E&gt; other) {
76   *     int res = entry.compareTo(other.entry);
77   *     if (res == 0 &amp;&amp; other.entry != this.entry)
78   *       res = (seqNum &lt; other.seqNum ? -1 : 1);
79   *     return res;
80   *   }
81   * }</pre>
82   *
83   * @author Doug Lea
84   * @author Justin T. Sampson
85   * @param <E> the type of elements held in this collection
86   */
87  public class MonitorBasedPriorityBlockingQueue<E> extends AbstractQueue<E>
88      implements BlockingQueue<E> {
89  
90      // Based on revision 1.55 of PriorityBlockingQueue by Doug Lea, from
91      // http://gee.cs.oswego.edu/cgi-bin/viewcvs.cgi/jsr166/src/main/java/util/concurrent/
92  
93      private static final long serialVersionUID = 5595510919245408276L;
94  
95      final PriorityQueue<E> q;
96      final Monitor monitor = new Monitor(true);
97      private final Monitor.Guard notEmpty =
98          new Monitor.Guard(monitor) {
99              @Override public boolean isSatisfied() {
100               return !q.isEmpty();
101             }
102         };
103 
104     /**
105      * Creates a <tt>MonitorBasedPriorityBlockingQueue</tt> with the default
106      * initial capacity (11) that orders its elements according to
107      * their {@linkplain Comparable natural ordering}.
108      */
109     public MonitorBasedPriorityBlockingQueue() {
110         q = new PriorityQueue<E>();
111     }
112 
113     /**
114      * Creates a <tt>MonitorBasedPriorityBlockingQueue</tt> with the specified
115      * initial capacity that orders its elements according to their
116      * {@linkplain Comparable natural ordering}.
117      *
118      * @param initialCapacity the initial capacity for this priority queue
119      * @throws IllegalArgumentException if <tt>initialCapacity</tt> is less
120      *         than 1
121      */
122     public MonitorBasedPriorityBlockingQueue(int initialCapacity) {
123         q = new PriorityQueue<E>(initialCapacity, null);
124     }
125 
126     /**
127      * Creates a <tt>MonitorBasedPriorityBlockingQueue</tt> with the specified initial
128      * capacity that orders its elements according to the specified
129      * comparator.
130      *
131      * @param initialCapacity the initial capacity for this priority queue
132      * @param  comparator the comparator that will be used to order this
133      *         priority queue.  If {@code null}, the {@linkplain Comparable
134      *         natural ordering} of the elements will be used.
135      * @throws IllegalArgumentException if <tt>initialCapacity</tt> is less
136      *         than 1
137      */
138     public MonitorBasedPriorityBlockingQueue(int initialCapacity,
139                                  @Nullable Comparator<? super E> comparator) {
140         q = new PriorityQueue<E>(initialCapacity, comparator);
141     }
142 
143     /**
144      * Creates a <tt>MonitorBasedPriorityBlockingQueue</tt> containing the elements
145      * in the specified collection.  If the specified collection is a
146      * {@link SortedSet} or a {@link PriorityQueue},  this
147      * priority queue will be ordered according to the same ordering.
148      * Otherwise, this priority queue will be ordered according to the
149      * {@linkplain Comparable natural ordering} of its elements.
150      *
151      * @param  c the collection whose elements are to be placed
152      *         into this priority queue
153      * @throws ClassCastException if elements of the specified collection
154      *         cannot be compared to one another according to the priority
155      *         queue's ordering
156      * @throws NullPointerException if the specified collection or any
157      *         of its elements are null
158      */
159     public MonitorBasedPriorityBlockingQueue(Collection<? extends E> c) {
160         q = new PriorityQueue<E>(c);
161     }
162 
163     /**
164      * Inserts the specified element into this priority queue.
165      *
166      * @param e the element to add
167      * @return <tt>true</tt> (as specified by {@link Collection#add})
168      * @throws ClassCastException if the specified element cannot be compared
169      *         with elements currently in the priority queue according to the
170      *         priority queue's ordering
171      * @throws NullPointerException if the specified element is null
172      */
173     @Override public boolean add(E e) {
174         return offer(e);
175     }
176 
177     /**
178      * Inserts the specified element into this priority queue.
179      *
180      * @param e the element to add
181      * @return <tt>true</tt> (as specified by {@link Queue#offer})
182      * @throws ClassCastException if the specified element cannot be compared
183      *         with elements currently in the priority queue according to the
184      *         priority queue's ordering
185      * @throws NullPointerException if the specified element is null
186      */
187     @Override
188     public boolean offer(E e) {
189         final Monitor monitor = this.monitor;
190         monitor.enter();
191         try {
192             boolean ok = q.offer(e);
193             if (!ok) {
194               throw new AssertionError();
195             }
196             return true;
197         } finally {
198             monitor.leave();
199         }
200     }
201 
202     /**
203      * Inserts the specified element into this priority queue. As the queue is
204      * unbounded this method will never block.
205      *
206      * @param e the element to add
207      * @throws ClassCastException if the specified element cannot be compared
208      *         with elements currently in the priority queue according to the
209      *         priority queue's ordering
210      * @throws NullPointerException if the specified element is null
211      */
212     @Override
213     public void put(E e) {
214         offer(e); // never need to block
215     }
216 
217     /**
218      * Inserts the specified element into this priority queue. As the queue is
219      * unbounded this method will never block.
220      *
221      * @param e the element to add
222      * @param timeout This parameter is ignored as the method never blocks
223      * @param unit This parameter is ignored as the method never blocks
224      * @return <tt>true</tt>
225      * @throws ClassCastException if the specified element cannot be compared
226      *         with elements currently in the priority queue according to the
227      *         priority queue's ordering
228      * @throws NullPointerException if the specified element is null
229      */
230     @Override
231     public boolean offer(E e, long timeout, TimeUnit unit) {
232         checkNotNull(unit);
233         return offer(e); // never need to block
234     }
235 
236     @Override
237     public E poll() {
238         final Monitor monitor = this.monitor;
239         monitor.enter();
240         try {
241             return q.poll();
242         } finally {
243             monitor.leave();
244         }
245     }
246 
247     @Override
248     public E take() throws InterruptedException {
249         final Monitor monitor = this.monitor;
250         monitor.enterWhen(notEmpty);
251         try {
252             return q.poll();
253         } finally {
254             monitor.leave();
255         }
256     }
257 
258     @Override
259     public E poll(long timeout, TimeUnit unit) throws InterruptedException {
260         final Monitor monitor = this.monitor;
261         if (monitor.enterWhen(notEmpty, timeout, unit)) {
262             try {
263                 return q.poll();
264             } finally {
265                 monitor.leave();
266             }
267         } else {
268             return null;
269         }
270     }
271 
272     @Override
273     public E peek() {
274         final Monitor monitor = this.monitor;
275         monitor.enter();
276         try {
277             return q.peek();
278         } finally {
279             monitor.leave();
280         }
281     }
282 
283     /**
284      * Returns the comparator used to order the elements in this queue,
285      * or <tt>null</tt> if this queue uses the {@linkplain Comparable
286      * natural ordering} of its elements.
287      *
288      * @return the comparator used to order the elements in this queue,
289      *         or <tt>null</tt> if this queue uses the natural
290      *         ordering of its elements
291      */
292     public Comparator<? super E> comparator() {
293         return q.comparator();
294     }
295 
296     @Override public int size() {
297         final Monitor monitor = this.monitor;
298         monitor.enter();
299         try {
300             return q.size();
301         } finally {
302             monitor.leave();
303         }
304     }
305 
306     /**
307      * Always returns <tt>Integer.MAX_VALUE</tt> because
308      * a <tt>MonitorBasedPriorityBlockingQueue</tt> is not capacity constrained.
309      * @return <tt>Integer.MAX_VALUE</tt>
310      */
311     @Override
312     public int remainingCapacity() {
313         return Integer.MAX_VALUE;
314     }
315 
316     /**
317      * Removes a single instance of the specified element from this queue,
318      * if it is present.  More formally, removes an element {@code e} such
319      * that {@code o.equals(e)}, if this queue contains one or more such
320      * elements.  Returns {@code true} if and only if this queue contained
321      * the specified element (or equivalently, if this queue changed as a
322      * result of the call).
323      *
324      * @param o element to be removed from this queue, if present
325      * @return <tt>true</tt> if this queue changed as a result of the call
326      */
327     @Override public boolean remove(@Nullable Object o) {
328         final Monitor monitor = this.monitor;
329         monitor.enter();
330         try {
331             return q.remove(o);
332         } finally {
333             monitor.leave();
334         }
335     }
336 
337     /**
338      * Returns {@code true} if this queue contains the specified element.
339      * More formally, returns {@code true} if and only if this queue contains
340      * at least one element {@code e} such that {@code o.equals(e)}.
341      *
342      * @param o object to be checked for containment in this queue
343      * @return <tt>true</tt> if this queue contains the specified element
344      */
345     @Override public boolean contains(@Nullable Object o) {
346         final Monitor monitor = this.monitor;
347         monitor.enter();
348         try {
349             return q.contains(o);
350         } finally {
351             monitor.leave();
352         }
353     }
354 
355     /**
356      * Returns an array containing all of the elements in this queue.
357      * The returned array elements are in no particular order.
358      *
359      * <p>The returned array will be "safe" in that no references to it are
360      * maintained by this queue.  (In other words, this method must allocate
361      * a new array).  The caller is thus free to modify the returned array.
362      *
363      * <p>This method acts as bridge between array-based and collection-based
364      * APIs.
365      *
366      * @return an array containing all of the elements in this queue
367      */
368     @Override public Object[] toArray() {
369         final Monitor monitor = this.monitor;
370         monitor.enter();
371         try {
372             return q.toArray();
373         } finally {
374             monitor.leave();
375         }
376     }
377 
378     @Override public String toString() {
379         final Monitor monitor = this.monitor;
380         monitor.enter();
381         try {
382             return q.toString();
383         } finally {
384             monitor.leave();
385         }
386     }
387 
388     /**
389      * @throws UnsupportedOperationException {@inheritDoc}
390      * @throws ClassCastException            {@inheritDoc}
391      * @throws NullPointerException          {@inheritDoc}
392      * @throws IllegalArgumentException      {@inheritDoc}
393      */
394     @Override
395     public int drainTo(Collection<? super E> c) {
396         if (c == null)
397             throw new NullPointerException();
398         if (c == this)
399             throw new IllegalArgumentException();
400         final Monitor monitor = this.monitor;
401         monitor.enter();
402         try {
403             int n = 0;
404             E e;
405             while ( (e = q.poll()) != null) {
406                 c.add(e);
407                 ++n;
408             }
409             return n;
410         } finally {
411             monitor.leave();
412         }
413     }
414 
415     /**
416      * @throws UnsupportedOperationException {@inheritDoc}
417      * @throws ClassCastException            {@inheritDoc}
418      * @throws NullPointerException          {@inheritDoc}
419      * @throws IllegalArgumentException      {@inheritDoc}
420      */
421     @Override
422     public int drainTo(Collection<? super E> c, int maxElements) {
423         if (c == null)
424             throw new NullPointerException();
425         if (c == this)
426             throw new IllegalArgumentException();
427         if (maxElements <= 0)
428             return 0;
429         final Monitor monitor = this.monitor;
430         monitor.enter();
431         try {
432             int n = 0;
433             E e;
434             while (n < maxElements && (e = q.poll()) != null) {
435                 c.add(e);
436                 ++n;
437             }
438             return n;
439         } finally {
440             monitor.leave();
441         }
442     }
443 
444     /**
445      * Atomically removes all of the elements from this queue.
446      * The queue will be empty after this call returns.
447      */
448     @Override public void clear() {
449         final Monitor monitor = this.monitor;
450         monitor.enter();
451         try {
452             q.clear();
453         } finally {
454             monitor.leave();
455         }
456     }
457 
458     /**
459      * Returns an array containing all of the elements in this queue; the
460      * runtime type of the returned array is that of the specified array.
461      * The returned array elements are in no particular order.
462      * If the queue fits in the specified array, it is returned therein.
463      * Otherwise, a new array is allocated with the runtime type of the
464      * specified array and the size of this queue.
465      *
466      * <p>If this queue fits in the specified array with room to spare
467      * (i.e., the array has more elements than this queue), the element in
468      * the array immediately following the end of the queue is set to
469      * <tt>null</tt>.
470      *
471      * <p>Like the {@link #toArray()} method, this method acts as bridge between
472      * array-based and collection-based APIs.  Further, this method allows
473      * precise control over the runtime type of the output array, and may,
474      * under certain circumstances, be used to save allocation costs.
475      *
476      * <p>Suppose <tt>x</tt> is a queue known to contain only strings.
477      * The following code can be used to dump the queue into a newly
478      * allocated array of <tt>String</tt>:
479      *
480      * <pre>
481      *     String[] y = x.toArray(new String[0]);</pre>
482      *
483      * <p>Note that <tt>toArray(new Object[0])</tt> is identical in function to
484      * <tt>toArray()</tt>.
485      *
486      * @param a the array into which the elements of the queue are to
487      *          be stored, if it is big enough; otherwise, a new array of the
488      *          same runtime type is allocated for this purpose
489      * @return an array containing all of the elements in this queue
490      * @throws ArrayStoreException if the runtime type of the specified array
491      *         is not a supertype of the runtime type of every element in
492      *         this queue
493      * @throws NullPointerException if the specified array is null
494      */
495     @Override public <T> T[] toArray(T[] a) {
496         final Monitor monitor = this.monitor;
497         monitor.enter();
498         try {
499             return q.toArray(a);
500         } finally {
501             monitor.leave();
502         }
503     }
504 
505     /**
506      * Returns an iterator over the elements in this queue. The
507      * iterator does not return the elements in any particular order.
508      * The returned <tt>Iterator</tt> is a "weakly consistent"
509      * iterator that will never throw {@link
510      * ConcurrentModificationException}, and guarantees to traverse
511      * elements as they existed upon construction of the iterator, and
512      * may (but is not guaranteed to) reflect any modifications
513      * subsequent to construction.
514      *
515      * @return an iterator over the elements in this queue
516      */
517     @Override public Iterator<E> iterator() {
518         return new Itr(toArray());
519     }
520 
521     /**
522      * Snapshot iterator that works off copy of underlying q array.
523      */
524     private class Itr implements Iterator<E> {
525         final Object[] array; // Array of all elements
526         int cursor;           // index of next element to return;
527         int lastRet;          // index of last element, or -1 if no such
528 
529         Itr(Object[] array) {
530             lastRet = -1;
531             this.array = array;
532         }
533 
534         @Override
535         public boolean hasNext() {
536             return cursor < array.length;
537         }
538 
539         @Override
540         public E next() {
541             if (cursor >= array.length)
542                 throw new NoSuchElementException();
543             lastRet = cursor;
544 
545             // array comes from q.toArray() and so should have only E's in it
546             @SuppressWarnings("unchecked")
547             E e = (E) array[cursor++];
548             return e;
549         }
550 
551         @Override
552         public void remove() {
553             if (lastRet < 0)
554                 throw new IllegalStateException();
555             Object x = array[lastRet];
556             lastRet = -1;
557             // Traverse underlying queue to find == element,
558             // not just a .equals element.
559             monitor.enter();
560             try {
561                 for (Iterator<E> it = q.iterator(); it.hasNext(); ) {
562                     if (it.next() == x) {
563                         it.remove();
564                         return;
565                     }
566                 }
567             } finally {
568                 monitor.leave();
569             }
570         }
571     }
572 
573     /**
574      * Saves the state to a stream (that is, serializes it).  This
575      * merely wraps default serialization within the monitor.  The
576      * serialization strategy for items is left to underlying
577      * Queue. Note that locking is not needed on deserialization, so
578      * readObject is not defined, just relying on default.
579      */
580     private void writeObject(java.io.ObjectOutputStream s)
581         throws java.io.IOException {
582         monitor.enter();
583         try {
584             s.defaultWriteObject();
585         } finally {
586             monitor.leave();
587         }
588     }
589 
590 }