1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package org.springframework.messaging.simp.broker;
18
19 import java.util.Collection;
20 import java.util.Collections;
21 import java.util.concurrent.atomic.AtomicBoolean;
22
23 import org.apache.commons.logging.Log;
24 import org.apache.commons.logging.LogFactory;
25
26 import org.springframework.context.ApplicationEventPublisher;
27 import org.springframework.context.ApplicationEventPublisherAware;
28 import org.springframework.context.SmartLifecycle;
29 import org.springframework.messaging.Message;
30 import org.springframework.messaging.MessageChannel;
31 import org.springframework.messaging.MessageHandler;
32 import org.springframework.messaging.SubscribableChannel;
33 import org.springframework.messaging.simp.SimpMessageHeaderAccessor;
34 import org.springframework.messaging.simp.SimpMessageType;
35 import org.springframework.messaging.support.ChannelInterceptor;
36 import org.springframework.messaging.support.ChannelInterceptorAdapter;
37 import org.springframework.messaging.support.InterceptableChannel;
38 import org.springframework.util.Assert;
39 import org.springframework.util.CollectionUtils;
40
41
42
43
44
45
46
47
48 public abstract class AbstractBrokerMessageHandler
49 implements MessageHandler, ApplicationEventPublisherAware, SmartLifecycle {
50
51 protected final Log logger = LogFactory.getLog(getClass());
52
53 private final SubscribableChannel clientInboundChannel;
54
55 private final MessageChannel clientOutboundChannel;
56
57 private final SubscribableChannel brokerChannel;
58
59 private final Collection<String> destinationPrefixes;
60
61 private ApplicationEventPublisher eventPublisher;
62
63 private AtomicBoolean brokerAvailable = new AtomicBoolean(false);
64
65 private final BrokerAvailabilityEvent availableEvent = new BrokerAvailabilityEvent(true, this);
66
67 private final BrokerAvailabilityEvent notAvailableEvent = new BrokerAvailabilityEvent(false, this);
68
69 private boolean autoStartup = true;
70
71 private volatile boolean running = false;
72
73 private final Object lifecycleMonitor = new Object();
74
75 private final ChannelInterceptor unsentDisconnectInterceptor = new UnsentDisconnectChannelInterceptor();
76
77
78
79
80
81
82
83
84 public AbstractBrokerMessageHandler(SubscribableChannel inboundChannel, MessageChannel outboundChannel,
85 SubscribableChannel brokerChannel) {
86
87 this(inboundChannel, outboundChannel, brokerChannel, Collections.<String>emptyList());
88 }
89
90
91
92
93
94
95
96
97 public AbstractBrokerMessageHandler(SubscribableChannel inboundChannel, MessageChannel outboundChannel,
98 SubscribableChannel brokerChannel, Collection<String> destinationPrefixes) {
99
100 Assert.notNull(inboundChannel, "'inboundChannel' must not be null");
101 Assert.notNull(outboundChannel, "'outboundChannel' must not be null");
102 Assert.notNull(brokerChannel, "'brokerChannel' must not be null");
103
104 this.clientInboundChannel = inboundChannel;
105 this.clientOutboundChannel = outboundChannel;
106 this.brokerChannel = brokerChannel;
107
108 destinationPrefixes = (destinationPrefixes != null) ? destinationPrefixes : Collections.<String>emptyList();
109 this.destinationPrefixes = Collections.unmodifiableCollection(destinationPrefixes);
110 }
111
112
113 public SubscribableChannel getClientInboundChannel() {
114 return this.clientInboundChannel;
115 }
116
117 public MessageChannel getClientOutboundChannel() {
118 return this.clientOutboundChannel;
119 }
120
121 public SubscribableChannel getBrokerChannel() {
122 return this.brokerChannel;
123 }
124
125 public Collection<String> getDestinationPrefixes() {
126 return this.destinationPrefixes;
127 }
128
129 @Override
130 public void setApplicationEventPublisher(ApplicationEventPublisher publisher) {
131 this.eventPublisher = publisher;
132 }
133
134 public ApplicationEventPublisher getApplicationEventPublisher() {
135 return this.eventPublisher;
136 }
137
138 public void setAutoStartup(boolean autoStartup) {
139 this.autoStartup = autoStartup;
140 }
141
142 @Override
143 public boolean isAutoStartup() {
144 return this.autoStartup;
145 }
146
147 @Override
148 public int getPhase() {
149 return Integer.MAX_VALUE;
150 }
151
152
153 @Override
154 public void start() {
155 synchronized (this.lifecycleMonitor) {
156 if (logger.isInfoEnabled()) {
157 logger.info("Starting...");
158 }
159 this.clientInboundChannel.subscribe(this);
160 this.brokerChannel.subscribe(this);
161 if (this.clientInboundChannel instanceof InterceptableChannel) {
162 ((InterceptableChannel) this.clientInboundChannel).addInterceptor(0, this.unsentDisconnectInterceptor);
163 }
164 startInternal();
165 this.running = true;
166 logger.info("Started.");
167 }
168 }
169
170 protected void startInternal() {
171 }
172
173 @Override
174 public void stop() {
175 synchronized (this.lifecycleMonitor) {
176 if (logger.isInfoEnabled()) {
177 logger.info("Stopping...");
178 }
179 stopInternal();
180 this.clientInboundChannel.unsubscribe(this);
181 this.brokerChannel.unsubscribe(this);
182 if (this.clientInboundChannel instanceof InterceptableChannel) {
183 ((InterceptableChannel) this.clientInboundChannel).removeInterceptor(this.unsentDisconnectInterceptor);
184 }
185 this.running = false;
186 logger.info("Stopped.");
187 }
188 }
189
190 protected void stopInternal() {
191 }
192
193 @Override
194 public final void stop(Runnable callback) {
195 synchronized (this.lifecycleMonitor) {
196 stop();
197 callback.run();
198 }
199 }
200
201
202
203
204
205
206
207 @Override
208 public final boolean isRunning() {
209 synchronized (this.lifecycleMonitor) {
210 return this.running;
211 }
212 }
213
214
215
216
217
218
219
220
221
222
223
224
225 public boolean isBrokerAvailable() {
226 return this.brokerAvailable.get();
227 }
228
229
230 @Override
231 public void handleMessage(Message<?> message) {
232 if (!this.running) {
233 if (logger.isTraceEnabled()) {
234 logger.trace(this + " not running yet. Ignoring " + message);
235 }
236 return;
237 }
238 handleMessageInternal(message);
239 }
240
241 protected abstract void handleMessageInternal(Message<?> message);
242
243
244 protected boolean checkDestinationPrefix(String destination) {
245 if ((destination == null) || CollectionUtils.isEmpty(this.destinationPrefixes)) {
246 return true;
247 }
248 for (String prefix : this.destinationPrefixes) {
249 if (destination.startsWith(prefix)) {
250 return true;
251 }
252 }
253 return false;
254 }
255
256 protected void publishBrokerAvailableEvent() {
257 boolean shouldPublish = this.brokerAvailable.compareAndSet(false, true);
258 if (this.eventPublisher != null && shouldPublish) {
259 if (logger.isInfoEnabled()) {
260 logger.info(this.availableEvent);
261 }
262 this.eventPublisher.publishEvent(this.availableEvent);
263 }
264 }
265
266 protected void publishBrokerUnavailableEvent() {
267 boolean shouldPublish = this.brokerAvailable.compareAndSet(true, false);
268 if (this.eventPublisher != null && shouldPublish) {
269 if (logger.isInfoEnabled()) {
270 logger.info(this.notAvailableEvent);
271 }
272 this.eventPublisher.publishEvent(this.notAvailableEvent);
273 }
274 }
275
276
277
278
279
280 private class UnsentDisconnectChannelInterceptor extends ChannelInterceptorAdapter {
281
282 @Override
283 public void afterSendCompletion(Message<?> message, MessageChannel channel, boolean sent, Exception ex) {
284 if (!sent) {
285 SimpMessageType messageType = SimpMessageHeaderAccessor.getMessageType(message.getHeaders());
286 if (SimpMessageType.DISCONNECT.equals(messageType)) {
287 logger.debug("Detected unsent DISCONNECT message. Processing anyway.");
288 handleMessage(message);
289 }
290 }
291 }
292 }
293
294 }