1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package com.google.common.util.concurrent;
17
18 import static com.google.common.base.Preconditions.checkArgument;
19 import static com.google.common.base.Preconditions.checkNotNull;
20 import static com.google.common.base.Preconditions.checkState;
21 import static com.google.common.base.Predicates.equalTo;
22 import static com.google.common.base.Predicates.in;
23 import static com.google.common.base.Predicates.instanceOf;
24 import static com.google.common.base.Predicates.not;
25 import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
26 import static com.google.common.util.concurrent.Service.State.FAILED;
27 import static com.google.common.util.concurrent.Service.State.NEW;
28 import static com.google.common.util.concurrent.Service.State.RUNNING;
29 import static com.google.common.util.concurrent.Service.State.STARTING;
30 import static com.google.common.util.concurrent.Service.State.STOPPING;
31 import static com.google.common.util.concurrent.Service.State.TERMINATED;
32 import static java.util.concurrent.TimeUnit.MILLISECONDS;
33
34 import com.google.common.annotations.Beta;
35 import com.google.common.base.Function;
36 import com.google.common.base.MoreObjects;
37 import com.google.common.base.Stopwatch;
38 import com.google.common.base.Supplier;
39 import com.google.common.collect.Collections2;
40 import com.google.common.collect.ImmutableCollection;
41 import com.google.common.collect.ImmutableList;
42 import com.google.common.collect.ImmutableMap;
43 import com.google.common.collect.ImmutableMultimap;
44 import com.google.common.collect.ImmutableSet;
45 import com.google.common.collect.ImmutableSetMultimap;
46 import com.google.common.collect.Lists;
47 import com.google.common.collect.Maps;
48 import com.google.common.collect.Multimaps;
49 import com.google.common.collect.Multiset;
50 import com.google.common.collect.Ordering;
51 import com.google.common.collect.SetMultimap;
52 import com.google.common.collect.Sets;
53 import com.google.common.util.concurrent.ListenerCallQueue.Callback;
54 import com.google.common.util.concurrent.Service.State;
55
56 import java.lang.ref.WeakReference;
57 import java.util.ArrayList;
58 import java.util.Collection;
59 import java.util.Collections;
60 import java.util.EnumMap;
61 import java.util.List;
62 import java.util.Map;
63 import java.util.Map.Entry;
64 import java.util.Set;
65 import java.util.concurrent.Executor;
66 import java.util.concurrent.TimeUnit;
67 import java.util.concurrent.TimeoutException;
68 import java.util.logging.Level;
69 import java.util.logging.Logger;
70
71 import javax.annotation.concurrent.GuardedBy;
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125 @Beta
126 public final class ServiceManager {
/** Logger shared by this class and its nested helpers for lifecycle diagnostics. */
private static final Logger logger = Logger.getLogger(ServiceManager.class.getName());
/** Reusable callback, labelled {@code "healthy()"}, that invokes {@link Listener#healthy()}. */
private static final Callback<Listener> HEALTHY_CALLBACK = new Callback<Listener>("healthy()") {
  @Override void call(Listener listener) {
    listener.healthy();
  }
};
/** Reusable callback, labelled {@code "stopped()"}, that invokes {@link Listener#stopped()}. */
private static final Callback<Listener> STOPPED_CALLBACK = new Callback<Listener>("stopped()") {
  @Override void call(Listener listener) {
    listener.stopped();
  }
};
138
139
140
141
142
143
144
145
146
147
/**
 * Receives notifications about the aggregate state of the services tracked by a
 * {@link ServiceManager}. Every method has an empty body here, so subclasses override only the
 * events they are interested in.
 */
@Beta
public abstract static class Listener {
  /** Invoked when a state transition leaves every managed service counted as RUNNING. */
  public void healthy() {}

  /**
   * Invoked when a state transition leaves every managed service counted as either TERMINATED
   * or FAILED.
   */
  public void stopped() {}

  /**
   * Invoked when a managed service transitions to the FAILED state.
   *
   * @param service the service that failed
   */
  public void failure(Service service) {}
}
173
174
175
176
177
178
179
180
/** Bookkeeping for the managed services; the per-service listeners hold it only weakly. */
private final ServiceManagerState state;
/**
 * Immutable copy of the services handed to the constructor, or a single placeholder
 * {@code NoOpService} when none were supplied.
 */
private final ImmutableList<Service> services;
183
184
185
186
187
188
189
190
191
192 public ServiceManager(Iterable<? extends Service> services) {
193 ImmutableList<Service> copy = ImmutableList.copyOf(services);
194 if (copy.isEmpty()) {
195
196
197 logger.log(Level.WARNING,
198 "ServiceManager configured with no services. Is your application configured properly?",
199 new EmptyServiceManagerWarning());
200 copy = ImmutableList.<Service>of(new NoOpService());
201 }
202 this.state = new ServiceManagerState(copy);
203 this.services = copy;
204 WeakReference<ServiceManagerState> stateReference =
205 new WeakReference<ServiceManagerState>(state);
206 for (Service service : copy) {
207 service.addListener(new ServiceListener(service, stateReference), directExecutor());
208
209
210 checkArgument(service.state() == NEW, "Can only manage NEW services, %s", service);
211 }
212
213
214 this.state.markReady();
215 }
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
/**
 * Registers {@code listener} so that its callbacks are run on {@code executor}. Delegates to
 * the internal state object, which rejects null arguments and silently drops the listener if
 * every service has already terminated or failed.
 *
 * @param listener the listener to notify of aggregate state changes
 * @param executor the executor on which the listener's callbacks are run
 */
public void addListener(Listener listener, Executor executor) {
  state.addListener(listener, executor);
}
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
/**
 * Registers {@code listener} using {@code directExecutor()} as the executor for its callbacks.
 *
 * @param listener the listener to notify of aggregate state changes
 */
public void addListener(Listener listener) {
  state.addListener(listener, directExecutor());
}
263
264
265
266
267
268
269
270
271
272 public ServiceManager startAsync() {
273 for (Service service : services) {
274 State state = service.state();
275 checkState(state == NEW, "Service %s is %s, cannot start it.", service, state);
276 }
277 for (Service service : services) {
278 try {
279 state.tryStartTiming(service);
280 service.startAsync();
281 } catch (IllegalStateException e) {
282
283
284
285
286 logger.log(Level.WARNING, "Unable to start Service " + service, e);
287 }
288 }
289 return this;
290 }
291
292
293
294
295
296
297
298
299
/**
 * Waits, without a time limit, until the internal health guard is satisfied: either every
 * service is RUNNING or at least one is STOPPING, TERMINATED or FAILED.
 *
 * @throws IllegalStateException if the wait ends while not every service is RUNNING
 */
public void awaitHealthy() {
  state.awaitHealthy();
}
303
304
305
306
307
308
309
310
311
312
313
314
/**
 * Waits up to the given time until the internal health guard is satisfied: either every
 * service is RUNNING or at least one is STOPPING, TERMINATED or FAILED.
 *
 * @param timeout the maximum time to wait
 * @param unit the unit of {@code timeout}
 * @throws TimeoutException if the guard is not satisfied within the given time
 * @throws IllegalStateException if the wait ends while not every service is RUNNING
 */
public void awaitHealthy(long timeout, TimeUnit unit) throws TimeoutException {
  state.awaitHealthy(timeout, unit);
}
318
319
320
321
322
323
324
/**
 * Calls {@code stopAsync()} on every managed service, in list order, without waiting for any
 * of them to finish stopping.
 *
 * @return this manager
 */
public ServiceManager stopAsync() {
  for (Service service : services) {
    service.stopAsync();
  }
  return this;
}
331
332
333
334
335
336
/**
 * Waits, without a time limit, until every managed service is counted as TERMINATED or FAILED.
 */
public void awaitStopped() {
  state.awaitStopped();
}
340
341
342
343
344
345
346
347
348
349
/**
 * Waits up to the given time until every managed service is counted as TERMINATED or FAILED.
 *
 * @param timeout the maximum time to wait
 * @param unit the unit of {@code timeout}
 * @throws TimeoutException if the services have not all stopped within the given time
 */
public void awaitStopped(long timeout, TimeUnit unit) throws TimeoutException {
  state.awaitStopped(timeout, unit);
}
353
354
355
356
357
358
359
360 public boolean isHealthy() {
361 for (Service service : services) {
362 if (!service.isRunning()) {
363 return false;
364 }
365 }
366 return true;
367 }
368
369
370
371
372
373
374
/**
 * Returns an immutable snapshot of the managed services grouped by the state recorded for each
 * of them. The placeholder {@code NoOpService} is never included.
 *
 * @return the services keyed by their recorded state
 */
public ImmutableMultimap<State, Service> servicesByState() {
  return state.servicesByState();
}
378
379
380
381
382
383
384
385
/**
 * Returns the startup durations, in milliseconds, of the services whose startup timer has been
 * stopped, ordered by ascending duration. The placeholder {@code NoOpService} is never
 * included.
 *
 * @return an immutable map from service to its startup time in milliseconds
 */
public ImmutableMap<Service, Long> startupTimes() {
  return state.startupTimes();
}
389
390 @Override public String toString() {
391 return MoreObjects.toStringHelper(ServiceManager.class)
392 .add("services", Collections2.filter(services, not(instanceOf(NoOpService.class))))
393 .toString();
394 }
395
396
397
398
399
400 private static final class ServiceManagerState {
/** Lock protecting all mutable bookkeeping in this class; the guards below are bound to it. */
final Monitor monitor = new Monitor();

/**
 * The managed services grouped by their last recorded state. Backed by an {@link EnumMap}
 * whose value collections are linked hash sets.
 */
@GuardedBy("monitor")
final SetMultimap<State, Service> servicesByState =
    Multimaps.newSetMultimap(new EnumMap<State, Collection<Service>>(State.class),
        new Supplier<Set<Service>>() {
          @Override public Set<Service> get() {
            return Sets.newLinkedHashSet();
          }
        });

/** The keys of {@code servicesByState} as a multiset, used to count services per state. */
@GuardedBy("monitor")
final Multiset<State> states = servicesByState.keys();

/** Startup stopwatch per service, held in an identity-based map. */
@GuardedBy("monitor")
final Map<Service, Stopwatch> startupTimers = Maps.newIdentityHashMap();
417
418
419
420
421
422
423
424
425
426
427
428
/**
 * Set by {@code markReady()}. Until it is true, {@code transitionService()} records nothing
 * beyond the {@code transitioned} flag.
 */
@GuardedBy("monitor")
boolean ready;

/** Set by every {@code transitionService()} call, including ones made before {@code ready}. */
@GuardedBy("monitor")
boolean transitioned;

/** Size of the collection passed to the constructor; state counts are compared against it. */
final int numberOfServices;
436
437
438
439
440
441 final Monitor.Guard awaitHealthGuard = new Monitor.Guard(monitor) {
442 @Override public boolean isSatisfied() {
443
444 return states.count(RUNNING) == numberOfServices
445 || states.contains(STOPPING)
446 || states.contains(TERMINATED)
447 || states.contains(FAILED);
448 }
449 };
450
451
452
453
454 final Monitor.Guard stoppedGuard = new Monitor.Guard(monitor) {
455 @Override public boolean isSatisfied() {
456 return states.count(TERMINATED) + states.count(FAILED) == numberOfServices;
457 }
458 };
459
460
/**
 * Call queues of the registered manager listeners. Additions and callback enqueueing happen
 * with the monitor held; {@code executeListeners()} reads the list after asserting that the
 * monitor is NOT held, which is why the list is additionally wrapped in a synchronized list.
 */
@GuardedBy("monitor")
final List<ListenerCallQueue<Listener>> listeners =
    Collections.synchronizedList(new ArrayList<ListenerCallQueue<Listener>>());
464
465
466
467
468
469
470
/**
 * Creates the bookkeeping for the given services, recording all of them under the NEW state.
 *
 * @param services the services to track; its size becomes {@code numberOfServices}
 */
ServiceManagerState(ImmutableCollection<Service> services) {
  this.numberOfServices = services.size();
  servicesByState.putAll(NEW, services);
}
475
476
477
478
479
480 void tryStartTiming(Service service) {
481 monitor.enter();
482 try {
483 Stopwatch stopwatch = startupTimers.get(service);
484 if (stopwatch == null) {
485 startupTimers.put(service, Stopwatch.createStarted());
486 }
487 } finally {
488 monitor.leave();
489 }
490 }
491
492
493
494
495
496 void markReady() {
497 monitor.enter();
498 try {
499 if (!transitioned) {
500
501 ready = true;
502 } else {
503
504 List<Service> servicesInBadStates = Lists.newArrayList();
505 for (Service service : servicesByState().values()) {
506 if (service.state() != NEW) {
507 servicesInBadStates.add(service);
508 }
509 }
510 throw new IllegalArgumentException("Services started transitioning asynchronously before "
511 + "the ServiceManager was constructed: " + servicesInBadStates);
512 }
513 } finally {
514 monitor.leave();
515 }
516 }
517
518 void addListener(Listener listener, Executor executor) {
519 checkNotNull(listener, "listener");
520 checkNotNull(executor, "executor");
521 monitor.enter();
522 try {
523
524 if (!stoppedGuard.isSatisfied()) {
525 listeners.add(new ListenerCallQueue<Listener>(listener, executor));
526 }
527 } finally {
528 monitor.leave();
529 }
530 }
531
/**
 * Enters the monitor once {@code awaitHealthGuard} is satisfied and then verifies that all
 * services are RUNNING.
 *
 * @throws IllegalStateException if the guard was satisfied without every service RUNNING
 */
void awaitHealthy() {
  monitor.enterWhenUninterruptibly(awaitHealthGuard);
  try {
    checkHealthy();
  } finally {
    monitor.leave();
  }
}
540
541 void awaitHealthy(long timeout, TimeUnit unit) throws TimeoutException {
542 monitor.enter();
543 try {
544 if (!monitor.waitForUninterruptibly(awaitHealthGuard, timeout, unit)) {
545 throw new TimeoutException("Timeout waiting for the services to become healthy. The "
546 + "following services have not started: "
547 + Multimaps.filterKeys(servicesByState, in(ImmutableSet.of(NEW, STARTING))));
548 }
549 checkHealthy();
550 } finally {
551 monitor.leave();
552 }
553 }
554
/**
 * Enters the monitor once {@code stoppedGuard} is satisfied and leaves it again immediately;
 * the call exists only for the wait.
 */
void awaitStopped() {
  monitor.enterWhenUninterruptibly(stoppedGuard);
  monitor.leave();
}
559
560 void awaitStopped(long timeout, TimeUnit unit) throws TimeoutException {
561 monitor.enter();
562 try {
563 if (!monitor.waitForUninterruptibly(stoppedGuard, timeout, unit)) {
564 throw new TimeoutException("Timeout waiting for the services to stop. The following "
565 + "services have not stopped: "
566 + Multimaps.filterKeys(servicesByState,
567 not(in(ImmutableSet.of(TERMINATED, FAILED)))));
568 }
569 } finally {
570 monitor.leave();
571 }
572 }
573
574 ImmutableMultimap<State, Service> servicesByState() {
575 ImmutableSetMultimap.Builder<State, Service> builder = ImmutableSetMultimap.builder();
576 monitor.enter();
577 try {
578 for (Entry<State, Service> entry : servicesByState.entries()) {
579 if (!(entry.getValue() instanceof NoOpService)) {
580 builder.put(entry.getKey(), entry.getValue());
581 }
582 }
583 } finally {
584 monitor.leave();
585 }
586 return builder.build();
587 }
588
589 ImmutableMap<Service, Long> startupTimes() {
590 List<Entry<Service, Long>> loadTimes;
591 monitor.enter();
592 try {
593 loadTimes = Lists.newArrayListWithCapacity(startupTimers.size());
594
595 for (Entry<Service, Stopwatch> entry : startupTimers.entrySet()) {
596 Service service = entry.getKey();
597 Stopwatch stopWatch = entry.getValue();
598 if (!stopWatch.isRunning() && !(service instanceof NoOpService)) {
599 loadTimes.add(Maps.immutableEntry(service, stopWatch.elapsed(MILLISECONDS)));
600 }
601 }
602 } finally {
603 monitor.leave();
604 }
605 Collections.sort(loadTimes, Ordering.<Long>natural()
606 .onResultOf(new Function<Entry<Service, Long>, Long>() {
607 @Override public Long apply(Map.Entry<Service, Long> input) {
608 return input.getValue();
609 }
610 }));
611 ImmutableMap.Builder<Service, Long> builder = ImmutableMap.builder();
612 for (Entry<Service, Long> entry : loadTimes) {
613 builder.put(entry);
614 }
615 return builder.build();
616 }
617
618
619
620
621
622
623
624
625
626
627
628
/**
 * Records that {@code service} moved from state {@code from} to state {@code to}, updates its
 * startup timer, and queues any manager-listener callbacks the transition triggers. Queued
 * callbacks are executed after the monitor has been released.
 *
 * @param service the service that changed state
 * @param from the state the service left
 * @param to the state the service entered
 * @throws NullPointerException if {@code service} is null
 * @throws IllegalArgumentException if {@code from} and {@code to} are the same state
 * @throws IllegalStateException if the state map does not hold the service under {@code from},
 *     or already holds it under {@code to}
 */
void transitionService(final Service service, State from, State to) {
  checkNotNull(service);
  checkArgument(from != to);
  monitor.enter();
  try {
    // Always note that a transition happened; markReady() uses this to reject services that
    // started moving before the manager was fully constructed.
    transitioned = true;
    if (!ready) {
      // Not ready yet: do no bookkeeping (the finally block below still runs).
      return;
    }

    // Move the service from its old bucket to its new one, failing if the map disagrees with
    // the reported transition.
    checkState(servicesByState.remove(from, service),
        "Service %s not at the expected location in the state map %s", service, from);
    checkState(servicesByState.put(to, service),
        "Service %s in the state map unexpectedly at %s", service, to);

    Stopwatch stopwatch = startupTimers.get(service);
    if (stopwatch == null) {
      // No timer was started through tryStartTiming(), so start one now.
      stopwatch = Stopwatch.createStarted();
      startupTimers.put(service, stopwatch);
    }
    // RUNNING, or any state declared after it in the State enum, ends the startup measurement.
    if (to.compareTo(RUNNING) >= 0 && stopwatch.isRunning()) {

      stopwatch.stop();
      if (!(service instanceof NoOpService)) {
        logger.log(Level.FINE, "Started {0} in {1}.", new Object[] {service, stopwatch});
      }
    }

    // Queue (but do not yet run) the listener callbacks. The failure callback is queued first,
    // so it precedes a stopped callback triggered by the same transition.
    if (to == FAILED) {
      fireFailedListeners(service);
    }

    if (states.count(RUNNING) == numberOfServices) {
      // Every service is now RUNNING.
      fireHealthyListeners();
    } else if (states.count(TERMINATED) + states.count(FAILED) == numberOfServices) {
      fireStoppedListeners();
    }
  } finally {
    monitor.leave();
    // Runs on every exit path, and only after the monitor has been released.
    executeListeners();
  }
}
677
/** Queues the shared "stopped" callback on every registered listener; does not run it. */
@GuardedBy("monitor")
void fireStoppedListeners() {
  STOPPED_CALLBACK.enqueueOn(listeners);
}

/** Queues the shared "healthy" callback on every registered listener; does not run it. */
@GuardedBy("monitor")
void fireHealthyListeners() {
  HEALTHY_CALLBACK.enqueueOn(listeners);
}
687
688 @GuardedBy("monitor")
689 void fireFailedListeners(final Service service) {
690 new Callback<Listener>("failed({service=" + service + "})") {
691 @Override void call(Listener listener) {
692 listener.failure(service);
693 }
694 }.enqueueOn(listeners);
695 }
696
697
/**
 * Runs the callbacks queued on every registered listener. Must be called without the monitor
 * held.
 *
 * @throws IllegalStateException if the current thread occupies the monitor
 */
void executeListeners() {
  checkState(!monitor.isOccupiedByCurrentThread(),
      "It is incorrect to execute listeners with the monitor held.");
  // Deliberately indexed rather than for-each: the list can grow concurrently, and iterating a
  // synchronized list with an iterator would need external locking to be safe.
  for (int i = 0; i < listeners.size(); i++) {
    listeners.get(i).execute();
  }
}
706
707 @GuardedBy("monitor")
708 void checkHealthy() {
709 if (states.count(RUNNING) != numberOfServices) {
710 IllegalStateException exception = new IllegalStateException(
711 "Expected to be healthy after starting. The following services are not running: "
712 + Multimaps.filterKeys(servicesByState, not(equalTo(RUNNING))));
713 throw exception;
714 }
715 }
716 }
717
718
719
720
721
722
723 private static final class ServiceListener extends Service.Listener {
724 final Service service;
725
726
727 final WeakReference<ServiceManagerState> state;
728
729 ServiceListener(Service service, WeakReference<ServiceManagerState> state) {
730 this.service = service;
731 this.state = state;
732 }
733
734 @Override public void starting() {
735 ServiceManagerState state = this.state.get();
736 if (state != null) {
737 state.transitionService(service, NEW, STARTING);
738 if (!(service instanceof NoOpService)) {
739 logger.log(Level.FINE, "Starting {0}.", service);
740 }
741 }
742 }
743
744 @Override public void running() {
745 ServiceManagerState state = this.state.get();
746 if (state != null) {
747 state.transitionService(service, STARTING, RUNNING);
748 }
749 }
750
751 @Override public void stopping(State from) {
752 ServiceManagerState state = this.state.get();
753 if (state != null) {
754 state.transitionService(service, from, STOPPING);
755 }
756 }
757
758 @Override public void terminated(State from) {
759 ServiceManagerState state = this.state.get();
760 if (state != null) {
761 if (!(service instanceof NoOpService)) {
762 logger.log(Level.FINE, "Service {0} has terminated. Previous state was: {1}",
763 new Object[] {service, from});
764 }
765 state.transitionService(service, from, TERMINATED);
766 }
767 }
768
769 @Override public void failed(State from, Throwable failure) {
770 ServiceManagerState state = this.state.get();
771 if (state != null) {
772
773
774 if (!(service instanceof NoOpService)) {
775 logger.log(Level.SEVERE, "Service " + service + " has failed in the " + from + " state.",
776 failure);
777 }
778 state.transitionService(service, from, FAILED);
779 }
780 }
781 }
782
783
784
785
786
787
788
789
790
/**
 * Placeholder service managed when the manager was constructed without any services. Starting
 * and stopping complete immediately. The rest of the class filters it out of logging,
 * {@code toString()}, {@code servicesByState()} and {@code startupTimes()}.
 */
private static final class NoOpService extends AbstractService {
  @Override protected void doStart() { notifyStarted(); }
  @Override protected void doStop() { notifyStopped(); }
}
795
796
/**
 * Never thrown. An instance is attached to the warning logged for an empty manager so that the
 * log record carries the stack trace of the construction site.
 */
private static final class EmptyServiceManagerWarning extends Throwable {}
798 }