1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package com.google.common.util.concurrent;
18
19 import com.google.common.collect.ImmutableList;
20 import com.google.common.collect.Lists;
21 import com.google.common.collect.Queues;
22
23 import junit.framework.TestCase;
24
25 import java.util.List;
26 import java.util.Queue;
27 import java.util.concurrent.CyclicBarrier;
28 import java.util.concurrent.Executor;
29 import java.util.concurrent.ExecutorService;
30 import java.util.concurrent.Executors;
31 import java.util.concurrent.RejectedExecutionException;
32 import java.util.concurrent.TimeUnit;
33 import java.util.concurrent.atomic.AtomicBoolean;
34 import java.util.concurrent.atomic.AtomicInteger;
35
36
37
38
39
40
41 public class SerializingExecutorTest extends TestCase {
42 private static class FakeExecutor implements Executor {
43 Queue<Runnable> tasks = Queues.newArrayDeque();
44 @Override public void execute(Runnable command) {
45 tasks.add(command);
46 }
47
48 boolean hasNext() {
49 return !tasks.isEmpty();
50 }
51
52 void runNext() {
53 assertTrue("expected at least one task to run", hasNext());
54 tasks.remove().run();
55 }
56
57 }
58 private FakeExecutor fakePool;
59 private SerializingExecutor e;
60
61 @Override
62 public void setUp() {
63 fakePool = new FakeExecutor();
64 e = new SerializingExecutor(fakePool);
65 }
66
67 public void testSerializingNullExecutor_fails() {
68 try {
69 new SerializingExecutor(null);
70 fail("Should have failed with NullPointerException.");
71 } catch (NullPointerException expected) {
72 }
73 }
74
75 public void testBasics() {
76 final AtomicInteger totalCalls = new AtomicInteger();
77 Runnable intCounter = new Runnable() {
78 @Override
79 public void run() {
80 totalCalls.incrementAndGet();
81 }
82 };
83
84 assertFalse(fakePool.hasNext());
85 e.execute(intCounter);
86 assertTrue(fakePool.hasNext());
87 e.execute(intCounter);
88 assertEquals(0, totalCalls.get());
89 fakePool.runNext();
90 assertEquals(2, totalCalls.get());
91 assertFalse(fakePool.hasNext());
92
93
94 e.execute(intCounter);
95 e.execute(intCounter);
96 e.execute(intCounter);
97 assertEquals(2, totalCalls.get());
98 fakePool.runNext();
99 assertEquals(5, totalCalls.get());
100 assertFalse(fakePool.hasNext());
101 }
102
103 public void testOrdering() {
104 final List<Integer> callOrder = Lists.newArrayList();
105
106 class FakeOp implements Runnable {
107 final int op;
108
109 FakeOp(int op) {
110 this.op = op;
111 }
112
113 @Override
114 public void run() {
115 callOrder.add(op);
116 }
117 }
118
119 e.execute(new FakeOp(0));
120 e.execute(new FakeOp(1));
121 e.execute(new FakeOp(2));
122 fakePool.runNext();
123
124 assertEquals(ImmutableList.of(0, 1, 2), callOrder);
125 }
126
127 public void testExceptions() {
128
129 final AtomicInteger numCalls = new AtomicInteger();
130
131 Runnable runMe = new Runnable() {
132 @Override
133 public void run() {
134 numCalls.incrementAndGet();
135 throw new RuntimeException("FAKE EXCEPTION!");
136 }
137 };
138
139 e.execute(runMe);
140 e.execute(runMe);
141 fakePool.runNext();
142
143 assertEquals(2, numCalls.get());
144 }
145
146 public void testDelegateRejection() {
147 final AtomicInteger numCalls = new AtomicInteger();
148 final AtomicBoolean reject = new AtomicBoolean(true);
149 final SerializingExecutor executor = new SerializingExecutor(
150 new Executor() {
151 @Override public void execute(Runnable r) {
152 if (reject.get()) {
153 throw new RejectedExecutionException();
154 }
155 r.run();
156 }
157 });
158 Runnable task = new Runnable() {
159 @Override
160 public void run() {
161 numCalls.incrementAndGet();
162 }
163 };
164 try {
165 executor.execute(task);
166 fail();
167 } catch (RejectedExecutionException expected) {}
168 assertEquals(0, numCalls.get());
169 reject.set(false);
170 executor.execute(task);
171 assertEquals(2, numCalls.get());
172 }
173
174 public void testTaskThrowsError() throws Exception {
175 class MyError extends Error {}
176 final CyclicBarrier barrier = new CyclicBarrier(2);
177
178 ExecutorService service = Executors.newSingleThreadExecutor();
179 try {
180 final SerializingExecutor executor = new SerializingExecutor(service);
181 Runnable errorTask = new Runnable() {
182 @Override
183 public void run() {
184 throw new MyError();
185 }
186 };
187 Runnable barrierTask = new Runnable() {
188 @Override
189 public void run() {
190 try {
191 barrier.await();
192 } catch (Exception e) {
193 throw new RuntimeException(e);
194 }
195 }
196 };
197 executor.execute(errorTask);
198 service.execute(barrierTask);
199
200
201 barrier.await(10, TimeUnit.SECONDS);
202 executor.execute(barrierTask);
203
204 barrier.await(10, TimeUnit.SECONDS);
205 } finally {
206 service.shutdown();
207 }
208 }
209 }