1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package org.springframework.scheduling.concurrent;
18
19 import java.util.concurrent.BlockingQueue;
20 import java.util.concurrent.Callable;
21 import java.util.concurrent.Executor;
22 import java.util.concurrent.ExecutorService;
23 import java.util.concurrent.Future;
24 import java.util.concurrent.LinkedBlockingQueue;
25 import java.util.concurrent.RejectedExecutionException;
26 import java.util.concurrent.RejectedExecutionHandler;
27 import java.util.concurrent.SynchronousQueue;
28 import java.util.concurrent.ThreadFactory;
29 import java.util.concurrent.ThreadPoolExecutor;
30 import java.util.concurrent.TimeUnit;
31
32 import org.springframework.core.task.AsyncListenableTaskExecutor;
33 import org.springframework.core.task.TaskRejectedException;
34 import org.springframework.scheduling.SchedulingTaskExecutor;
35 import org.springframework.util.Assert;
36 import org.springframework.util.concurrent.ListenableFuture;
37 import org.springframework.util.concurrent.ListenableFutureTask;
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69 @SuppressWarnings("serial")
70 public class ThreadPoolTaskExecutor extends ExecutorConfigurationSupport
71 implements AsyncListenableTaskExecutor, SchedulingTaskExecutor {
72
73 private final Object poolSizeMonitor = new Object();
74
75 private int corePoolSize = 1;
76
77 private int maxPoolSize = Integer.MAX_VALUE;
78
79 private int keepAliveSeconds = 60;
80
81 private int queueCapacity = Integer.MAX_VALUE;
82
83 private boolean allowCoreThreadTimeOut = false;
84
85 private ThreadPoolExecutor threadPoolExecutor;
86
87
88
89
90
91
92
93 public void setCorePoolSize(int corePoolSize) {
94 synchronized (this.poolSizeMonitor) {
95 this.corePoolSize = corePoolSize;
96 if (this.threadPoolExecutor != null) {
97 this.threadPoolExecutor.setCorePoolSize(corePoolSize);
98 }
99 }
100 }
101
102
103
104
105 public int getCorePoolSize() {
106 synchronized (this.poolSizeMonitor) {
107 return this.corePoolSize;
108 }
109 }
110
111
112
113
114
115
116 public void setMaxPoolSize(int maxPoolSize) {
117 synchronized (this.poolSizeMonitor) {
118 this.maxPoolSize = maxPoolSize;
119 if (this.threadPoolExecutor != null) {
120 this.threadPoolExecutor.setMaximumPoolSize(maxPoolSize);
121 }
122 }
123 }
124
125
126
127
128 public int getMaxPoolSize() {
129 synchronized (this.poolSizeMonitor) {
130 return this.maxPoolSize;
131 }
132 }
133
134
135
136
137
138
139 public void setKeepAliveSeconds(int keepAliveSeconds) {
140 synchronized (this.poolSizeMonitor) {
141 this.keepAliveSeconds = keepAliveSeconds;
142 if (this.threadPoolExecutor != null) {
143 this.threadPoolExecutor.setKeepAliveTime(keepAliveSeconds, TimeUnit.SECONDS);
144 }
145 }
146 }
147
148
149
150
151 public int getKeepAliveSeconds() {
152 synchronized (this.poolSizeMonitor) {
153 return this.keepAliveSeconds;
154 }
155 }
156
157
158
159
160
161
162
163
164
165 public void setQueueCapacity(int queueCapacity) {
166 this.queueCapacity = queueCapacity;
167 }
168
169
170
171
172
173
174
175
176 public void setAllowCoreThreadTimeOut(boolean allowCoreThreadTimeOut) {
177 this.allowCoreThreadTimeOut = allowCoreThreadTimeOut;
178 }
179
180
181 @Override
182 protected ExecutorService initializeExecutor(
183 ThreadFactory threadFactory, RejectedExecutionHandler rejectedExecutionHandler) {
184
185 BlockingQueue<Runnable> queue = createQueue(this.queueCapacity);
186 ThreadPoolExecutor executor = new ThreadPoolExecutor(
187 this.corePoolSize, this.maxPoolSize, this.keepAliveSeconds, TimeUnit.SECONDS,
188 queue, threadFactory, rejectedExecutionHandler);
189 if (this.allowCoreThreadTimeOut) {
190 executor.allowCoreThreadTimeOut(true);
191 }
192
193 this.threadPoolExecutor = executor;
194 return executor;
195 }
196
197
198
199
200
201
202
203
204
205
206 protected BlockingQueue<Runnable> createQueue(int queueCapacity) {
207 if (queueCapacity > 0) {
208 return new LinkedBlockingQueue<Runnable>(queueCapacity);
209 }
210 else {
211 return new SynchronousQueue<Runnable>();
212 }
213 }
214
215
216
217
218
219
220 public ThreadPoolExecutor getThreadPoolExecutor() throws IllegalStateException {
221 Assert.state(this.threadPoolExecutor != null, "ThreadPoolTaskExecutor not initialized");
222 return this.threadPoolExecutor;
223 }
224
225
226
227
228
229 public int getPoolSize() {
230 if (this.threadPoolExecutor == null) {
231
232 return this.corePoolSize;
233 }
234 return this.threadPoolExecutor.getPoolSize();
235 }
236
237
238
239
240
241 public int getActiveCount() {
242 if (this.threadPoolExecutor == null) {
243
244 return 0;
245 }
246 return this.threadPoolExecutor.getActiveCount();
247 }
248
249
250 @Override
251 public void execute(Runnable task) {
252 Executor executor = getThreadPoolExecutor();
253 try {
254 executor.execute(task);
255 }
256 catch (RejectedExecutionException ex) {
257 throw new TaskRejectedException("Executor [" + executor + "] did not accept task: " + task, ex);
258 }
259 }
260
261 @Override
262 public void execute(Runnable task, long startTimeout) {
263 execute(task);
264 }
265
266 @Override
267 public Future<?> submit(Runnable task) {
268 ExecutorService executor = getThreadPoolExecutor();
269 try {
270 return executor.submit(task);
271 }
272 catch (RejectedExecutionException ex) {
273 throw new TaskRejectedException("Executor [" + executor + "] did not accept task: " + task, ex);
274 }
275 }
276
277 @Override
278 public <T> Future<T> submit(Callable<T> task) {
279 ExecutorService executor = getThreadPoolExecutor();
280 try {
281 return executor.submit(task);
282 }
283 catch (RejectedExecutionException ex) {
284 throw new TaskRejectedException("Executor [" + executor + "] did not accept task: " + task, ex);
285 }
286 }
287
288 @Override
289 public ListenableFuture<?> submitListenable(Runnable task) {
290 ExecutorService executor = getThreadPoolExecutor();
291 try {
292 ListenableFutureTask<Object> future = new ListenableFutureTask<Object>(task, null);
293 executor.execute(future);
294 return future;
295 }
296 catch (RejectedExecutionException ex) {
297 throw new TaskRejectedException("Executor [" + executor + "] did not accept task: " + task, ex);
298 }
299 }
300
301 @Override
302 public <T> ListenableFuture<T> submitListenable(Callable<T> task) {
303 ExecutorService executor = getThreadPoolExecutor();
304 try {
305 ListenableFutureTask<T> future = new ListenableFutureTask<T>(task);
306 executor.execute(future);
307 return future;
308 }
309 catch (RejectedExecutionException ex) {
310 throw new TaskRejectedException("Executor [" + executor + "] did not accept task: " + task, ex);
311 }
312 }
313
314
315
316
317 @Override
318 public boolean prefersShortLivedTasks() {
319 return true;
320 }
321
322 }