View Javadoc
1   /*
2    * Copyright 2002-2014 the original author or authors.
3    *
4    * Licensed under the Apache License, Version 2.0 (the "License");
5    * you may not use this file except in compliance with the License.
6    * You may obtain a copy of the License at
7    *
8    *      http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS,
12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   * See the License for the specific language governing permissions and
14   * limitations under the License.
15   */
16  
17  package org.springframework.messaging.simp.stomp;
18  
19  import java.util.Collection;
20  import java.util.Map;
21  import java.util.concurrent.Callable;
22  import java.util.concurrent.ConcurrentHashMap;
23  import java.util.concurrent.TimeUnit;
24  import java.util.concurrent.atomic.AtomicInteger;
25  
26  import org.springframework.messaging.Message;
27  import org.springframework.messaging.MessageChannel;
28  import org.springframework.messaging.MessageDeliveryException;
29  import org.springframework.messaging.SubscribableChannel;
30  import org.springframework.messaging.simp.SimpMessageHeaderAccessor;
31  import org.springframework.messaging.simp.SimpMessageType;
32  import org.springframework.messaging.simp.broker.AbstractBrokerMessageHandler;
33  import org.springframework.messaging.support.MessageBuilder;
34  import org.springframework.messaging.support.MessageHeaderAccessor;
35  import org.springframework.messaging.support.MessageHeaderInitializer;
36  import org.springframework.messaging.tcp.FixedIntervalReconnectStrategy;
37  import org.springframework.messaging.tcp.TcpConnection;
38  import org.springframework.messaging.tcp.TcpConnectionHandler;
39  import org.springframework.messaging.tcp.TcpOperations;
40  import org.springframework.messaging.tcp.reactor.Reactor11TcpClient;
41  import org.springframework.util.Assert;
42  import org.springframework.util.concurrent.ListenableFuture;
43  import org.springframework.util.concurrent.ListenableFutureCallback;
44  import org.springframework.util.concurrent.ListenableFutureTask;
45  
46  /**
47   * A {@link org.springframework.messaging.MessageHandler} that handles messages by
48   * forwarding them to a STOMP broker.
49   *
50   * <p>For each new {@link SimpMessageType#CONNECT CONNECT} message, an independent TCP
51   * connection to the broker is opened and used exclusively for all messages from the
52   * client that originated the CONNECT message. Messages from the same client are
53   * identified through the session id message header. Reversely, when the STOMP broker
54   * sends messages back on the TCP connection, those messages are enriched with the session
55   * id of the client and sent back downstream through the {@link MessageChannel} provided
56   * to the constructor.
57   *
58   * <p>This class also automatically opens a default "system" TCP connection to the message
59   * broker that is used for sending messages that originate from the server application (as
60   * opposed to from a client). Such messages are not associated with any client and
61   * therefore do not have a session id header. The "system" connection is effectively
62   * shared and cannot be used to receive messages. Several properties are provided to
63   * configure the "system" connection including:
64   * <ul>
65   * <li>{@link #setSystemLogin(String)}</li>
66   * <li>{@link #setSystemPasscode(String)}</li>
67   * <li>{@link #setSystemHeartbeatSendInterval(long)}</li>
68   * <li>{@link #setSystemHeartbeatReceiveInterval(long)}</li>
69   * </ul>
70   *
71   * @author Rossen Stoyanchev
72   * @author Andy Wilkinson
73   * @since 4.0
74   */
75  public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler {
76  
77  	public static final String SYSTEM_SESSION_ID = "_system_";
78  
79  	private static final byte[] EMPTY_PAYLOAD = new byte[0];
80  
81  	private static final ListenableFutureTask<Void> EMPTY_TASK = new ListenableFutureTask<Void>(new VoidCallable());
82  
83  	// STOMP recommends a margin of error for receiving heartbeats
84  	private static final long HEARTBEAT_MULTIPLIER = 3;
85  
86  	private static final Message<byte[]> HEARTBEAT_MESSAGE;
87  
88  
89  	static {
90  		EMPTY_TASK.run();
91  		StompHeaderAccessor accessor = StompHeaderAccessor.createForHeartbeat();
92  		HEARTBEAT_MESSAGE = MessageBuilder.createMessage(StompDecoder.HEARTBEAT_PAYLOAD, accessor.getMessageHeaders());
93  	}
94  
95  
96  	private String relayHost = "127.0.0.1";
97  
98  	private int relayPort = 61613;
99  
100 	private String clientLogin = "guest";
101 
102 	private String clientPasscode = "guest";
103 
104 	private String systemLogin = "guest";
105 
106 	private String systemPasscode = "guest";
107 
108 	private long systemHeartbeatSendInterval = 10000;
109 
110 	private long systemHeartbeatReceiveInterval = 10000;
111 
112 	private String virtualHost;
113 
114 	private TcpOperations<byte[]> tcpClient;
115 
116 	private MessageHeaderInitializer headerInitializer;
117 
118 	private final Map<String, StompConnectionHandler> connectionHandlers =
119 			new ConcurrentHashMap<String, StompConnectionHandler>();
120 
121 	private final Stats stats = new Stats();
122 
123 
124 	/**
125 	 * Create a StompBrokerRelayMessageHandler instance with the given message channels
126 	 * and destination prefixes.
127 	 * @param inboundChannel the channel for receiving messages from clients (e.g. WebSocket clients)
128 	 * @param outboundChannel the channel for sending messages to clients (e.g. WebSocket clients)
129 	 * @param brokerChannel the channel for the application to send messages to the broker
130 	 * @param destinationPrefixes the broker supported destination prefixes; destinations
131 	 * that do not match the given prefix are ignored.
132 	 */
133 	public StompBrokerRelayMessageHandler(SubscribableChannel inboundChannel, MessageChannel outboundChannel,
134 			SubscribableChannel brokerChannel, Collection<String> destinationPrefixes) {
135 
136 		super(inboundChannel, outboundChannel, brokerChannel, destinationPrefixes);
137 	}
138 
139 
140 	/**
141 	 * Set the STOMP message broker host.
142 	 */
143 	public void setRelayHost(String relayHost) {
144 		Assert.hasText(relayHost, "relayHost must not be empty");
145 		this.relayHost = relayHost;
146 	}
147 
148 	/**
149 	 * Return the STOMP message broker host.
150 	 */
151 	public String getRelayHost() {
152 		return this.relayHost;
153 	}
154 
155 	/**
156 	 * Set the STOMP message broker port.
157 	 */
158 	public void setRelayPort(int relayPort) {
159 		this.relayPort = relayPort;
160 	}
161 
162 	/**
163 	 * Return the STOMP message broker port.
164 	 */
165 	public int getRelayPort() {
166 		return this.relayPort;
167 	}
168 
169 	/**
170 	 * Set the interval, in milliseconds, at which the "system" connection will, in the
171 	 * absence of any other data being sent, send a heartbeat to the STOMP broker. A value
172 	 * of zero will prevent heartbeats from being sent to the broker.
173 	 * <p>The default value is 10000.
174 	 * <p>See class-level documentation for more information on the "system" connection.
175 	 */
176 	public void setSystemHeartbeatSendInterval(long systemHeartbeatSendInterval) {
177 		this.systemHeartbeatSendInterval = systemHeartbeatSendInterval;
178 	}
179 
180 	/**
181 	 * Return the interval, in milliseconds, at which the "system" connection will
182 	 * send heartbeats to the STOMP broker.
183 	 */
184 	public long getSystemHeartbeatSendInterval() {
185 		return this.systemHeartbeatSendInterval;
186 	}
187 
188 	/**
189 	 * Set the maximum interval, in milliseconds, at which the "system" connection
190 	 * expects, in the absence of any other data, to receive a heartbeat from the STOMP
191 	 * broker. A value of zero will configure the connection to expect not to receive
192 	 * heartbeats from the broker.
193 	 * <p>The default value is 10000.
194 	 * <p>See class-level documentation for more information on the "system" connection.
195 	 */
196 	public void setSystemHeartbeatReceiveInterval(long heartbeatReceiveInterval) {
197 		this.systemHeartbeatReceiveInterval = heartbeatReceiveInterval;
198 	}
199 
200 	/**
201 	 * Return the interval, in milliseconds, at which the "system" connection expects
202 	 * to receive heartbeats from the STOMP broker.
203 	 */
204 	public long getSystemHeartbeatReceiveInterval() {
205 		return this.systemHeartbeatReceiveInterval;
206 	}
207 
208 	/**
209 	 * Set the login to use when creating connections to the STOMP broker on
210 	 * behalf of connected clients.
211 	 * <p>By default this is set to "guest".
212 	 * @see #setSystemLogin(String)
213 	 */
214 	public void setClientLogin(String clientLogin) {
215 		Assert.hasText(clientLogin, "clientLogin must not be empty");
216 		this.clientLogin = clientLogin;
217 	}
218 
219 	/**
220 	 * Return the configured login to use for connections to the STOMP broker
221 	 * on behalf of connected clients.
222 	 * @see #getSystemLogin()
223 	 */
224 	public String getClientLogin() {
225 		return this.clientLogin;
226 	}
227 
228 	/**
229 	 * Set the client passcode to use to create connections to the STOMP broker on
230 	 * behalf of connected clients.
231 	 * <p>By default this is set to "guest".
232 	 * @see #setSystemPasscode
233 	 */
234 	public void setClientPasscode(String clientPasscode) {
235 		Assert.hasText(clientPasscode, "clientPasscode must not be empty");
236 		this.clientPasscode = clientPasscode;
237 	}
238 
239 	/**
240 	 * Return the configured passcode to use for connections to the STOMP broker on
241 	 * behalf of connected clients.
242 	 * @see #getSystemPasscode()
243 	 */
244 	public String getClientPasscode() {
245 		return this.clientPasscode;
246 	}
247 
248 	/**
249 	 * Set the login for the shared "system" connection used to send messages to
250 	 * the STOMP broker from within the application, i.e. messages not associated
251 	 * with a specific client session (e.g. REST/HTTP request handling method).
252 	 * <p>By default this is set to "guest".
253 	 */
254 	public void setSystemLogin(String systemLogin) {
255 		Assert.hasText(systemLogin, "systemLogin must not be empty");
256 		this.systemLogin = systemLogin;
257 	}
258 
259 	/**
260 	 * Return the login used for the shared "system" connection to the STOMP broker.
261 	 */
262 	public String getSystemLogin() {
263 		return this.systemLogin;
264 	}
265 
266 	/**
267 	 * Set the passcode for the shared "system" connection used to send messages to
268 	 * the STOMP broker from within the application, i.e. messages not associated
269 	 * with a specific client session (e.g. REST/HTTP request handling method).
270 	 * <p>By default this is set to "guest".
271 	 */
272 	public void setSystemPasscode(String systemPasscode) {
273 		this.systemPasscode = systemPasscode;
274 	}
275 
276 	/**
277 	 * Return the passcode used for the shared "system" connection to the STOMP broker.
278 	 */
279 	public String getSystemPasscode() {
280 		return this.systemPasscode;
281 	}
282 
283 	/**
284 	 * Set the value of the "host" header to use in STOMP CONNECT frames. When this
285 	 * property is configured, a "host" header will be added to every STOMP CONNECT frame sent to
286 	 * the STOMP broker. This may be useful for example in a cloud environment where the
287 	 * actual host to which the TCP connection is established is different from the host
288 	 * providing the cloud-based STOMP service.
289 	 * <p>By default this property is not set.
290 	 */
291 	public void setVirtualHost(String virtualHost) {
292 		this.virtualHost = virtualHost;
293 	}
294 
295 	/**
296 	 * Return the configured virtual host value.
297 	 */
298 	public String getVirtualHost() {
299 		return this.virtualHost;
300 	}
301 
302 	/**
303 	 * Configure a TCP client for managing TCP connections to the STOMP broker.
304 	 * By default {@link org.springframework.messaging.tcp.reactor.Reactor11TcpClient} is used.
305 	 */
306 	public void setTcpClient(TcpOperations<byte[]> tcpClient) {
307 		this.tcpClient = tcpClient;
308 	}
309 
310 	/**
311 	 * Get the configured TCP client. Never {@code null} unless no client was
312 	 * configured and this method is invoked before the handler is started, i.e.
313 	 * before a default implementation has been initialized.
314 	 */
315 	public TcpOperations<byte[]> getTcpClient() {
316 		return this.tcpClient;
317 	}
318 
319 	/**
320 	 * Return the current count of TCP connections to the broker.
321 	 */
322 	public int getConnectionCount() {
323 		return this.connectionHandlers.size();
324 	}
325 
326 	/**
327 	 * Configure a {@link MessageHeaderInitializer} to apply to the headers of all
328 	 * messages created through the {@code StompBrokerRelayMessageHandler} that
329 	 * are sent to the client outbound message channel.
330 	 * <p>By default this property is not set.
331 	 */
332 	public void setHeaderInitializer(MessageHeaderInitializer headerInitializer) {
333 		this.headerInitializer = headerInitializer;
334 	}
335 
336 	/**
337 	 * Return the configured header initializer.
338 	 */
339 	public MessageHeaderInitializer getHeaderInitializer() {
340 		return this.headerInitializer;
341 	}
342 
343 	/**
344 	 * Return a String describing internal state and counters.
345 	 */
346 	public String getStatsInfo() {
347 		return this.stats.toString();
348 	}
349 
350 
351 	@Override
352 	protected void startInternal() {
353 		if (this.tcpClient == null) {
354 			StompDecoder decoder = new StompDecoder();
355 			decoder.setHeaderInitializer(getHeaderInitializer());
356 			Reactor11StompCodec codec = new Reactor11StompCodec(new StompEncoder(), decoder);
357 			this.tcpClient = new StompTcpClientFactory().create(this.relayHost, this.relayPort, codec);
358 		}
359 
360 		if (logger.isInfoEnabled()) {
361 			logger.info("Connecting \"system\" session to " + this.relayHost + ":" + this.relayPort);
362 		}
363 
364 		StompHeaderAccessor accessor = StompHeaderAccessor.create(StompCommand.CONNECT);
365 		accessor.setAcceptVersion("1.1,1.2");
366 		accessor.setLogin(this.systemLogin);
367 		accessor.setPasscode(this.systemPasscode);
368 		accessor.setHeartbeat(this.systemHeartbeatSendInterval, this.systemHeartbeatReceiveInterval);
369 		accessor.setHost(getVirtualHost());
370 		accessor.setSessionId(SYSTEM_SESSION_ID);
371 		if (logger.isDebugEnabled()) {
372 			logger.debug("Forwarding " + accessor.getShortLogMessage(EMPTY_PAYLOAD));
373 		}
374 
375 		SystemStompConnectionHandler handler = new SystemStompConnectionHandler(accessor);
376 		this.connectionHandlers.put(handler.getSessionId(), handler);
377 
378 		this.stats.incrementConnectCount();
379 		this.tcpClient.connect(handler, new FixedIntervalReconnectStrategy(5000));
380 	}
381 
382 	@Override
383 	protected void stopInternal() {
384 		publishBrokerUnavailableEvent();
385 		try {
386 			this.tcpClient.shutdown().get(5000, TimeUnit.MILLISECONDS);
387 		}
388 		catch (Throwable ex) {
389 			logger.error("Error in shutdown of TCP client", ex);
390 		}
391 	}
392 
393 	@Override
394 	protected void handleMessageInternal(Message<?> message) {
395 		String sessionId = SimpMessageHeaderAccessor.getSessionId(message.getHeaders());
396 
397 		if (!isBrokerAvailable()) {
398 			if (sessionId == null || SYSTEM_SESSION_ID.equals(sessionId)) {
399 				throw new MessageDeliveryException("Message broker not active. Consider subscribing to " +
400 						"receive BrokerAvailabilityEvent's from an ApplicationListener Spring bean.");
401 			}
402 			SimpMessageType messageType = SimpMessageHeaderAccessor.getMessageType(message.getHeaders());
403 			if (logger.isErrorEnabled() && SimpMessageType.CONNECT.equals(messageType)) {
404 				logger.error("Broker not active. Ignoring " + message);
405 			}
406 			else if (logger.isDebugEnabled()) {
407 				logger.debug("Broker not active. Ignoring " + message);
408 			}
409 			return;
410 		}
411 
412 		StompHeaderAccessor stompAccessor;
413 		StompCommand command;
414 
415 		MessageHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, MessageHeaderAccessor.class);
416 		if (accessor == null) {
417 			throw new IllegalStateException(
418 					"No header accessor (not using the SimpMessagingTemplate?): " + message);
419 		}
420 		else if (accessor instanceof StompHeaderAccessor) {
421 			stompAccessor = (StompHeaderAccessor) accessor;
422 			command = stompAccessor.getCommand();
423 		}
424 		else if (accessor instanceof SimpMessageHeaderAccessor) {
425 			stompAccessor = StompHeaderAccessor.wrap(message);
426 			command = stompAccessor.getCommand();
427 			if (command == null) {
428 				command = stompAccessor.updateStompCommandAsClientMessage();
429 			}
430 		}
431 		else {
432 			throw new IllegalStateException(
433 					"Unexpected header accessor type " + accessor.getClass() + " in " + message);
434 		}
435 
436 		if (sessionId == null) {
437 			if (!SimpMessageType.MESSAGE.equals(stompAccessor.getMessageType())) {
438 				if (logger.isErrorEnabled()) {
439 					logger.error("Only STOMP SEND supported from within the server side. Ignoring " + message);
440 				}
441 				return;
442 			}
443 			sessionId = SYSTEM_SESSION_ID;
444 			stompAccessor.setSessionId(sessionId);
445 		}
446 
447 		String destination = stompAccessor.getDestination();
448 		if (command != null && command.requiresDestination() && !checkDestinationPrefix(destination)) {
449 			return;
450 		}
451 
452 		if (StompCommand.CONNECT.equals(command)) {
453 			if (logger.isDebugEnabled()) {
454 				logger.debug(stompAccessor.getShortLogMessage(EMPTY_PAYLOAD));
455 			}
456 			stompAccessor = (stompAccessor.isMutable() ? stompAccessor : StompHeaderAccessor.wrap(message));
457 			stompAccessor.setLogin(this.clientLogin);
458 			stompAccessor.setPasscode(this.clientPasscode);
459 			if (getVirtualHost() != null) {
460 				stompAccessor.setHost(getVirtualHost());
461 			}
462 			StompConnectionHandler handler = new StompConnectionHandler(sessionId, stompAccessor);
463 			this.connectionHandlers.put(sessionId, handler);
464 			this.stats.incrementConnectCount();
465 			this.tcpClient.connect(handler);
466 		}
467 		else if (StompCommand.DISCONNECT.equals(command)) {
468 			StompConnectionHandler handler = this.connectionHandlers.get(sessionId);
469 			if (handler == null) {
470 				if (logger.isDebugEnabled()) {
471 					logger.debug("Ignoring DISCONNECT in session " + sessionId + ". Connection already cleaned up.");
472 				}
473 				return;
474 			}
475 			stats.incrementDisconnectCount();
476 			handler.forward(message, stompAccessor);
477 		}
478 		else {
479 			StompConnectionHandler handler = this.connectionHandlers.get(sessionId);
480 			if (handler == null) {
481 				if (logger.isDebugEnabled()) {
482 					logger.debug("No TCP connection for session " + sessionId + " in " + message);
483 				}
484 				return;
485 			}
486 			handler.forward(message, stompAccessor);
487 		}
488 	}
489 
490 	@Override
491 	public String toString() {
492 		return "StompBrokerRelay[" + this.relayHost + ":" + this.relayPort + "]";
493 	}
494 
495 
496 	private class StompConnectionHandler implements TcpConnectionHandler<byte[]> {
497 
498 		private final String sessionId;
499 
500 		private final boolean isRemoteClientSession;
501 
502 		private final StompHeaderAccessor connectHeaders;
503 
504 		private volatile TcpConnection<byte[]> tcpConnection;
505 
506 		private volatile boolean isStompConnected;
507 
508 
509 		private StompConnectionHandler(String sessionId, StompHeaderAccessor connectHeaders) {
510 			this(sessionId, connectHeaders, true);
511 		}
512 
513 		private StompConnectionHandler(String sessionId, StompHeaderAccessor connectHeaders, boolean isClientSession) {
514 			Assert.notNull(sessionId, "'sessionId' must not be null");
515 			Assert.notNull(connectHeaders, "'connectHeaders' must not be null");
516 			this.sessionId = sessionId;
517 			this.connectHeaders = connectHeaders;
518 			this.isRemoteClientSession = isClientSession;
519 		}
520 
521 		public String getSessionId() {
522 			return this.sessionId;
523 		}
524 
525 		@Override
526 		public void afterConnected(TcpConnection<byte[]> connection) {
527 			if (logger.isDebugEnabled()) {
528 				logger.debug("TCP connection opened in session=" + getSessionId());
529 			}
530 			this.tcpConnection = connection;
531 			connection.send(MessageBuilder.createMessage(EMPTY_PAYLOAD, this.connectHeaders.getMessageHeaders()));
532 		}
533 
534 		@Override
535 		public void afterConnectFailure(Throwable ex) {
536 			handleTcpConnectionFailure("failed to establish TCP connection in session " + this.sessionId, ex);
537 		}
538 
539 		/**
540 		 * Invoked when any TCP connectivity issue is detected, i.e. failure to establish
541 		 * the TCP connection, failure to send a message, missed heartbeat, etc.
542 		 */
543 		protected void handleTcpConnectionFailure(String error, Throwable ex) {
544 			if (logger.isErrorEnabled()) {
545 				logger.error("TCP connection failure in session " + this.sessionId + ": " + error, ex);
546 			}
547 			try {
548 				sendStompErrorFrameToClient(error);
549 			}
550 			finally {
551 				try {
552 					clearConnection();
553 				}
554 				catch (Throwable ex2) {
555 					if (logger.isDebugEnabled()) {
556 						logger.debug("Failure while clearing TCP connection state in session " + this.sessionId, ex2);
557 					}
558 				}
559 			}
560 		}
561 
562 		private void sendStompErrorFrameToClient(String errorText) {
563 			if (this.isRemoteClientSession) {
564 				StompHeaderAccessor headerAccessor = StompHeaderAccessor.create(StompCommand.ERROR);
565 				if (getHeaderInitializer() != null) {
566 					getHeaderInitializer().initHeaders(headerAccessor);
567 				}
568 				headerAccessor.setSessionId(this.sessionId);
569 				headerAccessor.setUser(this.connectHeaders.getUser());
570 				headerAccessor.setMessage(errorText);
571 				Message<?> errorMessage = MessageBuilder.createMessage(EMPTY_PAYLOAD, headerAccessor.getMessageHeaders());
572 				headerAccessor.setImmutable();
573 				sendMessageToClient(errorMessage);
574 			}
575 		}
576 
577 		protected void sendMessageToClient(Message<?> message) {
578 			if (this.isRemoteClientSession) {
579 				StompBrokerRelayMessageHandler.this.getClientOutboundChannel().send(message);
580 			}
581 		}
582 
583 		@Override
584 		public void handleMessage(Message<byte[]> message) {
585 			StompHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class);
586 			accessor.setSessionId(this.sessionId);
587 			accessor.setUser(this.connectHeaders.getUser());
588 
589 			StompCommand command = accessor.getCommand();
590 			if (StompCommand.CONNECTED.equals(command)) {
591 				if (logger.isDebugEnabled()) {
592 					logger.debug("Received " + accessor.getShortLogMessage(EMPTY_PAYLOAD));
593 				}
594 				afterStompConnected(accessor);
595 			}
596 			else if (logger.isErrorEnabled() && StompCommand.ERROR.equals(command)) {
597 				logger.error("Received " + accessor.getShortLogMessage(message.getPayload()));
598 			}
599 			else if (logger.isTraceEnabled()) {
600 				logger.trace("Received " + accessor.getDetailedLogMessage(message.getPayload()));
601 			}
602 
603 			accessor.setImmutable();
604 			sendMessageToClient(message);
605 		}
606 
607 		/**
608 		 * Invoked after the STOMP CONNECTED frame is received. At this point the
609 		 * connection is ready for sending STOMP messages to the broker.
610 		 */
611 		protected void afterStompConnected(StompHeaderAccessor connectedHeaders) {
612 			this.isStompConnected = true;
613 			stats.incrementConnectedCount();
614 			initHeartbeats(connectedHeaders);
615 		}
616 
617 		private void initHeartbeats(StompHeaderAccessor connectedHeaders) {
618 			if (this.isRemoteClientSession) {
619 				return;
620 			}
621 
622 			long clientSendInterval = this.connectHeaders.getHeartbeat()[0];
623 			long clientReceiveInterval = this.connectHeaders.getHeartbeat()[1];
624 			long serverSendInterval = connectedHeaders.getHeartbeat()[0];
625 			long serverReceiveInterval = connectedHeaders.getHeartbeat()[1];
626 
627 			if (clientSendInterval > 0 && serverReceiveInterval > 0) {
628 				long interval = Math.max(clientSendInterval,  serverReceiveInterval);
629 				this.tcpConnection.onWriteInactivity(new Runnable() {
630 					@Override
631 					public void run() {
632 						TcpConnection<byte[]> conn = tcpConnection;
633 						if (conn != null) {
634 							conn.send(HEARTBEAT_MESSAGE).addCallback(
635 									new ListenableFutureCallback<Void>() {
636 										public void onSuccess(Void result) {
637 										}
638 										public void onFailure(Throwable ex) {
639 											String error = "failed to forward heartbeat in \"system\" session.";
640 											handleTcpConnectionFailure(error, ex);
641 										}
642 									});
643 						}
644 					}
645 				}, interval);
646 			}
647 			if (clientReceiveInterval > 0 && serverSendInterval > 0) {
648 				final long interval = Math.max(clientReceiveInterval, serverSendInterval) * HEARTBEAT_MULTIPLIER;
649 				this.tcpConnection.onReadInactivity(new Runnable() {
650 					@Override
651 					public void run() {
652 						handleTcpConnectionFailure("no messages received for more than " + interval + " ms.", null);
653 					}
654 				}, interval);
655 			}
656 		}
657 
658 		@Override
659 		public void handleFailure(Throwable ex) {
660 			if (this.tcpConnection != null) {
661 				handleTcpConnectionFailure("transport failure.", ex);
662 			}
663 			else if (logger.isErrorEnabled()) {
664 				logger.error("Transport failure: " + ex);
665 			}
666 		}
667 
668 		@Override
669 		public void afterConnectionClosed() {
670 			if (this.tcpConnection == null) {
671 				return;
672 			}
673 			try {
674 				if (logger.isDebugEnabled()) {
675 					logger.debug("TCP connection to broker closed in session " + this.sessionId);
676 				}
677 				sendStompErrorFrameToClient("Connection to broker closed.");
678 			}
679 			finally {
680 				try {
681 					// Prevent clearConnection() from trying to close
682 					this.tcpConnection = null;
683 					clearConnection();
684 				}
685 				catch (Throwable ex) {
686 					// Shouldn't happen with connection reset beforehand
687 				}
688 			}
689 		}
690 
691 		/**
692 		 * Forward the given message to the STOMP broker.
693 		 * <p>The method checks whether we have an active TCP connection and have
694 		 * received the STOMP CONNECTED frame. For client messages this should be
695 		 * false only if we lose the TCP connection around the same time when a
696 		 * client message is being forwarded, so we simply log the ignored message
697 		 * at debug level. For messages from within the application being sent on
698 		 * the "system" connection an exception is raised so that components sending
699 		 * the message have a chance to handle it -- by default the broker message
700 		 * channel is synchronous.
701 		 * <p>Note that if messages arrive concurrently around the same time a TCP
702 		 * connection is lost, there is a brief period of time before the connection
703 		 * is reset when one or more messages may sneak through and an attempt made
704 		 * to forward them. Rather than synchronizing to guard against that, this
705 		 * method simply lets them try and fail. For client sessions that may
706 		 * result in an additional STOMP ERROR frame(s) being sent downstream but
707 		 * code handling that downstream should be idempotent in such cases.
708 		 * @param message the message to send (never {@code null})
709 		 * @return a future to wait for the result
710 		 */
711 		@SuppressWarnings("unchecked")
712 		public ListenableFuture<Void> forward(final Message<?> message, final StompHeaderAccessor accessor) {
713 			TcpConnection<byte[]> conn = this.tcpConnection;
714 
715 			if (!this.isStompConnected) {
716 				if (this.isRemoteClientSession) {
717 					if (logger.isDebugEnabled()) {
718 						logger.debug("TCP connection closed already, ignoring " +
719 								accessor.getShortLogMessage(message.getPayload()));
720 					}
721 					return EMPTY_TASK;
722 				}
723 				else {
724 					throw new IllegalStateException("Cannot forward messages " +
725 							(conn != null ? "before STOMP CONNECTED. " : "while inactive. ") +
726 							"Consider subscribing to receive BrokerAvailabilityEvent's from " +
727 							"an ApplicationListener Spring bean. Dropped " +
728 							accessor.getShortLogMessage(message.getPayload()));
729 				}
730 			}
731 
732 			final Message<?> messageToSend = (accessor.isMutable() && accessor.isModified()) ?
733 					MessageBuilder.createMessage(message.getPayload(), accessor.getMessageHeaders()) : message;
734 
735 			StompCommand command = accessor.getCommand();
736 			if (logger.isDebugEnabled() && (StompCommand.SEND.equals(command) || StompCommand.SUBSCRIBE.equals(command) ||
737 					StompCommand.UNSUBSCRIBE.equals(command) || StompCommand.DISCONNECT.equals(command))) {
738 				logger.debug("Forwarding " + accessor.getShortLogMessage(message.getPayload()));
739 			}
740 			else if (logger.isTraceEnabled()) {
741 				logger.trace("Forwarding " + accessor.getDetailedLogMessage(message.getPayload()));
742 			}
743 
744 			ListenableFuture<Void> future = conn.send((Message<byte[]>) messageToSend);
745 			future.addCallback(new ListenableFutureCallback<Void>() {
746 				@Override
747 				public void onSuccess(Void result) {
748 					if (accessor.getCommand() == StompCommand.DISCONNECT) {
749 						afterDisconnectSent(accessor);
750 					}
751 				}
752 				@Override
753 				public void onFailure(Throwable ex) {
754 					if (tcpConnection != null) {
755 						handleTcpConnectionFailure("failed to forward " +
756 								accessor.getShortLogMessage(message.getPayload()), ex);
757 					}
758 					else if (logger.isErrorEnabled()) {
759 						logger.error("Failed to forward " + accessor.getShortLogMessage(message.getPayload()));
760 					}
761 				}
762 			});
763 			return future;
764 		}
765 
766 		/**
767 		 * After a DISCONNECT there should be no more client frames so we can
768 		 * close the connection pro-actively. However, if the DISCONNECT has a
769 		 * receipt header we leave the connection open and expect the server will
770 		 * respond with a RECEIPT and then close the connection.
771 		 * @see <a href="http://stomp.github.io/stomp-specification-1.2.html#DISCONNECT">
772 		 *     STOMP Specification 1.2 DISCONNECT</a>
773 		 */
774 		private void afterDisconnectSent(StompHeaderAccessor accessor) {
775 			if (accessor.getReceipt() == null) {
776 				try {
777 					clearConnection();
778 				}
779 				catch (Throwable ex) {
780 					if (logger.isDebugEnabled()) {
781 						logger.debug("Failure while clearing TCP connection state in session " + this.sessionId, ex);
782 					}
783 				}
784 			}
785 		}
786 
787 		/**
788 		 * Clean up state associated with the connection and close it.
789 		 * Any exception arising from closing the connection are propagated.
790 		 */
791 		public void clearConnection() {
792 			if (logger.isDebugEnabled()) {
793 				logger.debug("Cleaning up connection state for session " + this.sessionId);
794 			}
795 
796 			if (this.isRemoteClientSession) {
797 				StompBrokerRelayMessageHandler.this.connectionHandlers.remove(this.sessionId);
798 			}
799 
800 			this.isStompConnected = false;
801 
802 			TcpConnection<byte[]> conn = this.tcpConnection;
803 			this.tcpConnection = null;
804 			if (conn != null) {
805 				if (logger.isDebugEnabled()) {
806 					logger.debug("Closing TCP connection in session " + this.sessionId);
807 				}
808 				conn.close();
809 			}
810 		}
811 
812 		@Override
813 		public String toString() {
814 			return "StompConnectionHandler[sessionId=" + this.sessionId + "]";
815 		}
816 	}
817 
818 
819 	private class SystemStompConnectionHandler extends StompConnectionHandler {
820 
821 		public SystemStompConnectionHandler(StompHeaderAccessor connectHeaders) {
822 			super(SYSTEM_SESSION_ID, connectHeaders, false);
823 		}
824 
825 		@Override
826 		protected void afterStompConnected(StompHeaderAccessor connectedHeaders) {
827 			if (logger.isInfoEnabled()) {
828 				logger.info("\"System\" session connected.");
829 			}
830 			super.afterStompConnected(connectedHeaders);
831 			publishBrokerAvailableEvent();
832 		}
833 
834 		@Override
835 		protected void handleTcpConnectionFailure(String errorMessage, Throwable ex) {
836 			super.handleTcpConnectionFailure(errorMessage, ex);
837 			publishBrokerUnavailableEvent();
838 		}
839 
840 		@Override
841 		public void afterConnectionClosed() {
842 			super.afterConnectionClosed();
843 			publishBrokerUnavailableEvent();
844 		}
845 
846 		@Override
847 		public ListenableFuture<Void> forward(Message<?> message, StompHeaderAccessor accessor) {
848 			try {
849 				ListenableFuture<Void> future = super.forward(message, accessor);
850 				future.get();
851 				return future;
852 			}
853 			catch (Throwable ex) {
854 				throw new MessageDeliveryException(message, ex);
855 			}
856 		}
857 	}
858 
859 
860 	private static class StompTcpClientFactory {
861 
862 		public TcpOperations<byte[]> create(String relayHost, int relayPort, Reactor11StompCodec codec) {
863 			return new Reactor11TcpClient<byte[]>(relayHost, relayPort, codec);
864 		}
865 	}
866 
867 
868 	private static class VoidCallable implements Callable<Void> {
869 
870 		@Override
871 		public Void call() throws Exception {
872 			return null;
873 		}
874 	}
875 
876 
877 	private class Stats {
878 
879 		private final AtomicInteger connect = new AtomicInteger();
880 
881 		private final AtomicInteger connected = new AtomicInteger();
882 
883 		private final AtomicInteger disconnect = new AtomicInteger();
884 
885 		public void incrementConnectCount() {
886 			this.connect.incrementAndGet();
887 		}
888 
889 		public void incrementConnectedCount() {
890 			this.connected.incrementAndGet();
891 		}
892 
893 		public void incrementDisconnectCount() {
894 			this.disconnect.incrementAndGet();
895 		}
896 
897 		public String toString() {
898 			return connectionHandlers.size() + " sessions, " + relayHost + ":" + relayPort +
899 					(isBrokerAvailable() ? " (available)" : " (not available)") +
900 					", processed CONNECT(" + this.connect.get() + ")-CONNECTED(" +
901 					this.connected.get() + ")-DISCONNECT(" + this.disconnect.get() + ")";
902 		}
903 	}
904 
905 }