View Javadoc
1   /*
2    * Copyright 2002-2014 the original author or authors.
3    *
4    * Licensed under the Apache License, Version 2.0 (the "License");
5    * you may not use this file except in compliance with the License.
6    * You may obtain a copy of the License at
7    *
8    *      http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS,
12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   * See the License for the specific language governing permissions and
14   * limitations under the License.
15   */
16  
17  package org.springframework.scheduling.concurrent;
18  
19  import java.util.concurrent.BlockingQueue;
20  import java.util.concurrent.Callable;
21  import java.util.concurrent.Executor;
22  import java.util.concurrent.ExecutorService;
23  import java.util.concurrent.Future;
24  import java.util.concurrent.LinkedBlockingQueue;
25  import java.util.concurrent.RejectedExecutionException;
26  import java.util.concurrent.RejectedExecutionHandler;
27  import java.util.concurrent.SynchronousQueue;
28  import java.util.concurrent.ThreadFactory;
29  import java.util.concurrent.ThreadPoolExecutor;
30  import java.util.concurrent.TimeUnit;
31  
32  import org.springframework.core.task.AsyncListenableTaskExecutor;
33  import org.springframework.core.task.TaskRejectedException;
34  import org.springframework.scheduling.SchedulingTaskExecutor;
35  import org.springframework.util.Assert;
36  import org.springframework.util.concurrent.ListenableFuture;
37  import org.springframework.util.concurrent.ListenableFutureTask;
38  
39  /**
40   * JavaBean that allows for configuring a {@link java.util.concurrent.ThreadPoolExecutor}
41   * in bean style (through its "corePoolSize", "maxPoolSize", "keepAliveSeconds", "queueCapacity"
42   * properties) and exposing it as a Spring {@link org.springframework.core.task.TaskExecutor}.
43   * This class is also well suited for management and monitoring (e.g. through JMX),
44   * providing several useful attributes: "corePoolSize", "maxPoolSize", "keepAliveSeconds"
45   * (all supporting updates at runtime); "poolSize", "activeCount" (for introspection only).
46   *
47   * <p>For an alternative, you may set up a ThreadPoolExecutor instance directly using
48   * constructor injection, or use a factory method definition that points to the
49   * {@link java.util.concurrent.Executors} class. To expose such a raw Executor as a
50   * Spring {@link org.springframework.core.task.TaskExecutor}, simply wrap it with a
51   * {@link org.springframework.scheduling.concurrent.ConcurrentTaskExecutor} adapter.
52   *
53   * <p><b>NOTE:</b> This class implements Spring's
54   * {@link org.springframework.core.task.TaskExecutor} interface as well as the
55   * {@link java.util.concurrent.Executor} interface, with the former being the primary
56   * interface, the other just serving as secondary convenience. For this reason, the
57   * exception handling follows the TaskExecutor contract rather than the Executor contract,
58   * in particular regarding the {@link org.springframework.core.task.TaskRejectedException}.
59   *
60   * <p><b>If you prefer native {@link java.util.concurrent.ExecutorService} exposure instead,
61   * consider {@link ThreadPoolExecutorFactoryBean} as an alternative to this class.</b>
62   *
63   * @author Juergen Hoeller
64   * @since 2.0
65   * @see org.springframework.core.task.TaskExecutor
66   * @see java.util.concurrent.ThreadPoolExecutor
67   * @see ConcurrentTaskExecutor
68   */
69  @SuppressWarnings("serial")
70  public class ThreadPoolTaskExecutor extends ExecutorConfigurationSupport
71  		implements AsyncListenableTaskExecutor, SchedulingTaskExecutor {
72  
73  	private final Object poolSizeMonitor = new Object();
74  
75  	private int corePoolSize = 1;
76  
77  	private int maxPoolSize = Integer.MAX_VALUE;
78  
79  	private int keepAliveSeconds = 60;
80  
81  	private int queueCapacity = Integer.MAX_VALUE;
82  
83  	private boolean allowCoreThreadTimeOut = false;
84  
85  	private ThreadPoolExecutor threadPoolExecutor;
86  
87  
88  	/**
89  	 * Set the ThreadPoolExecutor's core pool size.
90  	 * Default is 1.
91  	 * <p><b>This setting can be modified at runtime, for example through JMX.</b>
92  	 */
93  	public void setCorePoolSize(int corePoolSize) {
94  		synchronized (this.poolSizeMonitor) {
95  			this.corePoolSize = corePoolSize;
96  			if (this.threadPoolExecutor != null) {
97  				this.threadPoolExecutor.setCorePoolSize(corePoolSize);
98  			}
99  		}
100 	}
101 
102 	/**
103 	 * Return the ThreadPoolExecutor's core pool size.
104 	 */
105 	public int getCorePoolSize() {
106 		synchronized (this.poolSizeMonitor) {
107 			return this.corePoolSize;
108 		}
109 	}
110 
111 	/**
112 	 * Set the ThreadPoolExecutor's maximum pool size.
113 	 * Default is {@code Integer.MAX_VALUE}.
114 	 * <p><b>This setting can be modified at runtime, for example through JMX.</b>
115 	 */
116 	public void setMaxPoolSize(int maxPoolSize) {
117 		synchronized (this.poolSizeMonitor) {
118 			this.maxPoolSize = maxPoolSize;
119 			if (this.threadPoolExecutor != null) {
120 				this.threadPoolExecutor.setMaximumPoolSize(maxPoolSize);
121 			}
122 		}
123 	}
124 
125 	/**
126 	 * Return the ThreadPoolExecutor's maximum pool size.
127 	 */
128 	public int getMaxPoolSize() {
129 		synchronized (this.poolSizeMonitor) {
130 			return this.maxPoolSize;
131 		}
132 	}
133 
134 	/**
135 	 * Set the ThreadPoolExecutor's keep-alive seconds.
136 	 * Default is 60.
137 	 * <p><b>This setting can be modified at runtime, for example through JMX.</b>
138 	 */
139 	public void setKeepAliveSeconds(int keepAliveSeconds) {
140 		synchronized (this.poolSizeMonitor) {
141 			this.keepAliveSeconds = keepAliveSeconds;
142 			if (this.threadPoolExecutor != null) {
143 				this.threadPoolExecutor.setKeepAliveTime(keepAliveSeconds, TimeUnit.SECONDS);
144 			}
145 		}
146 	}
147 
148 	/**
149 	 * Return the ThreadPoolExecutor's keep-alive seconds.
150 	 */
151 	public int getKeepAliveSeconds() {
152 		synchronized (this.poolSizeMonitor) {
153 			return this.keepAliveSeconds;
154 		}
155 	}
156 
157 	/**
158 	 * Set the capacity for the ThreadPoolExecutor's BlockingQueue.
159 	 * Default is {@code Integer.MAX_VALUE}.
160 	 * <p>Any positive value will lead to a LinkedBlockingQueue instance;
161 	 * any other value will lead to a SynchronousQueue instance.
162 	 * @see java.util.concurrent.LinkedBlockingQueue
163 	 * @see java.util.concurrent.SynchronousQueue
164 	 */
165 	public void setQueueCapacity(int queueCapacity) {
166 		this.queueCapacity = queueCapacity;
167 	}
168 
169 	/**
170 	 * Specify whether to allow core threads to time out. This enables dynamic
171 	 * growing and shrinking even in combination with a non-zero queue (since
172 	 * the max pool size will only grow once the queue is full).
173 	 * <p>Default is "false".
174 	 * @see java.util.concurrent.ThreadPoolExecutor#allowCoreThreadTimeOut(boolean)
175 	 */
176 	public void setAllowCoreThreadTimeOut(boolean allowCoreThreadTimeOut) {
177 		this.allowCoreThreadTimeOut = allowCoreThreadTimeOut;
178 	}
179 
180 
181 	@Override
182 	protected ExecutorService initializeExecutor(
183 			ThreadFactory threadFactory, RejectedExecutionHandler rejectedExecutionHandler) {
184 
185 		BlockingQueue<Runnable> queue = createQueue(this.queueCapacity);
186 		ThreadPoolExecutor executor  = new ThreadPoolExecutor(
187 				this.corePoolSize, this.maxPoolSize, this.keepAliveSeconds, TimeUnit.SECONDS,
188 				queue, threadFactory, rejectedExecutionHandler);
189 		if (this.allowCoreThreadTimeOut) {
190 			executor.allowCoreThreadTimeOut(true);
191 		}
192 
193 		this.threadPoolExecutor = executor;
194 		return executor;
195 	}
196 
197 	/**
198 	 * Create the BlockingQueue to use for the ThreadPoolExecutor.
199 	 * <p>A LinkedBlockingQueue instance will be created for a positive
200 	 * capacity value; a SynchronousQueue else.
201 	 * @param queueCapacity the specified queue capacity
202 	 * @return the BlockingQueue instance
203 	 * @see java.util.concurrent.LinkedBlockingQueue
204 	 * @see java.util.concurrent.SynchronousQueue
205 	 */
206 	protected BlockingQueue<Runnable> createQueue(int queueCapacity) {
207 		if (queueCapacity > 0) {
208 			return new LinkedBlockingQueue<Runnable>(queueCapacity);
209 		}
210 		else {
211 			return new SynchronousQueue<Runnable>();
212 		}
213 	}
214 
215 	/**
216 	 * Return the underlying ThreadPoolExecutor for native access.
217 	 * @return the underlying ThreadPoolExecutor (never {@code null})
218 	 * @throws IllegalStateException if the ThreadPoolTaskExecutor hasn't been initialized yet
219 	 */
220 	public ThreadPoolExecutor getThreadPoolExecutor() throws IllegalStateException {
221 		Assert.state(this.threadPoolExecutor != null, "ThreadPoolTaskExecutor not initialized");
222 		return this.threadPoolExecutor;
223 	}
224 
225 	/**
226 	 * Return the current pool size.
227 	 * @see java.util.concurrent.ThreadPoolExecutor#getPoolSize()
228 	 */
229 	public int getPoolSize() {
230 		if (this.threadPoolExecutor == null) {
231 			// Not initialized yet: assume core pool size.
232 			return this.corePoolSize;
233 		}
234 		return this.threadPoolExecutor.getPoolSize();
235 	}
236 
237 	/**
238 	 * Return the number of currently active threads.
239 	 * @see java.util.concurrent.ThreadPoolExecutor#getActiveCount()
240 	 */
241 	public int getActiveCount() {
242 		if (this.threadPoolExecutor == null) {
243 			// Not initialized yet: assume no active threads.
244 			return 0;
245 		}
246 		return this.threadPoolExecutor.getActiveCount();
247 	}
248 
249 
250 	@Override
251 	public void execute(Runnable task) {
252 		Executor executor = getThreadPoolExecutor();
253 		try {
254 			executor.execute(task);
255 		}
256 		catch (RejectedExecutionException ex) {
257 			throw new TaskRejectedException("Executor [" + executor + "] did not accept task: " + task, ex);
258 		}
259 	}
260 
261 	@Override
262 	public void execute(Runnable task, long startTimeout) {
263 		execute(task);
264 	}
265 
266 	@Override
267 	public Future<?> submit(Runnable task) {
268 		ExecutorService executor = getThreadPoolExecutor();
269 		try {
270 			return executor.submit(task);
271 		}
272 		catch (RejectedExecutionException ex) {
273 			throw new TaskRejectedException("Executor [" + executor + "] did not accept task: " + task, ex);
274 		}
275 	}
276 
277 	@Override
278 	public <T> Future<T> submit(Callable<T> task) {
279 		ExecutorService executor = getThreadPoolExecutor();
280 		try {
281 			return executor.submit(task);
282 		}
283 		catch (RejectedExecutionException ex) {
284 			throw new TaskRejectedException("Executor [" + executor + "] did not accept task: " + task, ex);
285 		}
286 	}
287 
288 	@Override
289 	public ListenableFuture<?> submitListenable(Runnable task) {
290 		ExecutorService executor = getThreadPoolExecutor();
291 		try {
292 			ListenableFutureTask<Object> future = new ListenableFutureTask<Object>(task, null);
293 			executor.execute(future);
294 			return future;
295 		}
296 		catch (RejectedExecutionException ex) {
297 			throw new TaskRejectedException("Executor [" + executor + "] did not accept task: " + task, ex);
298 		}
299 	}
300 
301 	@Override
302 	public <T> ListenableFuture<T> submitListenable(Callable<T> task) {
303 		ExecutorService executor = getThreadPoolExecutor();
304 		try {
305 			ListenableFutureTask<T> future = new ListenableFutureTask<T>(task);
306 			executor.execute(future);
307 			return future;
308 		}
309 		catch (RejectedExecutionException ex) {
310 			throw new TaskRejectedException("Executor [" + executor + "] did not accept task: " + task, ex);
311 		}
312 	}
313 
314 	/**
315 	 * This task executor prefers short-lived work units.
316 	 */
317 	@Override
318 	public boolean prefersShortLivedTasks() {
319 		return true;
320 	}
321 
322 }