View Javadoc
1   /*
2    * Copyright 2002-2014 the original author or authors.
3    *
4    * Licensed under the Apache License, Version 2.0 (the "License");
5    * you may not use this file except in compliance with the License.
6    * You may obtain a copy of the License at
7    *
8    *      http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS,
12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   * See the License for the specific language governing permissions and
14   * limitations under the License.
15   */
16  
17  package org.springframework.messaging.simp.broker;
18  
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.ApplicationEventPublisherAware;
import org.springframework.context.SmartLifecycle;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.messaging.simp.SimpMessageHeaderAccessor;
import org.springframework.messaging.simp.SimpMessageType;
import org.springframework.messaging.support.ChannelInterceptor;
import org.springframework.messaging.support.ChannelInterceptorAdapter;
import org.springframework.messaging.support.InterceptableChannel;
import org.springframework.util.Assert;
import org.springframework.util.CollectionUtils;
40  
41  /**
42   * Abstract base class for a {@link MessageHandler} that broker messages to
43   * registered subscribers.
44   *
45   * @author Rossen Stoyanchev
46   * @since 4.0
47   */
48  public abstract class AbstractBrokerMessageHandler
49  		implements MessageHandler, ApplicationEventPublisherAware, SmartLifecycle {
50  
51  	protected final Log logger = LogFactory.getLog(getClass());
52  
53  	private final SubscribableChannel clientInboundChannel;
54  
55  	private final MessageChannel clientOutboundChannel;
56  
57  	private final SubscribableChannel brokerChannel;
58  
59  	private final Collection<String> destinationPrefixes;
60  
61  	private ApplicationEventPublisher eventPublisher;
62  
63  	private AtomicBoolean brokerAvailable = new AtomicBoolean(false);
64  
65  	private final BrokerAvailabilityEvent availableEvent = new BrokerAvailabilityEvent(true, this);
66  
67  	private final BrokerAvailabilityEvent notAvailableEvent = new BrokerAvailabilityEvent(false, this);
68  
69  	private boolean autoStartup = true;
70  
71  	private volatile boolean running = false;
72  
73  	private final Object lifecycleMonitor = new Object();
74  
75  	private final ChannelInterceptor unsentDisconnectInterceptor = new UnsentDisconnectChannelInterceptor();
76  
77  
78  	/**
79  	 * Constructor with no destination prefixes (matches all destinations).
80  	 * @param inboundChannel the channel for receiving messages from clients (e.g. WebSocket clients)
81  	 * @param outboundChannel the channel for sending messages to clients (e.g. WebSocket clients)
82  	 * @param brokerChannel the channel for the application to send messages to the broker
83  	 */
84  	public AbstractBrokerMessageHandler(SubscribableChannel inboundChannel, MessageChannel outboundChannel,
85  			SubscribableChannel brokerChannel) {
86  
87  		this(inboundChannel, outboundChannel, brokerChannel, Collections.<String>emptyList());
88  	}
89  
90  	/**
91  	 * Constructor with destination prefixes to match to destinations of messages.
92  	 * @param inboundChannel the channel for receiving messages from clients (e.g. WebSocket clients)
93  	 * @param outboundChannel the channel for sending messages to clients (e.g. WebSocket clients)
94  	 * @param brokerChannel the channel for the application to send messages to the broker
95  	 * @param destinationPrefixes prefixes to use to filter out messages
96  	 */
97  	public AbstractBrokerMessageHandler(SubscribableChannel inboundChannel, MessageChannel outboundChannel,
98  			SubscribableChannel brokerChannel, Collection<String> destinationPrefixes) {
99  
100 		Assert.notNull(inboundChannel, "'inboundChannel' must not be null");
101 		Assert.notNull(outboundChannel, "'outboundChannel' must not be null");
102 		Assert.notNull(brokerChannel, "'brokerChannel' must not be null");
103 
104 		this.clientInboundChannel = inboundChannel;
105 		this.clientOutboundChannel = outboundChannel;
106 		this.brokerChannel = brokerChannel;
107 
108 		destinationPrefixes = (destinationPrefixes != null) ? destinationPrefixes : Collections.<String>emptyList();
109 		this.destinationPrefixes = Collections.unmodifiableCollection(destinationPrefixes);
110 	}
111 
112 
113 	public SubscribableChannel getClientInboundChannel() {
114 		return this.clientInboundChannel;
115 	}
116 
117 	public MessageChannel getClientOutboundChannel() {
118 		return this.clientOutboundChannel;
119 	}
120 
121 	public SubscribableChannel getBrokerChannel() {
122 		return this.brokerChannel;
123 	}
124 
125 	public Collection<String> getDestinationPrefixes() {
126 		return this.destinationPrefixes;
127 	}
128 
129 	@Override
130 	public void setApplicationEventPublisher(ApplicationEventPublisher publisher) {
131 		this.eventPublisher = publisher;
132 	}
133 
134 	public ApplicationEventPublisher getApplicationEventPublisher() {
135 		return this.eventPublisher;
136 	}
137 
138 	public void setAutoStartup(boolean autoStartup) {
139 		this.autoStartup = autoStartup;
140 	}
141 
142 	@Override
143 	public boolean isAutoStartup() {
144 		return this.autoStartup;
145 	}
146 
147 	@Override
148 	public int getPhase() {
149 		return Integer.MAX_VALUE;
150 	}
151 
152 
153 	@Override
154 	public void start() {
155 		synchronized (this.lifecycleMonitor) {
156 			if (logger.isInfoEnabled()) {
157 				logger.info("Starting...");
158 			}
159 			this.clientInboundChannel.subscribe(this);
160 			this.brokerChannel.subscribe(this);
161 			if (this.clientInboundChannel instanceof InterceptableChannel) {
162 				((InterceptableChannel) this.clientInboundChannel).addInterceptor(0, this.unsentDisconnectInterceptor);
163 			}
164 			startInternal();
165 			this.running = true;
166 			logger.info("Started.");
167 		}
168 	}
169 
170 	protected void startInternal() {
171 	}
172 
173 	@Override
174 	public void stop() {
175 		synchronized (this.lifecycleMonitor) {
176 			if (logger.isInfoEnabled()) {
177 				logger.info("Stopping...");
178 			}
179 			stopInternal();
180 			this.clientInboundChannel.unsubscribe(this);
181 			this.brokerChannel.unsubscribe(this);
182 			if (this.clientInboundChannel instanceof InterceptableChannel) {
183 				((InterceptableChannel) this.clientInboundChannel).removeInterceptor(this.unsentDisconnectInterceptor);
184 			}
185 			this.running = false;
186 			logger.info("Stopped.");
187 		}
188 	}
189 
190 	protected void stopInternal() {
191 	}
192 
193 	@Override
194 	public final void stop(Runnable callback) {
195 		synchronized (this.lifecycleMonitor) {
196 			stop();
197 			callback.run();
198 		}
199 	}
200 
201 	/**
202 	 * Check whether this message handler is currently running.
203 	 * <p>Note that even when this message handler is running the
204 	 * {@link #isBrokerAvailable()} flag may still independently alternate between
205 	 * being on and off depending on the concrete sub-class implementation.
206 	 */
207 	@Override
208 	public final boolean isRunning() {
209 		synchronized (this.lifecycleMonitor) {
210 			return this.running;
211 		}
212 	}
213 
214 	/**
215 	 * Whether the message broker is currently available and able to process messages.
216 	 * <p>Note that this is in addition to the {@link #isRunning()} flag, which
217 	 * indicates whether this message handler is running. In other words the message
218 	 * handler must first be running and then the {@code #isBrokerAvailable()} flag
219 	 * may still independently alternate between being on and off depending on the
220 	 * concrete sub-class implementation.
221 	 * <p>Application components may implement
222 	 * {@code org.springframework.context.ApplicationListener&lt;BrokerAvailabilityEvent&gt;}
223 	 * to receive notifications when broker becomes available and unavailable.
224 	 */
225 	public boolean isBrokerAvailable() {
226 		return this.brokerAvailable.get();
227 	}
228 
229 
230 	@Override
231 	public void handleMessage(Message<?> message) {
232 		if (!this.running) {
233 			if (logger.isTraceEnabled()) {
234 				logger.trace(this + " not running yet. Ignoring " + message);
235 			}
236 			return;
237 		}
238 		handleMessageInternal(message);
239 	}
240 
241 	protected abstract void handleMessageInternal(Message<?> message);
242 
243 
244 	protected boolean checkDestinationPrefix(String destination) {
245 		if ((destination == null) || CollectionUtils.isEmpty(this.destinationPrefixes)) {
246 			return true;
247 		}
248 		for (String prefix : this.destinationPrefixes) {
249 			if (destination.startsWith(prefix)) {
250 				return true;
251 			}
252 		}
253 		return false;
254 	}
255 
256 	protected void publishBrokerAvailableEvent() {
257 		boolean shouldPublish = this.brokerAvailable.compareAndSet(false, true);
258 		if (this.eventPublisher != null && shouldPublish) {
259 			if (logger.isInfoEnabled()) {
260 				logger.info(this.availableEvent);
261 			}
262 			this.eventPublisher.publishEvent(this.availableEvent);
263 		}
264 	}
265 
266 	protected void publishBrokerUnavailableEvent() {
267 		boolean shouldPublish = this.brokerAvailable.compareAndSet(true, false);
268 		if (this.eventPublisher != null && shouldPublish) {
269 			if (logger.isInfoEnabled()) {
270 				logger.info(this.notAvailableEvent);
271 			}
272 			this.eventPublisher.publishEvent(this.notAvailableEvent);
273 		}
274 	}
275 
276 
277 	/**
278 	 * Detect unsent DISCONNECT messages and process them anyway.
279 	 */
280 	private class UnsentDisconnectChannelInterceptor extends ChannelInterceptorAdapter {
281 
282 		@Override
283 		public void afterSendCompletion(Message<?> message, MessageChannel channel, boolean sent, Exception ex) {
284 			if (!sent) {
285 				SimpMessageType messageType = SimpMessageHeaderAccessor.getMessageType(message.getHeaders());
286 				if (SimpMessageType.DISCONNECT.equals(messageType)) {
287 					logger.debug("Detected unsent DISCONNECT message. Processing anyway.");
288 					handleMessage(message);
289 				}
290 			}
291 		}
292 	}
293 
294 }