1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package com.google.common.util.concurrent;
18
19 import static com.google.common.base.Preconditions.checkArgument;
20 import static com.google.common.base.Preconditions.checkNotNull;
21 import static com.google.common.base.Preconditions.checkState;
22 import static com.google.common.util.concurrent.Service.State.FAILED;
23 import static com.google.common.util.concurrent.Service.State.NEW;
24 import static com.google.common.util.concurrent.Service.State.RUNNING;
25 import static com.google.common.util.concurrent.Service.State.STARTING;
26 import static com.google.common.util.concurrent.Service.State.STOPPING;
27 import static com.google.common.util.concurrent.Service.State.TERMINATED;
28
29 import com.google.common.annotations.Beta;
30 import com.google.common.util.concurrent.ListenerCallQueue.Callback;
31 import com.google.common.util.concurrent.Monitor.Guard;
32 import com.google.common.util.concurrent.Service.State;
33
34 import java.util.ArrayList;
35 import java.util.Collections;
36 import java.util.List;
37 import java.util.concurrent.Executor;
38 import java.util.concurrent.TimeUnit;
39 import java.util.concurrent.TimeoutException;
40
41 import javax.annotation.Nullable;
42 import javax.annotation.concurrent.GuardedBy;
43 import javax.annotation.concurrent.Immutable;
44
45
46
47
48
49
50
51
52
53
54
55 @Beta
56 public abstract class AbstractService implements Service {
57 private static final Callback<Listener> STARTING_CALLBACK =
58 new Callback<Listener>("starting()") {
59 @Override void call(Listener listener) {
60 listener.starting();
61 }
62 };
63 private static final Callback<Listener> RUNNING_CALLBACK =
64 new Callback<Listener>("running()") {
65 @Override void call(Listener listener) {
66 listener.running();
67 }
68 };
69 private static final Callback<Listener> STOPPING_FROM_STARTING_CALLBACK =
70 stoppingCallback(STARTING);
71 private static final Callback<Listener> STOPPING_FROM_RUNNING_CALLBACK =
72 stoppingCallback(RUNNING);
73
74 private static final Callback<Listener> TERMINATED_FROM_NEW_CALLBACK =
75 terminatedCallback(NEW);
76 private static final Callback<Listener> TERMINATED_FROM_RUNNING_CALLBACK =
77 terminatedCallback(RUNNING);
78 private static final Callback<Listener> TERMINATED_FROM_STOPPING_CALLBACK =
79 terminatedCallback(STOPPING);
80
81 private static Callback<Listener> terminatedCallback(final State from) {
82 return new Callback<Listener>("terminated({from = " + from + "})") {
83 @Override void call(Listener listener) {
84 listener.terminated(from);
85 }
86 };
87 }
88
89 private static Callback<Listener> stoppingCallback(final State from) {
90 return new Callback<Listener>("stopping({from = " + from + "})") {
91 @Override void call(Listener listener) {
92 listener.stopping(from);
93 }
94 };
95 }
96
97 private final Monitor monitor = new Monitor();
98
99 private final Guard isStartable = new Guard(monitor) {
100 @Override public boolean isSatisfied() {
101 return state() == NEW;
102 }
103 };
104
105 private final Guard isStoppable = new Guard(monitor) {
106 @Override public boolean isSatisfied() {
107 return state().compareTo(RUNNING) <= 0;
108 }
109 };
110
111 private final Guard hasReachedRunning = new Guard(monitor) {
112 @Override public boolean isSatisfied() {
113 return state().compareTo(RUNNING) >= 0;
114 }
115 };
116
117 private final Guard isStopped = new Guard(monitor) {
118 @Override public boolean isSatisfied() {
119 return state().isTerminal();
120 }
121 };
122
123
124
125
126 @GuardedBy("monitor")
127 private final List<ListenerCallQueue<Listener>> listeners =
128 Collections.synchronizedList(new ArrayList<ListenerCallQueue<Listener>>());
129
130
131
132
133
134
135
136
137
138
139 @GuardedBy("monitor")
140 private volatile StateSnapshot snapshot = new StateSnapshot(NEW);
141
142
143 protected AbstractService() {}
144
145
146
147
148
149
150
151
152
153
154
155 protected abstract void doStart();
156
157
158
159
160
161
162
163
164
165
166
167 protected abstract void doStop();
168
169 @Override public final Service startAsync() {
170 if (monitor.enterIf(isStartable)) {
171 try {
172 snapshot = new StateSnapshot(STARTING);
173 starting();
174 doStart();
175
176 } catch (Throwable startupFailure) {
177 notifyFailed(startupFailure);
178 } finally {
179 monitor.leave();
180 executeListeners();
181 }
182 } else {
183 throw new IllegalStateException("Service " + this + " has already been started");
184 }
185 return this;
186 }
187
188 @Override public final Service stopAsync() {
189 if (monitor.enterIf(isStoppable)) {
190 try {
191 State previous = state();
192 switch (previous) {
193 case NEW:
194 snapshot = new StateSnapshot(TERMINATED);
195 terminated(NEW);
196 break;
197 case STARTING:
198 snapshot = new StateSnapshot(STARTING, true, null);
199 stopping(STARTING);
200 break;
201 case RUNNING:
202 snapshot = new StateSnapshot(STOPPING);
203 stopping(RUNNING);
204 doStop();
205 break;
206 case STOPPING:
207 case TERMINATED:
208 case FAILED:
209
210 throw new AssertionError("isStoppable is incorrectly implemented, saw: " + previous);
211 default:
212 throw new AssertionError("Unexpected state: " + previous);
213 }
214
215
216 } catch (Throwable shutdownFailure) {
217 notifyFailed(shutdownFailure);
218 } finally {
219 monitor.leave();
220 executeListeners();
221 }
222 }
223 return this;
224 }
225
226 @Override public final void awaitRunning() {
227 monitor.enterWhenUninterruptibly(hasReachedRunning);
228 try {
229 checkCurrentState(RUNNING);
230 } finally {
231 monitor.leave();
232 }
233 }
234
235 @Override public final void awaitRunning(long timeout, TimeUnit unit) throws TimeoutException {
236 if (monitor.enterWhenUninterruptibly(hasReachedRunning, timeout, unit)) {
237 try {
238 checkCurrentState(RUNNING);
239 } finally {
240 monitor.leave();
241 }
242 } else {
243
244
245
246
247 throw new TimeoutException("Timed out waiting for " + this + " to reach the RUNNING state. "
248 + "Current state: " + state());
249 }
250 }
251
252 @Override public final void awaitTerminated() {
253 monitor.enterWhenUninterruptibly(isStopped);
254 try {
255 checkCurrentState(TERMINATED);
256 } finally {
257 monitor.leave();
258 }
259 }
260
261 @Override public final void awaitTerminated(long timeout, TimeUnit unit) throws TimeoutException {
262 if (monitor.enterWhenUninterruptibly(isStopped, timeout, unit)) {
263 try {
264 checkCurrentState(TERMINATED);
265 } finally {
266 monitor.leave();
267 }
268 } else {
269
270
271
272
273 throw new TimeoutException("Timed out waiting for " + this + " to reach a terminal state. "
274 + "Current state: " + state());
275 }
276 }
277
278
279 @GuardedBy("monitor")
280 private void checkCurrentState(State expected) {
281 State actual = state();
282 if (actual != expected) {
283 if (actual == FAILED) {
284
285 throw new IllegalStateException("Expected the service to be " + expected
286 + ", but the service has FAILED", failureCause());
287 }
288 throw new IllegalStateException("Expected the service to be " + expected + ", but was "
289 + actual);
290 }
291 }
292
293
294
295
296
297
298
299 protected final void notifyStarted() {
300 monitor.enter();
301 try {
302
303
304 if (snapshot.state != STARTING) {
305 IllegalStateException failure = new IllegalStateException(
306 "Cannot notifyStarted() when the service is " + snapshot.state);
307 notifyFailed(failure);
308 throw failure;
309 }
310
311 if (snapshot.shutdownWhenStartupFinishes) {
312 snapshot = new StateSnapshot(STOPPING);
313
314
315 doStop();
316 } else {
317 snapshot = new StateSnapshot(RUNNING);
318 running();
319 }
320 } finally {
321 monitor.leave();
322 executeListeners();
323 }
324 }
325
326
327
328
329
330
331
332
333 protected final void notifyStopped() {
334 monitor.enter();
335 try {
336
337
338 State previous = snapshot.state;
339 if (previous != STOPPING && previous != RUNNING) {
340 IllegalStateException failure = new IllegalStateException(
341 "Cannot notifyStopped() when the service is " + previous);
342 notifyFailed(failure);
343 throw failure;
344 }
345 snapshot = new StateSnapshot(TERMINATED);
346 terminated(previous);
347 } finally {
348 monitor.leave();
349 executeListeners();
350 }
351 }
352
353
354
355
356
357
358 protected final void notifyFailed(Throwable cause) {
359 checkNotNull(cause);
360
361 monitor.enter();
362 try {
363 State previous = state();
364 switch (previous) {
365 case NEW:
366 case TERMINATED:
367 throw new IllegalStateException("Failed while in state:" + previous, cause);
368 case RUNNING:
369 case STARTING:
370 case STOPPING:
371 snapshot = new StateSnapshot(FAILED, false, cause);
372 failed(previous, cause);
373 break;
374 case FAILED:
375
376 break;
377 default:
378 throw new AssertionError("Unexpected state: " + previous);
379 }
380 } finally {
381 monitor.leave();
382 executeListeners();
383 }
384 }
385
386 @Override
387 public final boolean isRunning() {
388 return state() == RUNNING;
389 }
390
391 @Override
392 public final State state() {
393 return snapshot.externalState();
394 }
395
396
397
398
399 @Override
400 public final Throwable failureCause() {
401 return snapshot.failureCause();
402 }
403
404
405
406
407 @Override
408 public final void addListener(Listener listener, Executor executor) {
409 checkNotNull(listener, "listener");
410 checkNotNull(executor, "executor");
411 monitor.enter();
412 try {
413 if (!state().isTerminal()) {
414 listeners.add(new ListenerCallQueue<Listener>(listener, executor));
415 }
416 } finally {
417 monitor.leave();
418 }
419 }
420
421 @Override public String toString() {
422 return getClass().getSimpleName() + " [" + state() + "]";
423 }
424
425
426
427
428
429 private void executeListeners() {
430 if (!monitor.isOccupiedByCurrentThread()) {
431
432 for (int i = 0; i < listeners.size(); i++) {
433 listeners.get(i).execute();
434 }
435 }
436 }
437
438 @GuardedBy("monitor")
439 private void starting() {
440 STARTING_CALLBACK.enqueueOn(listeners);
441 }
442
443 @GuardedBy("monitor")
444 private void running() {
445 RUNNING_CALLBACK.enqueueOn(listeners);
446 }
447
448 @GuardedBy("monitor")
449 private void stopping(final State from) {
450 if (from == State.STARTING) {
451 STOPPING_FROM_STARTING_CALLBACK.enqueueOn(listeners);
452 } else if (from == State.RUNNING) {
453 STOPPING_FROM_RUNNING_CALLBACK.enqueueOn(listeners);
454 } else {
455 throw new AssertionError();
456 }
457 }
458
459 @GuardedBy("monitor")
460 private void terminated(final State from) {
461 switch(from) {
462 case NEW:
463 TERMINATED_FROM_NEW_CALLBACK.enqueueOn(listeners);
464 break;
465 case RUNNING:
466 TERMINATED_FROM_RUNNING_CALLBACK.enqueueOn(listeners);
467 break;
468 case STOPPING:
469 TERMINATED_FROM_STOPPING_CALLBACK.enqueueOn(listeners);
470 break;
471 case STARTING:
472 case TERMINATED:
473 case FAILED:
474 default:
475 throw new AssertionError();
476 }
477 }
478
479 @GuardedBy("monitor")
480 private void failed(final State from, final Throwable cause) {
481
482 new Callback<Listener>("failed({from = " + from + ", cause = " + cause + "})") {
483 @Override void call(Listener listener) {
484 listener.failed(from, cause);
485 }
486 }.enqueueOn(listeners);
487 }
488
489
490
491
492
493
494 @Immutable
495 private static final class StateSnapshot {
496
497
498
499
500 final State state;
501
502
503
504
505
506 final boolean shutdownWhenStartupFinishes;
507
508
509
510
511
512 @Nullable
513 final Throwable failure;
514
515 StateSnapshot(State internalState) {
516 this(internalState, false, null);
517 }
518
519 StateSnapshot(
520 State internalState, boolean shutdownWhenStartupFinishes, @Nullable Throwable failure) {
521 checkArgument(!shutdownWhenStartupFinishes || internalState == STARTING,
522 "shudownWhenStartupFinishes can only be set if state is STARTING. Got %s instead.",
523 internalState);
524 checkArgument(!(failure != null ^ internalState == FAILED),
525 "A failure cause should be set if and only if the state is failed. Got %s and %s "
526 + "instead.", internalState, failure);
527 this.state = internalState;
528 this.shutdownWhenStartupFinishes = shutdownWhenStartupFinishes;
529 this.failure = failure;
530 }
531
532
533 State externalState() {
534 if (shutdownWhenStartupFinishes && state == STARTING) {
535 return STOPPING;
536 } else {
537 return state;
538 }
539 }
540
541
542 Throwable failureCause() {
543 checkState(state == FAILED,
544 "failureCause() is only valid if the service has failed, service is %s", state);
545 return failure;
546 }
547 }
548 }