View Javadoc
1   /*
2    * Copyright (C) 2011 The Guava Authors
3    *
4    * Licensed under the Apache License, Version 2.0 (the "License");
5    * you may not use this file except in compliance with the License.
6    * You may obtain a copy of the License at
7    *
8    * http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS,
12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   * See the License for the specific language governing permissions and
14   * limitations under the License.
15   */
16  
17  package com.google.common.util.concurrent;
18  
19  import static java.util.concurrent.TimeUnit.NANOSECONDS;
20  
21  import com.google.common.annotations.Beta;
22  import com.google.common.base.Preconditions;
23  
24  import java.util.concurrent.BlockingQueue;
25  import java.util.concurrent.CancellationException;
26  import java.util.concurrent.CountDownLatch;
27  import java.util.concurrent.ExecutionException;
28  import java.util.concurrent.Future;
29  import java.util.concurrent.Semaphore;
30  import java.util.concurrent.TimeUnit;
31  import java.util.concurrent.TimeoutException;
32  
33  /**
34   * Utilities for treating interruptible operations as uninterruptible.
35   * In all cases, if a thread is interrupted during such a call, the call
36   * continues to block until the result is available or the timeout elapses,
37   * and only then re-interrupts the thread.
38   *
39   * @author Anthony Zana
40   * @since 10.0
41   */
42  @Beta
43  public final class Uninterruptibles {
44  
45    // Implementation Note: As of 3-7-11, the logic for each blocking/timeout
46    // methods is identical, save for method being invoked.
47  
48    /**
49     * Invokes {@code latch.}{@link CountDownLatch#await() await()}
50     * uninterruptibly.
51     */
52    public static void awaitUninterruptibly(CountDownLatch latch) {
53      boolean interrupted = false;
54      try {
55        while (true) {
56          try {
57            latch.await();
58            return;
59          } catch (InterruptedException e) {
60            interrupted = true;
61          }
62        }
63      } finally {
64        if (interrupted) {
65          Thread.currentThread().interrupt();
66        }
67      }
68    }
69  
70    /**
71     * Invokes
72     * {@code latch.}{@link CountDownLatch#await(long, TimeUnit)
73     * await(timeout, unit)} uninterruptibly.
74     */
75    public static boolean awaitUninterruptibly(CountDownLatch latch,
76        long timeout, TimeUnit unit) {
77      boolean interrupted = false;
78      try {
79        long remainingNanos = unit.toNanos(timeout);
80        long end = System.nanoTime() + remainingNanos;
81  
82        while (true) {
83          try {
84            // CountDownLatch treats negative timeouts just like zero.
85            return latch.await(remainingNanos, NANOSECONDS);
86          } catch (InterruptedException e) {
87            interrupted = true;
88            remainingNanos = end - System.nanoTime();
89          }
90        }
91      } finally {
92        if (interrupted) {
93          Thread.currentThread().interrupt();
94        }
95      }
96    }
97  
98    /**
99     * Invokes {@code toJoin.}{@link Thread#join() join()} uninterruptibly.
100    */
101   public static void joinUninterruptibly(Thread toJoin) {
102     boolean interrupted = false;
103     try {
104       while (true) {
105         try {
106           toJoin.join();
107           return;
108         } catch (InterruptedException e) {
109           interrupted = true;
110         }
111       }
112     } finally {
113       if (interrupted) {
114         Thread.currentThread().interrupt();
115       }
116     }
117   }
118 
119   /**
120    * Invokes {@code future.}{@link Future#get() get()} uninterruptibly.
121    * To get uninterruptibility and remove checked exceptions, see
122    * {@link Futures#getUnchecked}.
123    *
124    * <p>If instead, you wish to treat {@link InterruptedException} uniformly
125    * with other exceptions, see {@link Futures#get(Future, Class) Futures.get}
126    * or {@link Futures#makeChecked}.
127    *
128    * @throws ExecutionException if the computation threw an exception
129    * @throws CancellationException if the computation was cancelled
130    */
131   public static <V> V getUninterruptibly(Future<V> future)
132       throws ExecutionException {
133     boolean interrupted = false;
134     try {
135       while (true) {
136         try {
137           return future.get();
138         } catch (InterruptedException e) {
139           interrupted = true;
140         }
141       }
142     } finally {
143       if (interrupted) {
144         Thread.currentThread().interrupt();
145       }
146     }
147   }
148 
149   /**
150    * Invokes
151    * {@code future.}{@link Future#get(long, TimeUnit) get(timeout, unit)}
152    * uninterruptibly.
153    *
154    * <p>If instead, you wish to treat {@link InterruptedException} uniformly
155    * with other exceptions, see {@link Futures#get(Future, Class) Futures.get}
156    * or {@link Futures#makeChecked}.
157    *
158    * @throws ExecutionException if the computation threw an exception
159    * @throws CancellationException if the computation was cancelled
160    * @throws TimeoutException if the wait timed out
161    */
162   public static <V> V getUninterruptibly(
163       Future<V> future, long timeout,  TimeUnit unit)
164           throws ExecutionException, TimeoutException {
165     boolean interrupted = false;
166     try {
167       long remainingNanos = unit.toNanos(timeout);
168       long end = System.nanoTime() + remainingNanos;
169 
170       while (true) {
171         try {
172           // Future treats negative timeouts just like zero.
173           return future.get(remainingNanos, NANOSECONDS);
174         } catch (InterruptedException e) {
175           interrupted = true;
176           remainingNanos = end - System.nanoTime();
177         }
178       }
179     } finally {
180       if (interrupted) {
181         Thread.currentThread().interrupt();
182       }
183     }
184   }
185 
186   /**
187    * Invokes
188    * {@code unit.}{@link TimeUnit#timedJoin(Thread, long)
189    * timedJoin(toJoin, timeout)} uninterruptibly.
190    */
191   public static void joinUninterruptibly(Thread toJoin,
192       long timeout, TimeUnit unit) {
193     Preconditions.checkNotNull(toJoin);
194     boolean interrupted = false;
195     try {
196       long remainingNanos = unit.toNanos(timeout);
197       long end = System.nanoTime() + remainingNanos;
198       while (true) {
199         try {
200           // TimeUnit.timedJoin() treats negative timeouts just like zero.
201           NANOSECONDS.timedJoin(toJoin, remainingNanos);
202           return;
203         } catch (InterruptedException e) {
204           interrupted = true;
205           remainingNanos = end - System.nanoTime();
206         }
207       }
208     } finally {
209       if (interrupted) {
210         Thread.currentThread().interrupt();
211       }
212     }
213   }
214 
215   /**
216    * Invokes {@code queue.}{@link BlockingQueue#take() take()} uninterruptibly.
217    */
218   public static <E> E takeUninterruptibly(BlockingQueue<E> queue) {
219     boolean interrupted = false;
220     try {
221       while (true) {
222         try {
223           return queue.take();
224         } catch (InterruptedException e) {
225           interrupted = true;
226         }
227       }
228     } finally {
229       if (interrupted) {
230         Thread.currentThread().interrupt();
231       }
232     }
233   }
234 
235   /**
236    * Invokes {@code queue.}{@link BlockingQueue#put(Object) put(element)}
237    * uninterruptibly.
238    *
239    * @throws ClassCastException if the class of the specified element prevents
240    *     it from being added to the given queue
241    * @throws IllegalArgumentException if some property of the specified element
242    *     prevents it from being added to the given queue
243    */
244   public static <E> void putUninterruptibly(BlockingQueue<E> queue, E element) {
245     boolean interrupted = false;
246     try {
247       while (true) {
248         try {
249           queue.put(element);
250           return;
251         } catch (InterruptedException e) {
252           interrupted = true;
253         }
254       }
255     } finally {
256       if (interrupted) {
257         Thread.currentThread().interrupt();
258       }
259     }
260   }
261 
262   // TODO(user): Support Sleeper somehow (wrapper or interface method)?
263   /**
264    * Invokes {@code unit.}{@link TimeUnit#sleep(long) sleep(sleepFor)}
265    * uninterruptibly.
266    */
267   public static void sleepUninterruptibly(long sleepFor, TimeUnit unit) {
268     boolean interrupted = false;
269     try {
270       long remainingNanos = unit.toNanos(sleepFor);
271       long end = System.nanoTime() + remainingNanos;
272       while (true) {
273         try {
274           // TimeUnit.sleep() treats negative timeouts just like zero.
275           NANOSECONDS.sleep(remainingNanos);
276           return;
277         } catch (InterruptedException e) {
278           interrupted = true;
279           remainingNanos = end - System.nanoTime();
280         }
281       }
282     } finally {
283       if (interrupted) {
284         Thread.currentThread().interrupt();
285       }
286     }
287   }
288 
289   /**
290    * Invokes {@code semaphore.}{@link Semaphore#tryAcquire(int, long, TimeUnit)
291    * tryAcquire(1, timeout, unit)} uninterruptibly.
292    *
293    * @since 18.0
294    */
295   public static boolean tryAcquireUninterruptibly(
296       Semaphore semaphore, long timeout, TimeUnit unit) {
297     return tryAcquireUninterruptibly(semaphore, 1, timeout, unit);
298   }
299 
300   /**
301    * Invokes {@code semaphore.}{@link Semaphore#tryAcquire(int, long, TimeUnit)
302    * tryAcquire(permits, timeout, unit)} uninterruptibly.
303    *
304    * @since 18.0
305    */
306   public static boolean tryAcquireUninterruptibly(
307       Semaphore semaphore, int permits, long timeout, TimeUnit unit) {
308     boolean interrupted = false;
309     try {
310       long remainingNanos = unit.toNanos(timeout);
311       long end = System.nanoTime() + remainingNanos;
312 
313       while (true) {
314         try {
315           // Semaphore treats negative timeouts just like zero.
316           return semaphore.tryAcquire(permits, remainingNanos, NANOSECONDS);
317         } catch (InterruptedException e) {
318           interrupted = true;
319           remainingNanos = end - System.nanoTime();
320         }
321       }
322     } finally {
323       if (interrupted) {
324         Thread.currentThread().interrupt();
325       }
326     }
327   }
328 
329   // TODO(user): Add support for waitUninterruptibly.
330 
331   private Uninterruptibles() {}
332 }