1 /*
2 * Copyright (C) 2010 The Guava Authors
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 package com.google.common.util.concurrent;
18
19 import static com.google.common.base.Preconditions.checkNotNull;
20
21 import java.util.AbstractQueue;
22 import java.util.Collection;
23 import java.util.Comparator;
24 import java.util.ConcurrentModificationException;
25 import java.util.Iterator;
26 import java.util.NoSuchElementException;
27 import java.util.PriorityQueue;
28 import java.util.Queue;
29 import java.util.SortedSet;
30 import java.util.concurrent.BlockingQueue;
31 import java.util.concurrent.TimeUnit;
32
33 import javax.annotation.Nullable;
34
35 /**
36 * An unbounded {@linkplain BlockingQueue blocking queue} that uses
37 * the same ordering rules as class {@link PriorityQueue} and supplies
38 * blocking retrieval operations. While this queue is logically
39 * unbounded, attempted additions may fail due to resource exhaustion
40 * (causing <tt>OutOfMemoryError</tt>). This class does not permit
41 * <tt>null</tt> elements. A priority queue relying on {@linkplain
42 * Comparable natural ordering} also does not permit insertion of
43 * non-comparable objects (doing so results in
44 * <tt>ClassCastException</tt>).
45 *
46 * <p>This class and its iterator implement all of the
47 * <em>optional</em> methods of the {@link Collection} and {@link
48 * Iterator} interfaces. The Iterator provided in method {@link
49 * #iterator()} is <em>not</em> guaranteed to traverse the elements of
50 * the MonitorBasedPriorityBlockingQueue in any particular order. If you need
51 * ordered traversal, consider using
52 * <tt>Arrays.sort(pq.toArray())</tt>. Also, method <tt>drainTo</tt>
53 * can be used to <em>remove</em> some or all elements in priority
54 * order and place them in another collection.
55 *
56 * <p>Operations on this class make no guarantees about the ordering
57 * of elements with equal priority. If you need to enforce an
58 * ordering, you can define custom classes or comparators that use a
59 * secondary key to break ties in primary priority values. For
60 * example, here is a class that applies first-in-first-out
61 * tie-breaking to comparable elements. To use it, you would insert a
62 * <tt>new FIFOEntry(anEntry)</tt> instead of a plain entry object.
63 *
64 * <pre>
65 * class FIFOEntry<E extends Comparable<? super E>>
66 * implements Comparable<FIFOEntry<E>> {
67 * final static AtomicLong seq = new AtomicLong();
68 * final long seqNum;
69 * final E entry;
70 * public FIFOEntry(E entry) {
71 * seqNum = seq.getAndIncrement();
72 * this.entry = entry;
73 * }
74 * public E getEntry() { return entry; }
75 * public int compareTo(FIFOEntry<E> other) {
76 * int res = entry.compareTo(other.entry);
77 * if (res == 0 && other.entry != this.entry)
78 * res = (seqNum < other.seqNum ? -1 : 1);
79 * return res;
80 * }
81 * }</pre>
82 *
83 * @author Doug Lea
84 * @author Justin T. Sampson
85 * @param <E> the type of elements held in this collection
86 */
87 public class MonitorBasedPriorityBlockingQueue<E> extends AbstractQueue<E>
88 implements BlockingQueue<E> {
89
90 // Based on revision 1.55 of PriorityBlockingQueue by Doug Lea, from
91 // http://gee.cs.oswego.edu/cgi-bin/viewcvs.cgi/jsr166/src/main/java/util/concurrent/
92
93 private static final long serialVersionUID = 5595510919245408276L;
94
95 final PriorityQueue<E> q;
96 final Monitor monitor = new Monitor(true);
97 private final Monitor.Guard notEmpty =
98 new Monitor.Guard(monitor) {
99 @Override public boolean isSatisfied() {
100 return !q.isEmpty();
101 }
102 };
103
104 /**
105 * Creates a <tt>MonitorBasedPriorityBlockingQueue</tt> with the default
106 * initial capacity (11) that orders its elements according to
107 * their {@linkplain Comparable natural ordering}.
108 */
109 public MonitorBasedPriorityBlockingQueue() {
110 q = new PriorityQueue<E>();
111 }
112
113 /**
114 * Creates a <tt>MonitorBasedPriorityBlockingQueue</tt> with the specified
115 * initial capacity that orders its elements according to their
116 * {@linkplain Comparable natural ordering}.
117 *
118 * @param initialCapacity the initial capacity for this priority queue
119 * @throws IllegalArgumentException if <tt>initialCapacity</tt> is less
120 * than 1
121 */
122 public MonitorBasedPriorityBlockingQueue(int initialCapacity) {
123 q = new PriorityQueue<E>(initialCapacity, null);
124 }
125
126 /**
127 * Creates a <tt>MonitorBasedPriorityBlockingQueue</tt> with the specified initial
128 * capacity that orders its elements according to the specified
129 * comparator.
130 *
131 * @param initialCapacity the initial capacity for this priority queue
132 * @param comparator the comparator that will be used to order this
133 * priority queue. If {@code null}, the {@linkplain Comparable
134 * natural ordering} of the elements will be used.
135 * @throws IllegalArgumentException if <tt>initialCapacity</tt> is less
136 * than 1
137 */
138 public MonitorBasedPriorityBlockingQueue(int initialCapacity,
139 @Nullable Comparator<? super E> comparator) {
140 q = new PriorityQueue<E>(initialCapacity, comparator);
141 }
142
143 /**
144 * Creates a <tt>MonitorBasedPriorityBlockingQueue</tt> containing the elements
145 * in the specified collection. If the specified collection is a
146 * {@link SortedSet} or a {@link PriorityQueue}, this
147 * priority queue will be ordered according to the same ordering.
148 * Otherwise, this priority queue will be ordered according to the
149 * {@linkplain Comparable natural ordering} of its elements.
150 *
151 * @param c the collection whose elements are to be placed
152 * into this priority queue
153 * @throws ClassCastException if elements of the specified collection
154 * cannot be compared to one another according to the priority
155 * queue's ordering
156 * @throws NullPointerException if the specified collection or any
157 * of its elements are null
158 */
159 public MonitorBasedPriorityBlockingQueue(Collection<? extends E> c) {
160 q = new PriorityQueue<E>(c);
161 }
162
163 /**
164 * Inserts the specified element into this priority queue.
165 *
166 * @param e the element to add
167 * @return <tt>true</tt> (as specified by {@link Collection#add})
168 * @throws ClassCastException if the specified element cannot be compared
169 * with elements currently in the priority queue according to the
170 * priority queue's ordering
171 * @throws NullPointerException if the specified element is null
172 */
173 @Override public boolean add(E e) {
174 return offer(e);
175 }
176
177 /**
178 * Inserts the specified element into this priority queue.
179 *
180 * @param e the element to add
181 * @return <tt>true</tt> (as specified by {@link Queue#offer})
182 * @throws ClassCastException if the specified element cannot be compared
183 * with elements currently in the priority queue according to the
184 * priority queue's ordering
185 * @throws NullPointerException if the specified element is null
186 */
187 @Override
188 public boolean offer(E e) {
189 final Monitor monitor = this.monitor;
190 monitor.enter();
191 try {
192 boolean ok = q.offer(e);
193 if (!ok) {
194 throw new AssertionError();
195 }
196 return true;
197 } finally {
198 monitor.leave();
199 }
200 }
201
202 /**
203 * Inserts the specified element into this priority queue. As the queue is
204 * unbounded this method will never block.
205 *
206 * @param e the element to add
207 * @throws ClassCastException if the specified element cannot be compared
208 * with elements currently in the priority queue according to the
209 * priority queue's ordering
210 * @throws NullPointerException if the specified element is null
211 */
212 @Override
213 public void put(E e) {
214 offer(e); // never need to block
215 }
216
217 /**
218 * Inserts the specified element into this priority queue. As the queue is
219 * unbounded this method will never block.
220 *
221 * @param e the element to add
222 * @param timeout This parameter is ignored as the method never blocks
223 * @param unit This parameter is ignored as the method never blocks
224 * @return <tt>true</tt>
225 * @throws ClassCastException if the specified element cannot be compared
226 * with elements currently in the priority queue according to the
227 * priority queue's ordering
228 * @throws NullPointerException if the specified element is null
229 */
230 @Override
231 public boolean offer(E e, long timeout, TimeUnit unit) {
232 checkNotNull(unit);
233 return offer(e); // never need to block
234 }
235
236 @Override
237 public E poll() {
238 final Monitor monitor = this.monitor;
239 monitor.enter();
240 try {
241 return q.poll();
242 } finally {
243 monitor.leave();
244 }
245 }
246
247 @Override
248 public E take() throws InterruptedException {
249 final Monitor monitor = this.monitor;
250 monitor.enterWhen(notEmpty);
251 try {
252 return q.poll();
253 } finally {
254 monitor.leave();
255 }
256 }
257
258 @Override
259 public E poll(long timeout, TimeUnit unit) throws InterruptedException {
260 final Monitor monitor = this.monitor;
261 if (monitor.enterWhen(notEmpty, timeout, unit)) {
262 try {
263 return q.poll();
264 } finally {
265 monitor.leave();
266 }
267 } else {
268 return null;
269 }
270 }
271
272 @Override
273 public E peek() {
274 final Monitor monitor = this.monitor;
275 monitor.enter();
276 try {
277 return q.peek();
278 } finally {
279 monitor.leave();
280 }
281 }
282
283 /**
284 * Returns the comparator used to order the elements in this queue,
285 * or <tt>null</tt> if this queue uses the {@linkplain Comparable
286 * natural ordering} of its elements.
287 *
288 * @return the comparator used to order the elements in this queue,
289 * or <tt>null</tt> if this queue uses the natural
290 * ordering of its elements
291 */
292 public Comparator<? super E> comparator() {
293 return q.comparator();
294 }
295
296 @Override public int size() {
297 final Monitor monitor = this.monitor;
298 monitor.enter();
299 try {
300 return q.size();
301 } finally {
302 monitor.leave();
303 }
304 }
305
306 /**
307 * Always returns <tt>Integer.MAX_VALUE</tt> because
308 * a <tt>MonitorBasedPriorityBlockingQueue</tt> is not capacity constrained.
309 * @return <tt>Integer.MAX_VALUE</tt>
310 */
311 @Override
312 public int remainingCapacity() {
313 return Integer.MAX_VALUE;
314 }
315
316 /**
317 * Removes a single instance of the specified element from this queue,
318 * if it is present. More formally, removes an element {@code e} such
319 * that {@code o.equals(e)}, if this queue contains one or more such
320 * elements. Returns {@code true} if and only if this queue contained
321 * the specified element (or equivalently, if this queue changed as a
322 * result of the call).
323 *
324 * @param o element to be removed from this queue, if present
325 * @return <tt>true</tt> if this queue changed as a result of the call
326 */
327 @Override public boolean remove(@Nullable Object o) {
328 final Monitor monitor = this.monitor;
329 monitor.enter();
330 try {
331 return q.remove(o);
332 } finally {
333 monitor.leave();
334 }
335 }
336
337 /**
338 * Returns {@code true} if this queue contains the specified element.
339 * More formally, returns {@code true} if and only if this queue contains
340 * at least one element {@code e} such that {@code o.equals(e)}.
341 *
342 * @param o object to be checked for containment in this queue
343 * @return <tt>true</tt> if this queue contains the specified element
344 */
345 @Override public boolean contains(@Nullable Object o) {
346 final Monitor monitor = this.monitor;
347 monitor.enter();
348 try {
349 return q.contains(o);
350 } finally {
351 monitor.leave();
352 }
353 }
354
355 /**
356 * Returns an array containing all of the elements in this queue.
357 * The returned array elements are in no particular order.
358 *
359 * <p>The returned array will be "safe" in that no references to it are
360 * maintained by this queue. (In other words, this method must allocate
361 * a new array). The caller is thus free to modify the returned array.
362 *
363 * <p>This method acts as bridge between array-based and collection-based
364 * APIs.
365 *
366 * @return an array containing all of the elements in this queue
367 */
368 @Override public Object[] toArray() {
369 final Monitor monitor = this.monitor;
370 monitor.enter();
371 try {
372 return q.toArray();
373 } finally {
374 monitor.leave();
375 }
376 }
377
378 @Override public String toString() {
379 final Monitor monitor = this.monitor;
380 monitor.enter();
381 try {
382 return q.toString();
383 } finally {
384 monitor.leave();
385 }
386 }
387
388 /**
389 * @throws UnsupportedOperationException {@inheritDoc}
390 * @throws ClassCastException {@inheritDoc}
391 * @throws NullPointerException {@inheritDoc}
392 * @throws IllegalArgumentException {@inheritDoc}
393 */
394 @Override
395 public int drainTo(Collection<? super E> c) {
396 if (c == null)
397 throw new NullPointerException();
398 if (c == this)
399 throw new IllegalArgumentException();
400 final Monitor monitor = this.monitor;
401 monitor.enter();
402 try {
403 int n = 0;
404 E e;
405 while ( (e = q.poll()) != null) {
406 c.add(e);
407 ++n;
408 }
409 return n;
410 } finally {
411 monitor.leave();
412 }
413 }
414
415 /**
416 * @throws UnsupportedOperationException {@inheritDoc}
417 * @throws ClassCastException {@inheritDoc}
418 * @throws NullPointerException {@inheritDoc}
419 * @throws IllegalArgumentException {@inheritDoc}
420 */
421 @Override
422 public int drainTo(Collection<? super E> c, int maxElements) {
423 if (c == null)
424 throw new NullPointerException();
425 if (c == this)
426 throw new IllegalArgumentException();
427 if (maxElements <= 0)
428 return 0;
429 final Monitor monitor = this.monitor;
430 monitor.enter();
431 try {
432 int n = 0;
433 E e;
434 while (n < maxElements && (e = q.poll()) != null) {
435 c.add(e);
436 ++n;
437 }
438 return n;
439 } finally {
440 monitor.leave();
441 }
442 }
443
444 /**
445 * Atomically removes all of the elements from this queue.
446 * The queue will be empty after this call returns.
447 */
448 @Override public void clear() {
449 final Monitor monitor = this.monitor;
450 monitor.enter();
451 try {
452 q.clear();
453 } finally {
454 monitor.leave();
455 }
456 }
457
458 /**
459 * Returns an array containing all of the elements in this queue; the
460 * runtime type of the returned array is that of the specified array.
461 * The returned array elements are in no particular order.
462 * If the queue fits in the specified array, it is returned therein.
463 * Otherwise, a new array is allocated with the runtime type of the
464 * specified array and the size of this queue.
465 *
466 * <p>If this queue fits in the specified array with room to spare
467 * (i.e., the array has more elements than this queue), the element in
468 * the array immediately following the end of the queue is set to
469 * <tt>null</tt>.
470 *
471 * <p>Like the {@link #toArray()} method, this method acts as bridge between
472 * array-based and collection-based APIs. Further, this method allows
473 * precise control over the runtime type of the output array, and may,
474 * under certain circumstances, be used to save allocation costs.
475 *
476 * <p>Suppose <tt>x</tt> is a queue known to contain only strings.
477 * The following code can be used to dump the queue into a newly
478 * allocated array of <tt>String</tt>:
479 *
480 * <pre>
481 * String[] y = x.toArray(new String[0]);</pre>
482 *
483 * <p>Note that <tt>toArray(new Object[0])</tt> is identical in function to
484 * <tt>toArray()</tt>.
485 *
486 * @param a the array into which the elements of the queue are to
487 * be stored, if it is big enough; otherwise, a new array of the
488 * same runtime type is allocated for this purpose
489 * @return an array containing all of the elements in this queue
490 * @throws ArrayStoreException if the runtime type of the specified array
491 * is not a supertype of the runtime type of every element in
492 * this queue
493 * @throws NullPointerException if the specified array is null
494 */
495 @Override public <T> T[] toArray(T[] a) {
496 final Monitor monitor = this.monitor;
497 monitor.enter();
498 try {
499 return q.toArray(a);
500 } finally {
501 monitor.leave();
502 }
503 }
504
505 /**
506 * Returns an iterator over the elements in this queue. The
507 * iterator does not return the elements in any particular order.
508 * The returned <tt>Iterator</tt> is a "weakly consistent"
509 * iterator that will never throw {@link
510 * ConcurrentModificationException}, and guarantees to traverse
511 * elements as they existed upon construction of the iterator, and
512 * may (but is not guaranteed to) reflect any modifications
513 * subsequent to construction.
514 *
515 * @return an iterator over the elements in this queue
516 */
517 @Override public Iterator<E> iterator() {
518 return new Itr(toArray());
519 }
520
521 /**
522 * Snapshot iterator that works off copy of underlying q array.
523 */
524 private class Itr implements Iterator<E> {
525 final Object[] array; // Array of all elements
526 int cursor; // index of next element to return;
527 int lastRet; // index of last element, or -1 if no such
528
529 Itr(Object[] array) {
530 lastRet = -1;
531 this.array = array;
532 }
533
534 @Override
535 public boolean hasNext() {
536 return cursor < array.length;
537 }
538
539 @Override
540 public E next() {
541 if (cursor >= array.length)
542 throw new NoSuchElementException();
543 lastRet = cursor;
544
545 // array comes from q.toArray() and so should have only E's in it
546 @SuppressWarnings("unchecked")
547 E e = (E) array[cursor++];
548 return e;
549 }
550
551 @Override
552 public void remove() {
553 if (lastRet < 0)
554 throw new IllegalStateException();
555 Object x = array[lastRet];
556 lastRet = -1;
557 // Traverse underlying queue to find == element,
558 // not just a .equals element.
559 monitor.enter();
560 try {
561 for (Iterator<E> it = q.iterator(); it.hasNext(); ) {
562 if (it.next() == x) {
563 it.remove();
564 return;
565 }
566 }
567 } finally {
568 monitor.leave();
569 }
570 }
571 }
572
573 /**
574 * Saves the state to a stream (that is, serializes it). This
575 * merely wraps default serialization within the monitor. The
576 * serialization strategy for items is left to underlying
577 * Queue. Note that locking is not needed on deserialization, so
578 * readObject is not defined, just relying on default.
579 */
580 private void writeObject(java.io.ObjectOutputStream s)
581 throws java.io.IOException {
582 monitor.enter();
583 try {
584 s.defaultWriteObject();
585 } finally {
586 monitor.leave();
587 }
588 }
589
590 }