1 /* 2 * Copyright 2002-2014 the original author or authors. 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 package org.springframework.jms.listener.adapter; 18 19 import javax.jms.Destination; 20 import javax.jms.InvalidDestinationException; 21 import javax.jms.JMSException; 22 import javax.jms.Message; 23 import javax.jms.MessageListener; 24 import javax.jms.MessageProducer; 25 import javax.jms.Session; 26 27 import org.apache.commons.logging.Log; 28 import org.apache.commons.logging.LogFactory; 29 30 import org.springframework.jms.listener.SessionAwareMessageListener; 31 import org.springframework.jms.support.JmsHeaderMapper; 32 import org.springframework.jms.support.JmsUtils; 33 import org.springframework.jms.support.SimpleJmsHeaderMapper; 34 import org.springframework.jms.support.converter.MessageConversionException; 35 import org.springframework.jms.support.converter.MessageConverter; 36 import org.springframework.jms.support.converter.MessagingMessageConverter; 37 import org.springframework.jms.support.converter.SimpleMessageConverter; 38 import org.springframework.jms.support.destination.DestinationResolver; 39 import org.springframework.jms.support.destination.DynamicDestinationResolver; 40 import org.springframework.util.Assert; 41 42 /** 43 * An abstract {@link MessageListener} adapter providing the necessary infrastructure 44 * to extract the payload of a {@link Message} 45 * 46 * @author Juergen Hoeller 47 * @author Stephane Nicoll 48 * @since 4.1 49 * @see MessageListener 50 * @see SessionAwareMessageListener 51 */ 52 public abstract class AbstractAdaptableMessageListener 53 implements MessageListener, SessionAwareMessageListener<Message> { 54 55 /** Logger available to subclasses */ 56 protected final Log logger = LogFactory.getLog(getClass()); 57 58 private Object defaultResponseDestination; 59 60 private DestinationResolver destinationResolver = new DynamicDestinationResolver(); 61 62 private MessageConverter messageConverter = new SimpleMessageConverter(); 63 64 private final MessagingMessageConverterAdapter messagingMessageConverter = new MessagingMessageConverterAdapter(); 65 66 67 /** 68 * Set the default destination to send response messages to. This will be applied 69 * in case of a request message that does not carry a "JMSReplyTo" field. 70 * <p>Response destinations are only relevant for listener methods that return 71 * result objects, which will be wrapped in a response message and sent to a 72 * response destination. 73 * <p>Alternatively, specify a "defaultResponseQueueName" or "defaultResponseTopicName", 74 * to be dynamically resolved via the DestinationResolver. 75 * @see #setDefaultResponseQueueName(String) 76 * @see #setDefaultResponseTopicName(String) 77 * @see #getResponseDestination 78 */ 79 public void setDefaultResponseDestination(Destination destination) { 80 this.defaultResponseDestination = destination; 81 } 82 83 /** 84 * Set the name of the default response queue to send response messages to. 85 * This will be applied in case of a request message that does not carry a 86 * "JMSReplyTo" field. 87 * <p>Alternatively, specify a JMS Destination object as "defaultResponseDestination". 88 * @see #setDestinationResolver 89 * @see #setDefaultResponseDestination(javax.jms.Destination) 90 */ 91 public void setDefaultResponseQueueName(String destinationName) { 92 this.defaultResponseDestination = new DestinationNameHolder(destinationName, false); 93 } 94 95 /** 96 * Set the name of the default response topic to send response messages to. 97 * This will be applied in case of a request message that does not carry a 98 * "JMSReplyTo" field. 99 * <p>Alternatively, specify a JMS Destination object as "defaultResponseDestination". 100 * @see #setDestinationResolver 101 * @see #setDefaultResponseDestination(javax.jms.Destination) 102 */ 103 public void setDefaultResponseTopicName(String destinationName) { 104 this.defaultResponseDestination = new DestinationNameHolder(destinationName, true); 105 } 106 107 /** 108 * Set the DestinationResolver that should be used to resolve response 109 * destination names for this adapter. 110 * <p>The default resolver is a DynamicDestinationResolver. Specify a 111 * JndiDestinationResolver for resolving destination names as JNDI locations. 112 * @see org.springframework.jms.support.destination.DynamicDestinationResolver 113 * @see org.springframework.jms.support.destination.JndiDestinationResolver 114 */ 115 public void setDestinationResolver(DestinationResolver destinationResolver) { 116 Assert.notNull(destinationResolver, "DestinationResolver must not be null"); 117 this.destinationResolver = destinationResolver; 118 } 119 120 /** 121 * Return the DestinationResolver for this adapter. 122 */ 123 protected DestinationResolver getDestinationResolver() { 124 return this.destinationResolver; 125 } 126 127 /** 128 * Set the converter that will convert incoming JMS messages to 129 * listener method arguments, and objects returned from listener 130 * methods back to JMS messages. 131 * <p>The default converter is a {@link SimpleMessageConverter}, which is able 132 * to handle {@link javax.jms.BytesMessage BytesMessages}, 133 * {@link javax.jms.TextMessage TextMessages} and 134 * {@link javax.jms.ObjectMessage ObjectMessages}. 135 */ 136 public void setMessageConverter(MessageConverter messageConverter) { 137 this.messageConverter = messageConverter; 138 } 139 140 /** 141 * Return the converter that will convert incoming JMS messages to 142 * listener method arguments, and objects returned from listener 143 * methods back to JMS messages. 144 */ 145 protected MessageConverter getMessageConverter() { 146 return this.messageConverter; 147 } 148 149 /** 150 * Set the {@link JmsHeaderMapper} implementation to use to map the standard 151 * JMS headers. By default, a {@link SimpleJmsHeaderMapper} is used. 152 * @see SimpleJmsHeaderMapper 153 */ 154 public void setHeaderMapper(JmsHeaderMapper headerMapper) { 155 Assert.notNull(headerMapper, "HeaderMapper must not be null"); 156 this.messagingMessageConverter.setHeaderMapper(headerMapper); 157 } 158 159 /** 160 * Return the {@link MessagingMessageConverter} for this listener, 161 * being able to convert {@link org.springframework.messaging.Message}. 162 */ 163 protected final MessagingMessageConverter getMessagingMessageConverter() { 164 return this.messagingMessageConverter; 165 } 166 167 168 /** 169 * Standard JMS {@link MessageListener} entry point. 170 * <p>Delegates the message to the target listener method, with appropriate 171 * conversion of the message argument. In case of an exception, the 172 * {@link #handleListenerException(Throwable)} method will be invoked. 173 * <p><b>Note:</b> Does not support sending response messages based on 174 * result objects returned from listener methods. Use the 175 * {@link SessionAwareMessageListener} entry point (typically through a Spring 176 * message listener container) for handling result objects as well. 177 * @param message the incoming JMS message 178 * @see #handleListenerException 179 * @see #onMessage(javax.jms.Message, javax.jms.Session) 180 */ 181 @Override 182 public void onMessage(Message message) { 183 try { 184 onMessage(message, null); 185 } 186 catch (Throwable ex) { 187 handleListenerException(ex); 188 } 189 } 190 191 /** 192 * Handle the given exception that arose during listener execution. 193 * The default implementation logs the exception at error level. 194 * <p>This method only applies when used as standard JMS {@link MessageListener}. 195 * In case of the Spring {@link SessionAwareMessageListener} mechanism, 196 * exceptions get handled by the caller instead. 197 * @param ex the exception to handle 198 * @see #onMessage(javax.jms.Message) 199 */ 200 protected void handleListenerException(Throwable ex) { 201 logger.error("Listener execution failed", ex); 202 } 203 204 /** 205 * Extract the message body from the given JMS message. 206 * @param message the JMS {@code Message} 207 * @return the content of the message, to be passed into the 208 * listener method as argument 209 * @throws MessageConversionException if the message could not be unmarshaled 210 */ 211 protected Object extractMessage(Message message) { 212 try { 213 MessageConverter converter = getMessageConverter(); 214 if (converter != null) { 215 return converter.fromMessage(message); 216 } 217 return message; 218 } 219 catch (JMSException ex) { 220 throw new MessageConversionException("Could not unmarshal message", ex); 221 } 222 } 223 224 /** 225 * Handle the given result object returned from the listener method, 226 * sending a response message back. 227 * @param result the result object to handle (never {@code null}) 228 * @param request the original request message 229 * @param session the JMS Session to operate on (may be {@code null}) 230 * @throws ReplyFailureException if the response message could not be sent 231 * @see #buildMessage 232 * @see #postProcessResponse 233 * @see #getResponseDestination 234 * @see #sendResponse 235 */ 236 protected void handleResult(Object result, Message request, Session session) { 237 if (session != null) { 238 if (logger.isDebugEnabled()) { 239 logger.debug("Listener method returned result [" + result + 240 "] - generating response message for it"); 241 } 242 try { 243 Message response = buildMessage(session, result); 244 postProcessResponse(request, response); 245 Destination destination = getResponseDestination(request, response, session); 246 sendResponse(session, destination, response); 247 } 248 catch (Exception ex) { 249 throw new ReplyFailureException("Failed to send reply with payload '" + result + "'", ex); 250 } 251 } 252 else { 253 if (logger.isWarnEnabled()) { 254 logger.warn("Listener method returned result [" + result + 255 "]: not generating response message for it because of no JMS Session given"); 256 } 257 } 258 } 259 260 /** 261 * Build a JMS message to be sent as response based on the given result object. 262 * @param session the JMS Session to operate on 263 * @param result the content of the message, as returned from the listener method 264 * @return the JMS {@code Message} (never {@code null}) 265 * @throws JMSException if thrown by JMS API methods 266 * @see #setMessageConverter 267 */ 268 protected Message buildMessage(Session session, Object result) throws JMSException { 269 MessageConverter converter = getMessageConverter(); 270 if (converter != null) { 271 if (result instanceof org.springframework.messaging.Message) { 272 return this.messagingMessageConverter.toMessage(result, session); 273 } 274 else { 275 return converter.toMessage(result, session); 276 } 277 } 278 else { 279 if (!(result instanceof Message)) { 280 throw new MessageConversionException( 281 "No MessageConverter specified - cannot handle message [" + result + "]"); 282 } 283 return (Message) result; 284 } 285 } 286 287 /** 288 * Post-process the given response message before it will be sent. 289 * <p>The default implementation sets the response's correlation id 290 * to the request message's correlation id, if any; otherwise to the 291 * request message id. 292 * @param request the original incoming JMS message 293 * @param response the outgoing JMS message about to be sent 294 * @throws JMSException if thrown by JMS API methods 295 * @see javax.jms.Message#setJMSCorrelationID 296 */ 297 protected void postProcessResponse(Message request, Message response) throws JMSException { 298 String correlation = request.getJMSCorrelationID(); 299 if (correlation == null) { 300 correlation = request.getJMSMessageID(); 301 } 302 response.setJMSCorrelationID(correlation); 303 } 304 305 /** 306 * Determine a response destination for the given message. 307 * <p>The default implementation first checks the JMS Reply-To 308 * {@link Destination} of the supplied request; if that is not {@code null} 309 * it is returned; if it is {@code null}, then the configured 310 * {@link #resolveDefaultResponseDestination default response destination} 311 * is returned; if this too is {@code null}, then an 312 * {@link javax.jms.InvalidDestinationException} is thrown. 313 * @param request the original incoming JMS message 314 * @param response the outgoing JMS message about to be sent 315 * @param session the JMS Session to operate on 316 * @return the response destination (never {@code null}) 317 * @throws JMSException if thrown by JMS API methods 318 * @throws javax.jms.InvalidDestinationException if no {@link Destination} can be determined 319 * @see #setDefaultResponseDestination 320 * @see javax.jms.Message#getJMSReplyTo() 321 */ 322 protected Destination getResponseDestination(Message request, Message response, Session session) 323 throws JMSException { 324 325 Destination replyTo = request.getJMSReplyTo(); 326 if (replyTo == null) { 327 replyTo = resolveDefaultResponseDestination(session); 328 if (replyTo == null) { 329 throw new InvalidDestinationException("Cannot determine response destination: " + 330 "Request message does not contain reply-to destination, and no default response destination set."); 331 } 332 } 333 return replyTo; 334 } 335 336 /** 337 * Resolve the default response destination into a JMS {@link Destination}, using this 338 * accessor's {@link DestinationResolver} in case of a destination name. 339 * @return the located {@link Destination} 340 * @throws javax.jms.JMSException if resolution failed 341 * @see #setDefaultResponseDestination 342 * @see #setDefaultResponseQueueName 343 * @see #setDefaultResponseTopicName 344 * @see #setDestinationResolver 345 */ 346 protected Destination resolveDefaultResponseDestination(Session session) throws JMSException { 347 if (this.defaultResponseDestination instanceof Destination) { 348 return (Destination) this.defaultResponseDestination; 349 } 350 if (this.defaultResponseDestination instanceof DestinationNameHolder) { 351 DestinationNameHolder nameHolder = (DestinationNameHolder) this.defaultResponseDestination; 352 return getDestinationResolver().resolveDestinationName(session, nameHolder.name, nameHolder.isTopic); 353 } 354 return null; 355 } 356 357 /** 358 * Send the given response message to the given destination. 359 * @param response the JMS message to send 360 * @param destination the JMS destination to send to 361 * @param session the JMS session to operate on 362 * @throws JMSException if thrown by JMS API methods 363 * @see #postProcessProducer 364 * @see javax.jms.Session#createProducer 365 * @see javax.jms.MessageProducer#send 366 */ 367 protected void sendResponse(Session session, Destination destination, Message response) throws JMSException { 368 MessageProducer producer = session.createProducer(destination); 369 try { 370 postProcessProducer(producer, response); 371 producer.send(response); 372 } 373 finally { 374 JmsUtils.closeMessageProducer(producer); 375 } 376 } 377 378 /** 379 * Post-process the given message producer before using it to send the response. 380 * <p>The default implementation is empty. 381 * @param producer the JMS message producer that will be used to send the message 382 * @param response the outgoing JMS message about to be sent 383 * @throws JMSException if thrown by JMS API methods 384 */ 385 protected void postProcessProducer(MessageProducer producer, Message response) throws JMSException { 386 } 387 388 389 /** 390 * Delegates payload extraction to {@link #extractMessage(javax.jms.Message)} to 391 * enforce backward compatibility. 392 */ 393 private class MessagingMessageConverterAdapter extends MessagingMessageConverter { 394 395 @Override 396 protected Object extractPayload(Message message) throws JMSException { 397 return extractMessage(message); 398 } 399 } 400 401 402 /** 403 * Internal class combining a destination name 404 * and its target destination type (queue or topic). 405 */ 406 private static class DestinationNameHolder { 407 408 public final String name; 409 410 public final boolean isTopic; 411 412 public DestinationNameHolder(String name, boolean isTopic) { 413 this.name = name; 414 this.isTopic = isTopic; 415 } 416 } 417 418 }