View Javadoc
1   /*
2    * Copyright (C) 2009 The Guava Authors
3    *
4    * Licensed under the Apache License, Version 2.0 (the "License");
5    * you may not use this file except in compliance with the License.
6    * You may obtain a copy of the License at
7    *
8    * http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS,
12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   * See the License for the specific language governing permissions and
14   * limitations under the License.
15   */
16  
17  package com.google.common.util.concurrent;
18  
19  import static com.google.common.base.Preconditions.checkArgument;
20  import static com.google.common.base.Preconditions.checkNotNull;
21  import static com.google.common.base.Preconditions.checkState;
22  import static com.google.common.util.concurrent.Service.State.FAILED;
23  import static com.google.common.util.concurrent.Service.State.NEW;
24  import static com.google.common.util.concurrent.Service.State.RUNNING;
25  import static com.google.common.util.concurrent.Service.State.STARTING;
26  import static com.google.common.util.concurrent.Service.State.STOPPING;
27  import static com.google.common.util.concurrent.Service.State.TERMINATED;
28  
29  import com.google.common.annotations.Beta;
30  import com.google.common.util.concurrent.ListenerCallQueue.Callback;
31  import com.google.common.util.concurrent.Monitor.Guard;
32  import com.google.common.util.concurrent.Service.State; // javadoc needs this
33  
34  import java.util.ArrayList;
35  import java.util.Collections;
36  import java.util.List;
37  import java.util.concurrent.Executor;
38  import java.util.concurrent.TimeUnit;
39  import java.util.concurrent.TimeoutException;
40  
41  import javax.annotation.Nullable;
42  import javax.annotation.concurrent.GuardedBy;
43  import javax.annotation.concurrent.Immutable;
44  
45  /**
46   * Base class for implementing services that can handle {@link #doStart} and {@link #doStop}
47   * requests, responding to them with {@link #notifyStarted()} and {@link #notifyStopped()}
48   * callbacks. Its subclasses must manage threads manually; consider
49   * {@link AbstractExecutionThreadService} if you need only a single execution thread.
50   *
51   * @author Jesse Wilson
52   * @author Luke Sandberg
53   * @since 1.0
54   */
55  @Beta
56  public abstract class AbstractService implements Service {
57    private static final Callback<Listener> STARTING_CALLBACK =
58        new Callback<Listener>("starting()") {
59          @Override void call(Listener listener) {
60            listener.starting();
61          }
62        };
63    private static final Callback<Listener> RUNNING_CALLBACK =
64        new Callback<Listener>("running()") {
65          @Override void call(Listener listener) {
66            listener.running();
67          }
68        };
69    private static final Callback<Listener> STOPPING_FROM_STARTING_CALLBACK =
70        stoppingCallback(STARTING);
71    private static final Callback<Listener> STOPPING_FROM_RUNNING_CALLBACK =
72        stoppingCallback(RUNNING);
73  
74    private static final Callback<Listener> TERMINATED_FROM_NEW_CALLBACK =
75        terminatedCallback(NEW);
76    private static final Callback<Listener> TERMINATED_FROM_RUNNING_CALLBACK =
77        terminatedCallback(RUNNING);
78    private static final Callback<Listener> TERMINATED_FROM_STOPPING_CALLBACK =
79        terminatedCallback(STOPPING);
80  
81    private static Callback<Listener> terminatedCallback(final State from) {
82      return new Callback<Listener>("terminated({from = " + from + "})") {
83        @Override void call(Listener listener) {
84          listener.terminated(from);
85        }
86      };
87    }
88  
89    private static Callback<Listener> stoppingCallback(final State from) {
90      return new Callback<Listener>("stopping({from = " + from + "})") {
91        @Override void call(Listener listener) {
92          listener.stopping(from);
93        }
94      };
95    }
96  
97    private final Monitor monitor = new Monitor();
98  
99    private final Guard isStartable = new Guard(monitor) {
100     @Override public boolean isSatisfied() {
101       return state() == NEW;
102     }
103   };
104 
105   private final Guard isStoppable = new Guard(monitor) {
106     @Override public boolean isSatisfied() {
107       return state().compareTo(RUNNING) <= 0;
108     }
109   };
110 
111   private final Guard hasReachedRunning = new Guard(monitor) {
112     @Override public boolean isSatisfied() {
113       return state().compareTo(RUNNING) >= 0;
114     }
115   };
116 
117   private final Guard isStopped = new Guard(monitor) {
118     @Override public boolean isSatisfied() {
119       return state().isTerminal();
120     }
121   };
122 
123   /**
124    * The listeners to notify during a state transition.
125    */
126   @GuardedBy("monitor")
127   private final List<ListenerCallQueue<Listener>> listeners =
128       Collections.synchronizedList(new ArrayList<ListenerCallQueue<Listener>>());
129 
130   /**
131    * The current state of the service.  This should be written with the lock held but can be read
132    * without it because it is an immutable object in a volatile field.  This is desirable so that
133    * methods like {@link #state}, {@link #failureCause} and notably {@link #toString} can be run
134    * without grabbing the lock.
135    *
136    * <p>To update this field correctly the lock must be held to guarantee that the state is
137    * consistent.
138    */
139   @GuardedBy("monitor")
140   private volatile StateSnapshot snapshot = new StateSnapshot(NEW);
141 
142   /** Constructor for use by subclasses. */
143   protected AbstractService() {}
144 
145   /**
146    * This method is called by {@link #startAsync} to initiate service startup. The invocation of
147    * this method should cause a call to {@link #notifyStarted()}, either during this method's run,
148    * or after it has returned. If startup fails, the invocation should cause a call to
149    * {@link #notifyFailed(Throwable)} instead.
150    *
151    * <p>This method should return promptly; prefer to do work on a different thread where it is
152    * convenient. It is invoked exactly once on service startup, even when {@link #startAsync} is
153    * called multiple times.
154    */
155   protected abstract void doStart();
156 
157   /**
158    * This method should be used to initiate service shutdown. The invocation of this method should
159    * cause a call to {@link #notifyStopped()}, either during this method's run, or after it has
160    * returned. If shutdown fails, the invocation should cause a call to
161    * {@link #notifyFailed(Throwable)} instead.
162    *
163    * <p> This method should return promptly; prefer to do work on a different thread where it is
164    * convenient. It is invoked exactly once on service shutdown, even when {@link #stopAsync} is
165    * called multiple times.
166    */
167   protected abstract void doStop();
168 
169   @Override public final Service startAsync() {
170     if (monitor.enterIf(isStartable)) {
171       try {
172         snapshot = new StateSnapshot(STARTING);
173         starting();
174         doStart();
175        // TODO(user): justify why we are catching Throwable and not RuntimeException
176       } catch (Throwable startupFailure) {
177         notifyFailed(startupFailure);
178       } finally {
179         monitor.leave();
180         executeListeners();
181       }
182     } else {
183       throw new IllegalStateException("Service " + this + " has already been started");
184     }
185     return this;
186   }
187 
188   @Override public final Service stopAsync() {
189     if (monitor.enterIf(isStoppable)) {
190       try {
191         State previous = state();
192         switch (previous) {
193           case NEW:
194             snapshot = new StateSnapshot(TERMINATED);
195             terminated(NEW);
196             break;
197           case STARTING:
198             snapshot = new StateSnapshot(STARTING, true, null);
199             stopping(STARTING);
200             break;
201           case RUNNING:
202             snapshot = new StateSnapshot(STOPPING);
203             stopping(RUNNING);
204             doStop();
205             break;
206           case STOPPING:
207           case TERMINATED:
208           case FAILED:
209             // These cases are impossible due to the if statement above.
210             throw new AssertionError("isStoppable is incorrectly implemented, saw: " + previous);
211           default:
212             throw new AssertionError("Unexpected state: " + previous);
213         }
214         // TODO(user): justify why we are catching Throwable and not RuntimeException.  Also, we
215         // may inadvertently catch our AssertionErrors.
216       } catch (Throwable shutdownFailure) {
217         notifyFailed(shutdownFailure);
218       } finally {
219         monitor.leave();
220         executeListeners();
221       }
222     }
223     return this;
224   }
225 
226   @Override public final void awaitRunning() {
227     monitor.enterWhenUninterruptibly(hasReachedRunning);
228     try {
229       checkCurrentState(RUNNING);
230     } finally {
231       monitor.leave();
232     }
233   }
234 
235   @Override public final void awaitRunning(long timeout, TimeUnit unit) throws TimeoutException {
236     if (monitor.enterWhenUninterruptibly(hasReachedRunning, timeout, unit)) {
237       try {
238         checkCurrentState(RUNNING);
239       } finally {
240         monitor.leave();
241       }
242     } else {
243       // It is possible due to races the we are currently in the expected state even though we
244       // timed out. e.g. if we weren't event able to grab the lock within the timeout we would never
245       // even check the guard.  I don't think we care too much about this use case but it could lead
246       // to a confusing error message.
247       throw new TimeoutException("Timed out waiting for " + this + " to reach the RUNNING state. "
248           + "Current state: " + state());
249     }
250   }
251 
252   @Override public final void awaitTerminated() {
253     monitor.enterWhenUninterruptibly(isStopped);
254     try {
255       checkCurrentState(TERMINATED);
256     } finally {
257       monitor.leave();
258     }
259   }
260 
261   @Override public final void awaitTerminated(long timeout, TimeUnit unit) throws TimeoutException {
262     if (monitor.enterWhenUninterruptibly(isStopped, timeout, unit)) {
263       try {
264         checkCurrentState(TERMINATED);
265       } finally {
266         monitor.leave();
267       }
268     } else {
269       // It is possible due to races the we are currently in the expected state even though we
270       // timed out. e.g. if we weren't event able to grab the lock within the timeout we would never
271       // even check the guard.  I don't think we care too much about this use case but it could lead
272       // to a confusing error message.
273       throw new TimeoutException("Timed out waiting for " + this + " to reach a terminal state. "
274           + "Current state: " + state());
275     }
276   }
277 
278   /** Checks that the current state is equal to the expected state. */
279   @GuardedBy("monitor")
280   private void checkCurrentState(State expected) {
281     State actual = state();
282     if (actual != expected) {
283       if (actual == FAILED) {
284         // Handle this specially so that we can include the failureCause, if there is one.
285         throw new IllegalStateException("Expected the service to be " + expected
286             + ", but the service has FAILED", failureCause());
287       }
288       throw new IllegalStateException("Expected the service to be " + expected + ", but was "
289           + actual);
290     }
291   }
292 
293   /**
294    * Implementing classes should invoke this method once their service has started. It will cause
295    * the service to transition from {@link State#STARTING} to {@link State#RUNNING}.
296    *
297    * @throws IllegalStateException if the service is not {@link State#STARTING}.
298    */
299   protected final void notifyStarted() {
300     monitor.enter();
301     try {
302       // We have to examine the internal state of the snapshot here to properly handle the stop
303       // while starting case.
304       if (snapshot.state != STARTING) {
305         IllegalStateException failure = new IllegalStateException(
306             "Cannot notifyStarted() when the service is " + snapshot.state);
307         notifyFailed(failure);
308         throw failure;
309       }
310 
311       if (snapshot.shutdownWhenStartupFinishes) {
312         snapshot = new StateSnapshot(STOPPING);
313         // We don't call listeners here because we already did that when we set the
314         // shutdownWhenStartupFinishes flag.
315         doStop();
316       } else {
317         snapshot = new StateSnapshot(RUNNING);
318         running();
319       }
320     } finally {
321       monitor.leave();
322       executeListeners();
323     }
324   }
325 
326   /**
327    * Implementing classes should invoke this method once their service has stopped. It will cause
328    * the service to transition from {@link State#STOPPING} to {@link State#TERMINATED}.
329    *
330    * @throws IllegalStateException if the service is neither {@link State#STOPPING} nor
331    *         {@link State#RUNNING}.
332    */
333   protected final void notifyStopped() {
334     monitor.enter();
335     try {
336       // We check the internal state of the snapshot instead of state() directly so we don't allow
337       // notifyStopped() to be called while STARTING, even if stop() has already been called.
338       State previous = snapshot.state;
339       if (previous != STOPPING && previous != RUNNING) {
340         IllegalStateException failure = new IllegalStateException(
341             "Cannot notifyStopped() when the service is " + previous);
342         notifyFailed(failure);
343         throw failure;
344       }
345       snapshot = new StateSnapshot(TERMINATED);
346       terminated(previous);
347     } finally {
348       monitor.leave();
349       executeListeners();
350     }
351   }
352 
353   /**
354    * Invoke this method to transition the service to the {@link State#FAILED}. The service will
355    * <b>not be stopped</b> if it is running. Invoke this method when a service has failed critically
356    * or otherwise cannot be started nor stopped.
357    */
358   protected final void notifyFailed(Throwable cause) {
359     checkNotNull(cause);
360 
361     monitor.enter();
362     try {
363       State previous = state();
364       switch (previous) {
365         case NEW:
366         case TERMINATED:
367           throw new IllegalStateException("Failed while in state:" + previous, cause);
368         case RUNNING:
369         case STARTING:
370         case STOPPING:
371           snapshot = new StateSnapshot(FAILED, false, cause);
372           failed(previous, cause);
373           break;
374         case FAILED:
375           // Do nothing
376           break;
377         default:
378           throw new AssertionError("Unexpected state: " + previous);
379       }
380     } finally {
381       monitor.leave();
382       executeListeners();
383     }
384   }
385 
386   @Override
387   public final boolean isRunning() {
388     return state() == RUNNING;
389   }
390 
391   @Override
392   public final State state() {
393     return snapshot.externalState();
394   }
395 
396   /**
397    * @since 14.0
398    */
399   @Override
400   public final Throwable failureCause() {
401     return snapshot.failureCause();
402   }
403 
404   /**
405    * @since 13.0
406    */
407   @Override
408   public final void addListener(Listener listener, Executor executor) {
409     checkNotNull(listener, "listener");
410     checkNotNull(executor, "executor");
411     monitor.enter();
412     try {
413       if (!state().isTerminal()) {
414         listeners.add(new ListenerCallQueue<Listener>(listener, executor));
415       }
416     } finally {
417       monitor.leave();
418     }
419   }
420 
421   @Override public String toString() {
422     return getClass().getSimpleName() + " [" + state() + "]";
423   }
424 
425   /**
426    * Attempts to execute all the listeners in {@link #listeners} while not holding the
427    * {@link #monitor}.
428    */
429   private void executeListeners() {
430     if (!monitor.isOccupiedByCurrentThread()) {
431       // iterate by index to avoid concurrent modification exceptions
432       for (int i = 0; i < listeners.size(); i++) {
433         listeners.get(i).execute();
434       }
435     }
436   }
437 
438   @GuardedBy("monitor")
439   private void starting() {
440     STARTING_CALLBACK.enqueueOn(listeners);
441   }
442 
443   @GuardedBy("monitor")
444   private void running() {
445     RUNNING_CALLBACK.enqueueOn(listeners);
446   }
447 
448   @GuardedBy("monitor")
449   private void stopping(final State from) {
450     if (from == State.STARTING) {
451       STOPPING_FROM_STARTING_CALLBACK.enqueueOn(listeners);
452     } else if (from == State.RUNNING) {
453       STOPPING_FROM_RUNNING_CALLBACK.enqueueOn(listeners);
454     } else {
455       throw new AssertionError();
456     }
457   }
458 
459   @GuardedBy("monitor")
460   private void terminated(final State from) {
461     switch(from) {
462       case NEW:
463         TERMINATED_FROM_NEW_CALLBACK.enqueueOn(listeners);
464         break;
465       case RUNNING:
466         TERMINATED_FROM_RUNNING_CALLBACK.enqueueOn(listeners);
467         break;
468       case STOPPING:
469         TERMINATED_FROM_STOPPING_CALLBACK.enqueueOn(listeners);
470         break;
471       case STARTING:
472       case TERMINATED:
473       case FAILED:
474       default:
475         throw new AssertionError();
476     }
477   }
478 
479   @GuardedBy("monitor")
480   private void failed(final State from, final Throwable cause) {
481     // can't memoize this one due to the exception
482     new Callback<Listener>("failed({from = " + from + ", cause = " + cause + "})") {
483       @Override void call(Listener listener) {
484         listener.failed(from, cause);
485       }
486     }.enqueueOn(listeners);
487   }
488 
489   /**
490    * An immutable snapshot of the current state of the service. This class represents a consistent
491    * snapshot of the state and therefore it can be used to answer simple queries without needing to
492    * grab a lock.
493    */
494   @Immutable
495   private static final class StateSnapshot {
496     /**
497      * The internal state, which equals external state unless
498      * shutdownWhenStartupFinishes is true.
499      */
500     final State state;
501 
502     /**
503      * If true, the user requested a shutdown while the service was still starting
504      * up.
505      */
506     final boolean shutdownWhenStartupFinishes;
507 
508     /**
509      * The exception that caused this service to fail.  This will be {@code null}
510      * unless the service has failed.
511      */
512     @Nullable
513     final Throwable failure;
514 
515     StateSnapshot(State internalState) {
516       this(internalState, false, null);
517     }
518 
519     StateSnapshot(
520         State internalState, boolean shutdownWhenStartupFinishes, @Nullable Throwable failure) {
521       checkArgument(!shutdownWhenStartupFinishes || internalState == STARTING,
522           "shudownWhenStartupFinishes can only be set if state is STARTING. Got %s instead.",
523           internalState);
524       checkArgument(!(failure != null ^ internalState == FAILED),
525           "A failure cause should be set if and only if the state is failed.  Got %s and %s "
526           + "instead.", internalState, failure);
527       this.state = internalState;
528       this.shutdownWhenStartupFinishes = shutdownWhenStartupFinishes;
529       this.failure = failure;
530     }
531 
532     /** @see Service#state() */
533     State externalState() {
534       if (shutdownWhenStartupFinishes && state == STARTING) {
535         return STOPPING;
536       } else {
537         return state;
538       }
539     }
540 
541     /** @see Service#failureCause() */
542     Throwable failureCause() {
543       checkState(state == FAILED,
544           "failureCause() is only valid if the service has failed, service is %s", state);
545       return failure;
546     }
547   }
548 }