1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package com.google.common.util.concurrent;
18
19 import com.google.common.util.concurrent.AbstractScheduledService.Scheduler;
20 import com.google.common.util.concurrent.Service.State;
21
22 import junit.framework.TestCase;
23
24 import java.util.concurrent.CyclicBarrier;
25 import java.util.concurrent.ExecutionException;
26 import java.util.concurrent.Executors;
27 import java.util.concurrent.Future;
28 import java.util.concurrent.ScheduledExecutorService;
29 import java.util.concurrent.ScheduledFuture;
30 import java.util.concurrent.ScheduledThreadPoolExecutor;
31 import java.util.concurrent.TimeUnit;
32 import java.util.concurrent.atomic.AtomicBoolean;
33 import java.util.concurrent.atomic.AtomicInteger;
34 import java.util.concurrent.atomic.AtomicReference;
35
36
37
38
39
40
41
42 public class AbstractScheduledServiceTest extends TestCase {
43
44 volatile Scheduler configuration = Scheduler.newFixedDelaySchedule(0, 10, TimeUnit.MILLISECONDS);
45 volatile ScheduledFuture<?> future = null;
46
47 volatile boolean atFixedRateCalled = false;
48 volatile boolean withFixedDelayCalled = false;
49 volatile boolean scheduleCalled = false;
50
51 final ScheduledExecutorService executor = new ScheduledThreadPoolExecutor(10) {
52 @Override
53 public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay,
54 long delay, TimeUnit unit) {
55 return future = super.scheduleWithFixedDelay(command, initialDelay, delay, unit);
56 }
57 };
58
59 public void testServiceStartStop() throws Exception {
60 NullService service = new NullService();
61 service.startAsync().awaitRunning();
62 assertFalse(future.isDone());
63 service.stopAsync().awaitTerminated();
64 assertTrue(future.isCancelled());
65 }
66
67 private class NullService extends AbstractScheduledService {
68 @Override protected void runOneIteration() throws Exception {}
69 @Override protected Scheduler scheduler() { return configuration; }
70 @Override protected ScheduledExecutorService executor() { return executor; }
71 }
72
73 public void testFailOnExceptionFromRun() throws Exception {
74 TestService service = new TestService();
75 service.runException = new Exception();
76 service.startAsync().awaitRunning();
77 service.runFirstBarrier.await();
78 service.runSecondBarrier.await();
79 try {
80 future.get();
81 fail();
82 } catch (ExecutionException e) {
83
84
85 assertEquals(service.runException, e.getCause().getCause());
86 }
87 assertEquals(service.state(), Service.State.FAILED);
88 }
89
90 public void testFailOnExceptionFromStartUp() {
91 TestService service = new TestService();
92 service.startUpException = new Exception();
93 try {
94 service.startAsync().awaitRunning();
95 fail();
96 } catch (IllegalStateException e) {
97 assertEquals(service.startUpException, e.getCause());
98 }
99 assertEquals(0, service.numberOfTimesRunCalled.get());
100 assertEquals(Service.State.FAILED, service.state());
101 }
102
103 public void testFailOnExceptionFromShutDown() throws Exception {
104 TestService service = new TestService();
105 service.shutDownException = new Exception();
106 service.startAsync().awaitRunning();
107 service.runFirstBarrier.await();
108 service.stopAsync();
109 service.runSecondBarrier.await();
110 try {
111 service.awaitTerminated();
112 fail();
113 } catch (IllegalStateException e) {
114 assertEquals(service.shutDownException, e.getCause());
115 }
116 assertEquals(Service.State.FAILED, service.state());
117 }
118
119 public void testRunOneIterationCalledMultipleTimes() throws Exception {
120 TestService service = new TestService();
121 service.startAsync().awaitRunning();
122 for (int i = 1; i < 10; i++) {
123 service.runFirstBarrier.await();
124 assertEquals(i, service.numberOfTimesRunCalled.get());
125 service.runSecondBarrier.await();
126 }
127 service.runFirstBarrier.await();
128 service.stopAsync();
129 service.runSecondBarrier.await();
130 service.stopAsync().awaitTerminated();
131 }
132
133 public void testExecutorOnlyCalledOnce() throws Exception {
134 TestService service = new TestService();
135 service.startAsync().awaitRunning();
136
137 assertEquals(1, service.numberOfTimesExecutorCalled.get());
138 for (int i = 1; i < 10; i++) {
139 service.runFirstBarrier.await();
140 assertEquals(i, service.numberOfTimesRunCalled.get());
141 service.runSecondBarrier.await();
142 }
143 service.runFirstBarrier.await();
144 service.stopAsync();
145 service.runSecondBarrier.await();
146 service.stopAsync().awaitTerminated();
147
148 assertEquals(1, service.numberOfTimesExecutorCalled.get());
149 }
150
151 public void testDefaultExecutorIsShutdownWhenServiceIsStopped() throws Exception {
152 final AtomicReference<ScheduledExecutorService> executor = Atomics.newReference();
153 AbstractScheduledService service = new AbstractScheduledService() {
154 @Override protected void runOneIteration() throws Exception {}
155
156 @Override protected ScheduledExecutorService executor() {
157 executor.set(super.executor());
158 return executor.get();
159 }
160
161 @Override protected Scheduler scheduler() {
162 return Scheduler.newFixedDelaySchedule(0, 1, TimeUnit.MILLISECONDS);
163 }
164 };
165
166 service.startAsync();
167 assertFalse(service.executor().isShutdown());
168 service.awaitRunning();
169 service.stopAsync();
170 service.awaitTerminated();
171 assertTrue(executor.get().awaitTermination(100, TimeUnit.MILLISECONDS));
172 }
173
174 public void testDefaultExecutorIsShutdownWhenServiceFails() throws Exception {
175 final AtomicReference<ScheduledExecutorService> executor = Atomics.newReference();
176 AbstractScheduledService service = new AbstractScheduledService() {
177 @Override protected void startUp() throws Exception {
178 throw new Exception("Failed");
179 }
180
181 @Override protected void runOneIteration() throws Exception {}
182
183 @Override protected ScheduledExecutorService executor() {
184 executor.set(super.executor());
185 return executor.get();
186 }
187
188 @Override protected Scheduler scheduler() {
189 return Scheduler.newFixedDelaySchedule(0, 1, TimeUnit.MILLISECONDS);
190 }
191 };
192
193 try {
194 service.startAsync().awaitRunning();
195 fail("Expected service to fail during startup");
196 } catch (IllegalStateException expected) {}
197
198 assertTrue(executor.get().awaitTermination(100, TimeUnit.MILLISECONDS));
199 }
200
201 public void testSchedulerOnlyCalledOnce() throws Exception {
202 TestService service = new TestService();
203 service.startAsync().awaitRunning();
204
205 assertEquals(1, service.numberOfTimesSchedulerCalled.get());
206 for (int i = 1; i < 10; i++) {
207 service.runFirstBarrier.await();
208 assertEquals(i, service.numberOfTimesRunCalled.get());
209 service.runSecondBarrier.await();
210 }
211 service.runFirstBarrier.await();
212 service.stopAsync();
213 service.runSecondBarrier.await();
214 service.awaitTerminated();
215
216 assertEquals(1, service.numberOfTimesSchedulerCalled.get());
217 }
218
219 private class TestService extends AbstractScheduledService {
220 CyclicBarrier runFirstBarrier = new CyclicBarrier(2);
221 CyclicBarrier runSecondBarrier = new CyclicBarrier(2);
222
223 volatile boolean startUpCalled = false;
224 volatile boolean shutDownCalled = false;
225 AtomicInteger numberOfTimesRunCalled = new AtomicInteger(0);
226 AtomicInteger numberOfTimesExecutorCalled = new AtomicInteger(0);
227 AtomicInteger numberOfTimesSchedulerCalled = new AtomicInteger(0);
228 volatile Exception runException = null;
229 volatile Exception startUpException = null;
230 volatile Exception shutDownException = null;
231
232 @Override
233 protected void runOneIteration() throws Exception {
234 assertTrue(startUpCalled);
235 assertFalse(shutDownCalled);
236 numberOfTimesRunCalled.incrementAndGet();
237 assertEquals(State.RUNNING, state());
238 runFirstBarrier.await();
239 runSecondBarrier.await();
240 if (runException != null) {
241 throw runException;
242 }
243 }
244
245 @Override
246 protected void startUp() throws Exception {
247 assertFalse(startUpCalled);
248 assertFalse(shutDownCalled);
249 startUpCalled = true;
250 assertEquals(State.STARTING, state());
251 if (startUpException != null) {
252 throw startUpException;
253 }
254 }
255
256 @Override
257 protected void shutDown() throws Exception {
258 assertTrue(startUpCalled);
259 assertFalse(shutDownCalled);
260 shutDownCalled = true;
261 if (shutDownException != null) {
262 throw shutDownException;
263 }
264 }
265
266 @Override
267 protected ScheduledExecutorService executor() {
268 numberOfTimesExecutorCalled.incrementAndGet();
269 return executor;
270 }
271
272 @Override
273 protected Scheduler scheduler() {
274 numberOfTimesSchedulerCalled.incrementAndGet();
275 return configuration;
276 }
277 }
278
279 public static class SchedulerTest extends TestCase {
280
281
282 private static final int initialDelay = 10;
283 private static final int delay = 20;
284 private static final TimeUnit unit = TimeUnit.MILLISECONDS;
285
286
287 final Runnable testRunnable = new Runnable() {@Override public void run() {}};
288 boolean called = false;
289
290 private void assertSingleCallWithCorrectParameters(Runnable command, long initialDelay,
291 long delay, TimeUnit unit) {
292 assertFalse(called);
293 called = true;
294 assertEquals(SchedulerTest.initialDelay, initialDelay);
295 assertEquals(SchedulerTest.delay, delay);
296 assertEquals(SchedulerTest.unit, unit);
297 assertEquals(testRunnable, command);
298 }
299
300 public void testFixedRateSchedule() {
301 Scheduler schedule = Scheduler.newFixedRateSchedule(initialDelay, delay, unit);
302 schedule.schedule(null, new ScheduledThreadPoolExecutor(1) {
303 @Override
304 public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay,
305 long period, TimeUnit unit) {
306 assertSingleCallWithCorrectParameters(command, initialDelay, delay, unit);
307 return null;
308 }
309 }, testRunnable);
310 assertTrue(called);
311 }
312
313 public void testFixedDelaySchedule() {
314 Scheduler schedule = Scheduler.newFixedDelaySchedule(initialDelay, delay, unit);
315 schedule.schedule(null, new ScheduledThreadPoolExecutor(10) {
316 @Override
317 public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay,
318 long delay, TimeUnit unit) {
319 assertSingleCallWithCorrectParameters(command, initialDelay, delay, unit);
320 return null;
321 }
322 }, testRunnable);
323 assertTrue(called);
324 }
325
326 private class TestCustomScheduler extends AbstractScheduledService.CustomScheduler {
327 public AtomicInteger scheduleCounter = new AtomicInteger(0);
328 @Override
329 protected Schedule getNextSchedule() throws Exception {
330 scheduleCounter.incrementAndGet();
331 return new Schedule(0, TimeUnit.SECONDS);
332 }
333 }
334
335 public void testCustomSchedule_startStop() throws Exception {
336 final CyclicBarrier firstBarrier = new CyclicBarrier(2);
337 final CyclicBarrier secondBarrier = new CyclicBarrier(2);
338 final AtomicBoolean shouldWait = new AtomicBoolean(true);
339 Runnable task = new Runnable() {
340 @Override public void run() {
341 try {
342 if (shouldWait.get()) {
343 firstBarrier.await();
344 secondBarrier.await();
345 }
346 } catch (Exception e) {
347 throw new RuntimeException(e);
348 }
349 }
350 };
351 TestCustomScheduler scheduler = new TestCustomScheduler();
352 Future<?> future = scheduler.schedule(null, Executors.newScheduledThreadPool(10), task);
353 firstBarrier.await();
354 assertEquals(1, scheduler.scheduleCounter.get());
355 secondBarrier.await();
356 firstBarrier.await();
357 assertEquals(2, scheduler.scheduleCounter.get());
358 shouldWait.set(false);
359 secondBarrier.await();
360 future.cancel(false);
361 }
362
363 public void testCustomSchedulerServiceStop() throws Exception {
364 TestAbstractScheduledCustomService service = new TestAbstractScheduledCustomService();
365 service.startAsync().awaitRunning();
366 service.firstBarrier.await();
367 assertEquals(1, service.numIterations.get());
368 service.stopAsync();
369 service.secondBarrier.await();
370 service.awaitTerminated();
371
372 Thread.sleep(unit.toMillis(3 * delay));
373 assertEquals(1, service.numIterations.get());
374 }
375
376 public void testBig() throws Exception {
377 TestAbstractScheduledCustomService service = new TestAbstractScheduledCustomService() {
378 @Override protected Scheduler scheduler() {
379 return new AbstractScheduledService.CustomScheduler() {
380 @Override
381 protected Schedule getNextSchedule() throws Exception {
382
383 Thread.yield();
384 return new Schedule(0, TimeUnit.SECONDS);
385 }
386 };
387 }
388 };
389 service.useBarriers = false;
390 service.startAsync().awaitRunning();
391 Thread.sleep(50);
392 service.useBarriers = true;
393 service.firstBarrier.await();
394 int numIterations = service.numIterations.get();
395 service.stopAsync();
396 service.secondBarrier.await();
397 service.awaitTerminated();
398 assertEquals(numIterations, service.numIterations.get());
399 }
400
401 private static class TestAbstractScheduledCustomService extends AbstractScheduledService {
402 final AtomicInteger numIterations = new AtomicInteger(0);
403 volatile boolean useBarriers = true;
404 final CyclicBarrier firstBarrier = new CyclicBarrier(2);
405 final CyclicBarrier secondBarrier = new CyclicBarrier(2);
406
407 @Override protected void runOneIteration() throws Exception {
408 numIterations.incrementAndGet();
409 if (useBarriers) {
410 firstBarrier.await();
411 secondBarrier.await();
412 }
413 }
414
415 @Override protected ScheduledExecutorService executor() {
416
417 return Executors.newScheduledThreadPool(10);
418 }
419
420 @Override protected void startUp() throws Exception {}
421
422 @Override protected void shutDown() throws Exception {}
423
424 @Override protected Scheduler scheduler() {
425 return new CustomScheduler() {
426 @Override
427 protected Schedule getNextSchedule() throws Exception {
428 return new Schedule(delay, unit);
429 }};
430 }
431 }
432
433 public void testCustomSchedulerFailure() throws Exception {
434 TestFailingCustomScheduledService service = new TestFailingCustomScheduledService();
435 service.startAsync().awaitRunning();
436 for (int i = 1; i < 4; i++) {
437 service.firstBarrier.await();
438 assertEquals(i, service.numIterations.get());
439 service.secondBarrier.await();
440 }
441 Thread.sleep(1000);
442 try {
443 service.stopAsync().awaitTerminated(100, TimeUnit.SECONDS);
444 fail();
445 } catch (IllegalStateException e) {
446 assertEquals(State.FAILED, service.state());
447 }
448 }
449
450 private static class TestFailingCustomScheduledService extends AbstractScheduledService {
451 final AtomicInteger numIterations = new AtomicInteger(0);
452 final CyclicBarrier firstBarrier = new CyclicBarrier(2);
453 final CyclicBarrier secondBarrier = new CyclicBarrier(2);
454
455 @Override protected void runOneIteration() throws Exception {
456 numIterations.incrementAndGet();
457 firstBarrier.await();
458 secondBarrier.await();
459 }
460
461 @Override protected ScheduledExecutorService executor() {
462
463 return Executors.newScheduledThreadPool(10);
464 }
465
466 @Override protected Scheduler scheduler() {
467 return new CustomScheduler() {
468 @Override
469 protected Schedule getNextSchedule() throws Exception {
470 if (numIterations.get() > 2) {
471 throw new IllegalStateException("Failed");
472 }
473 return new Schedule(delay, unit);
474 }};
475 }
476 }
477 }
478 }