View Javadoc
1   /*
2    * Copyright 2002-2014 the original author or authors.
3    *
4    * Licensed under the Apache License, Version 2.0 (the "License");
5    * you may not use this file except in compliance with the License.
6    * You may obtain a copy of the License at
7    *
8    *      http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS,
12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   * See the License for the specific language governing permissions and
14   * limitations under the License.
15   */
16  
17  package org.springframework.jms.listener.adapter;
18  
19  import javax.jms.Destination;
20  import javax.jms.InvalidDestinationException;
21  import javax.jms.JMSException;
22  import javax.jms.Message;
23  import javax.jms.MessageListener;
24  import javax.jms.MessageProducer;
25  import javax.jms.Session;
26  
27  import org.apache.commons.logging.Log;
28  import org.apache.commons.logging.LogFactory;
29  
30  import org.springframework.jms.listener.SessionAwareMessageListener;
31  import org.springframework.jms.support.JmsHeaderMapper;
32  import org.springframework.jms.support.JmsUtils;
33  import org.springframework.jms.support.SimpleJmsHeaderMapper;
34  import org.springframework.jms.support.converter.MessageConversionException;
35  import org.springframework.jms.support.converter.MessageConverter;
36  import org.springframework.jms.support.converter.MessagingMessageConverter;
37  import org.springframework.jms.support.converter.SimpleMessageConverter;
38  import org.springframework.jms.support.destination.DestinationResolver;
39  import org.springframework.jms.support.destination.DynamicDestinationResolver;
40  import org.springframework.util.Assert;
41  
42  /**
43   * An abstract {@link MessageListener} adapter providing the necessary infrastructure
44   * to extract the payload of a {@link Message}
45   *
46   * @author Juergen Hoeller
47   * @author Stephane Nicoll
48   * @since 4.1
49   * @see MessageListener
50   * @see SessionAwareMessageListener
51   */
52  public abstract class AbstractAdaptableMessageListener
53  		implements MessageListener, SessionAwareMessageListener<Message> {
54  
55  	/** Logger available to subclasses */
56  	protected final Log logger = LogFactory.getLog(getClass());
57  
58  	private Object defaultResponseDestination;
59  
60  	private DestinationResolver destinationResolver = new DynamicDestinationResolver();
61  
62  	private MessageConverter messageConverter = new SimpleMessageConverter();
63  
64  	private final MessagingMessageConverterAdapter messagingMessageConverter = new MessagingMessageConverterAdapter();
65  
66  
67  	/**
68  	 * Set the default destination to send response messages to. This will be applied
69  	 * in case of a request message that does not carry a "JMSReplyTo" field.
70  	 * <p>Response destinations are only relevant for listener methods that return
71  	 * result objects, which will be wrapped in a response message and sent to a
72  	 * response destination.
73  	 * <p>Alternatively, specify a "defaultResponseQueueName" or "defaultResponseTopicName",
74  	 * to be dynamically resolved via the DestinationResolver.
75  	 * @see #setDefaultResponseQueueName(String)
76  	 * @see #setDefaultResponseTopicName(String)
77  	 * @see #getResponseDestination
78  	 */
79  	public void setDefaultResponseDestination(Destination destination) {
80  		this.defaultResponseDestination = destination;
81  	}
82  
83  	/**
84  	 * Set the name of the default response queue to send response messages to.
85  	 * This will be applied in case of a request message that does not carry a
86  	 * "JMSReplyTo" field.
87  	 * <p>Alternatively, specify a JMS Destination object as "defaultResponseDestination".
88  	 * @see #setDestinationResolver
89  	 * @see #setDefaultResponseDestination(javax.jms.Destination)
90  	 */
91  	public void setDefaultResponseQueueName(String destinationName) {
92  		this.defaultResponseDestination = new DestinationNameHolder(destinationName, false);
93  	}
94  
95  	/**
96  	 * Set the name of the default response topic to send response messages to.
97  	 * This will be applied in case of a request message that does not carry a
98  	 * "JMSReplyTo" field.
99  	 * <p>Alternatively, specify a JMS Destination object as "defaultResponseDestination".
100 	 * @see #setDestinationResolver
101 	 * @see #setDefaultResponseDestination(javax.jms.Destination)
102 	 */
103 	public void setDefaultResponseTopicName(String destinationName) {
104 		this.defaultResponseDestination = new DestinationNameHolder(destinationName, true);
105 	}
106 
107 	/**
108 	 * Set the DestinationResolver that should be used to resolve response
109 	 * destination names for this adapter.
110 	 * <p>The default resolver is a DynamicDestinationResolver. Specify a
111 	 * JndiDestinationResolver for resolving destination names as JNDI locations.
112 	 * @see org.springframework.jms.support.destination.DynamicDestinationResolver
113 	 * @see org.springframework.jms.support.destination.JndiDestinationResolver
114 	 */
115 	public void setDestinationResolver(DestinationResolver destinationResolver) {
116 		Assert.notNull(destinationResolver, "DestinationResolver must not be null");
117 		this.destinationResolver = destinationResolver;
118 	}
119 
120 	/**
121 	 * Return the DestinationResolver for this adapter.
122 	 */
123 	protected DestinationResolver getDestinationResolver() {
124 		return this.destinationResolver;
125 	}
126 
127 	/**
128 	 * Set the converter that will convert incoming JMS messages to
129 	 * listener method arguments, and objects returned from listener
130 	 * methods back to JMS messages.
131 	 * <p>The default converter is a {@link SimpleMessageConverter}, which is able
132 	 * to handle {@link javax.jms.BytesMessage BytesMessages},
133 	 * {@link javax.jms.TextMessage TextMessages} and
134 	 * {@link javax.jms.ObjectMessage ObjectMessages}.
135 	 */
136 	public void setMessageConverter(MessageConverter messageConverter) {
137 		this.messageConverter = messageConverter;
138 	}
139 
140 	/**
141 	 * Return the converter that will convert incoming JMS messages to
142 	 * listener method arguments, and objects returned from listener
143 	 * methods back to JMS messages.
144 	 */
145 	protected MessageConverter getMessageConverter() {
146 		return this.messageConverter;
147 	}
148 
149 	/**
150 	 * Set the {@link JmsHeaderMapper} implementation to use to map the standard
151 	 * JMS headers. By default, a {@link SimpleJmsHeaderMapper} is used.
152 	 * @see SimpleJmsHeaderMapper
153 	 */
154 	public void setHeaderMapper(JmsHeaderMapper headerMapper) {
155 		Assert.notNull(headerMapper, "HeaderMapper must not be null");
156 		this.messagingMessageConverter.setHeaderMapper(headerMapper);
157 	}
158 
159 	/**
160 	 * Return the {@link MessagingMessageConverter} for this listener,
161 	 * being able to convert {@link org.springframework.messaging.Message}.
162 	 */
163 	protected final MessagingMessageConverter getMessagingMessageConverter() {
164 		return this.messagingMessageConverter;
165 	}
166 
167 
168 	/**
169 	 * Standard JMS {@link MessageListener} entry point.
170 	 * <p>Delegates the message to the target listener method, with appropriate
171 	 * conversion of the message argument. In case of an exception, the
172 	 * {@link #handleListenerException(Throwable)} method will be invoked.
173 	 * <p><b>Note:</b> Does not support sending response messages based on
174 	 * result objects returned from listener methods. Use the
175 	 * {@link SessionAwareMessageListener} entry point (typically through a Spring
176 	 * message listener container) for handling result objects as well.
177 	 * @param message the incoming JMS message
178 	 * @see #handleListenerException
179 	 * @see #onMessage(javax.jms.Message, javax.jms.Session)
180 	 */
181 	@Override
182 	public void onMessage(Message message) {
183 		try {
184 			onMessage(message, null);
185 		}
186 		catch (Throwable ex) {
187 			handleListenerException(ex);
188 		}
189 	}
190 
191 	/**
192 	 * Handle the given exception that arose during listener execution.
193 	 * The default implementation logs the exception at error level.
194 	 * <p>This method only applies when used as standard JMS {@link MessageListener}.
195 	 * In case of the Spring {@link SessionAwareMessageListener} mechanism,
196 	 * exceptions get handled by the caller instead.
197 	 * @param ex the exception to handle
198 	 * @see #onMessage(javax.jms.Message)
199 	 */
200 	protected void handleListenerException(Throwable ex) {
201 		logger.error("Listener execution failed", ex);
202 	}
203 
204 	/**
205 	 * Extract the message body from the given JMS message.
206 	 * @param message the JMS {@code Message}
207 	 * @return the content of the message, to be passed into the
208 	 * listener method as argument
209 	 * @throws MessageConversionException if the message could not be unmarshaled
210 	 */
211 	protected Object extractMessage(Message message)  {
212 		try {
213 			MessageConverter converter = getMessageConverter();
214 			if (converter != null) {
215 				return converter.fromMessage(message);
216 			}
217 			return message;
218 		}
219 		catch (JMSException ex) {
220 			throw new MessageConversionException("Could not unmarshal message", ex);
221 		}
222 	}
223 
224 	/**
225 	 * Handle the given result object returned from the listener method,
226 	 * sending a response message back.
227 	 * @param result the result object to handle (never {@code null})
228 	 * @param request the original request message
229 	 * @param session the JMS Session to operate on (may be {@code null})
230 	 * @throws ReplyFailureException if the response message could not be sent
231 	 * @see #buildMessage
232 	 * @see #postProcessResponse
233 	 * @see #getResponseDestination
234 	 * @see #sendResponse
235 	 */
236 	protected void handleResult(Object result, Message request, Session session) {
237 		if (session != null) {
238 			if (logger.isDebugEnabled()) {
239 				logger.debug("Listener method returned result [" + result +
240 						"] - generating response message for it");
241 			}
242 			try {
243 				Message response = buildMessage(session, result);
244 				postProcessResponse(request, response);
245 				Destination destination = getResponseDestination(request, response, session);
246 				sendResponse(session, destination, response);
247 			}
248 			catch (Exception ex) {
249 				throw new ReplyFailureException("Failed to send reply with payload '" + result + "'", ex);
250 			}
251 		}
252 		else {
253 			if (logger.isWarnEnabled()) {
254 				logger.warn("Listener method returned result [" + result +
255 						"]: not generating response message for it because of no JMS Session given");
256 			}
257 		}
258 	}
259 
260 	/**
261 	 * Build a JMS message to be sent as response based on the given result object.
262 	 * @param session the JMS Session to operate on
263 	 * @param result the content of the message, as returned from the listener method
264 	 * @return the JMS {@code Message} (never {@code null})
265 	 * @throws JMSException if thrown by JMS API methods
266 	 * @see #setMessageConverter
267 	 */
268 	protected Message buildMessage(Session session, Object result) throws JMSException {
269 		MessageConverter converter = getMessageConverter();
270 		if (converter != null) {
271 			if (result instanceof org.springframework.messaging.Message) {
272 				return this.messagingMessageConverter.toMessage(result, session);
273 			}
274 			else {
275 				return converter.toMessage(result, session);
276 			}
277 		}
278 		else {
279 			if (!(result instanceof Message)) {
280 				throw new MessageConversionException(
281 						"No MessageConverter specified - cannot handle message [" + result + "]");
282 			}
283 			return (Message) result;
284 		}
285 	}
286 
287 	/**
288 	 * Post-process the given response message before it will be sent.
289 	 * <p>The default implementation sets the response's correlation id
290 	 * to the request message's correlation id, if any; otherwise to the
291 	 * request message id.
292 	 * @param request the original incoming JMS message
293 	 * @param response the outgoing JMS message about to be sent
294 	 * @throws JMSException if thrown by JMS API methods
295 	 * @see javax.jms.Message#setJMSCorrelationID
296 	 */
297 	protected void postProcessResponse(Message request, Message response) throws JMSException {
298 		String correlation = request.getJMSCorrelationID();
299 		if (correlation == null) {
300 			correlation = request.getJMSMessageID();
301 		}
302 		response.setJMSCorrelationID(correlation);
303 	}
304 
305 	/**
306 	 * Determine a response destination for the given message.
307 	 * <p>The default implementation first checks the JMS Reply-To
308 	 * {@link Destination} of the supplied request; if that is not {@code null}
309 	 * it is returned; if it is {@code null}, then the configured
310 	 * {@link #resolveDefaultResponseDestination default response destination}
311 	 * is returned; if this too is {@code null}, then an
312 	 * {@link javax.jms.InvalidDestinationException} is thrown.
313 	 * @param request the original incoming JMS message
314 	 * @param response the outgoing JMS message about to be sent
315 	 * @param session the JMS Session to operate on
316 	 * @return the response destination (never {@code null})
317 	 * @throws JMSException if thrown by JMS API methods
318 	 * @throws javax.jms.InvalidDestinationException if no {@link Destination} can be determined
319 	 * @see #setDefaultResponseDestination
320 	 * @see javax.jms.Message#getJMSReplyTo()
321 	 */
322 	protected Destination getResponseDestination(Message request, Message response, Session session)
323 			throws JMSException {
324 
325 		Destination replyTo = request.getJMSReplyTo();
326 		if (replyTo == null) {
327 			replyTo = resolveDefaultResponseDestination(session);
328 			if (replyTo == null) {
329 				throw new InvalidDestinationException("Cannot determine response destination: " +
330 						"Request message does not contain reply-to destination, and no default response destination set.");
331 			}
332 		}
333 		return replyTo;
334 	}
335 
336 	/**
337 	 * Resolve the default response destination into a JMS {@link Destination}, using this
338 	 * accessor's {@link DestinationResolver} in case of a destination name.
339 	 * @return the located {@link Destination}
340 	 * @throws javax.jms.JMSException if resolution failed
341 	 * @see #setDefaultResponseDestination
342 	 * @see #setDefaultResponseQueueName
343 	 * @see #setDefaultResponseTopicName
344 	 * @see #setDestinationResolver
345 	 */
346 	protected Destination resolveDefaultResponseDestination(Session session) throws JMSException {
347 		if (this.defaultResponseDestination instanceof Destination) {
348 			return (Destination) this.defaultResponseDestination;
349 		}
350 		if (this.defaultResponseDestination instanceof DestinationNameHolder) {
351 			DestinationNameHolder nameHolder = (DestinationNameHolder) this.defaultResponseDestination;
352 			return getDestinationResolver().resolveDestinationName(session, nameHolder.name, nameHolder.isTopic);
353 		}
354 		return null;
355 	}
356 
357 	/**
358 	 * Send the given response message to the given destination.
359 	 * @param response the JMS message to send
360 	 * @param destination the JMS destination to send to
361 	 * @param session the JMS session to operate on
362 	 * @throws JMSException if thrown by JMS API methods
363 	 * @see #postProcessProducer
364 	 * @see javax.jms.Session#createProducer
365 	 * @see javax.jms.MessageProducer#send
366 	 */
367 	protected void sendResponse(Session session, Destination destination, Message response) throws JMSException {
368 		MessageProducer producer = session.createProducer(destination);
369 		try {
370 			postProcessProducer(producer, response);
371 			producer.send(response);
372 		}
373 		finally {
374 			JmsUtils.closeMessageProducer(producer);
375 		}
376 	}
377 
378 	/**
379 	 * Post-process the given message producer before using it to send the response.
380 	 * <p>The default implementation is empty.
381 	 * @param producer the JMS message producer that will be used to send the message
382 	 * @param response the outgoing JMS message about to be sent
383 	 * @throws JMSException if thrown by JMS API methods
384 	 */
385 	protected void postProcessProducer(MessageProducer producer, Message response) throws JMSException {
386 	}
387 
388 
389 	/**
390 	 * Delegates payload extraction to {@link #extractMessage(javax.jms.Message)} to
391 	 * enforce backward compatibility.
392 	 */
393 	private class MessagingMessageConverterAdapter extends MessagingMessageConverter {
394 
395 		@Override
396 		protected Object extractPayload(Message message) throws JMSException {
397 			return extractMessage(message);
398 		}
399 	}
400 
401 
402 	/**
403 	 * Internal class combining a destination name
404 	 * and its target destination type (queue or topic).
405 	 */
406 	private static class DestinationNameHolder {
407 
408 		public final String name;
409 
410 		public final boolean isTopic;
411 
412 		public DestinationNameHolder(String name, boolean isTopic) {
413 			this.name = name;
414 			this.isTopic = isTopic;
415 		}
416 	}
417 
418 }