1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package org.springframework.messaging.simp.stomp;
18
19 import java.util.Collection;
20 import java.util.Map;
21 import java.util.concurrent.Callable;
22 import java.util.concurrent.ConcurrentHashMap;
23 import java.util.concurrent.TimeUnit;
24 import java.util.concurrent.atomic.AtomicInteger;
25
26 import org.springframework.messaging.Message;
27 import org.springframework.messaging.MessageChannel;
28 import org.springframework.messaging.MessageDeliveryException;
29 import org.springframework.messaging.SubscribableChannel;
30 import org.springframework.messaging.simp.SimpMessageHeaderAccessor;
31 import org.springframework.messaging.simp.SimpMessageType;
32 import org.springframework.messaging.simp.broker.AbstractBrokerMessageHandler;
33 import org.springframework.messaging.support.MessageBuilder;
34 import org.springframework.messaging.support.MessageHeaderAccessor;
35 import org.springframework.messaging.support.MessageHeaderInitializer;
36 import org.springframework.messaging.tcp.FixedIntervalReconnectStrategy;
37 import org.springframework.messaging.tcp.TcpConnection;
38 import org.springframework.messaging.tcp.TcpConnectionHandler;
39 import org.springframework.messaging.tcp.TcpOperations;
40 import org.springframework.messaging.tcp.reactor.Reactor11TcpClient;
41 import org.springframework.util.Assert;
42 import org.springframework.util.concurrent.ListenableFuture;
43 import org.springframework.util.concurrent.ListenableFutureCallback;
44 import org.springframework.util.concurrent.ListenableFutureTask;
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75 public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler {
76
/** Session id under which the relay's own "system" connection handler is registered. */
public static final String SYSTEM_SESSION_ID = "_system_";

/** Zero-length payload used for frames and log messages that carry no body. */
private static final byte[] EMPTY_PAYLOAD = new byte[0];

/**
 * Task that is run once in the static initializer below, so it is already complete
 * when handed out in place of a real send result.
 */
private static final ListenableFutureTask<Void> EMPTY_TASK = new ListenableFutureTask<Void>(new VoidCallable());

/** Factor applied to the read-inactivity interval before a connection is treated as failed. */
private static final long HEARTBEAT_MULTIPLIER = 3;

/** Shared heartbeat message, built once in the static initializer. */
private static final Message<byte[]> HEARTBEAT_MESSAGE;


static {
    // Complete the placeholder task up front so callers never wait on it.
    EMPTY_TASK.run();
    StompHeaderAccessor heartbeatHeaders = StompHeaderAccessor.createForHeartbeat();
    HEARTBEAT_MESSAGE = MessageBuilder.createMessage(
            StompDecoder.HEARTBEAT_PAYLOAD, heartbeatHeaders.getMessageHeaders());
}
94
95
96 private String relayHost = "127.0.0.1";
97
98 private int relayPort = 61613;
99
100 private String clientLogin = "guest";
101
102 private String clientPasscode = "guest";
103
104 private String systemLogin = "guest";
105
106 private String systemPasscode = "guest";
107
108 private long systemHeartbeatSendInterval = 10000;
109
110 private long systemHeartbeatReceiveInterval = 10000;
111
112 private String virtualHost;
113
114 private TcpOperations<byte[]> tcpClient;
115
116 private MessageHeaderInitializer headerInitializer;
117
118 private final Map<String, StompConnectionHandler> connectionHandlers =
119 new ConcurrentHashMap<String, StompConnectionHandler>();
120
121 private final Stats stats = new Stats();
122
123
124
125
126
127
128
129
130
131
132
/**
 * Create a relay handler; all arguments are handed to the superclass constructor unchanged.
 * @param inboundChannel the inbound channel passed to the superclass
 * @param outboundChannel the outbound channel passed to the superclass
 * @param brokerChannel the broker channel passed to the superclass
 * @param destinationPrefixes the destination prefixes passed to the superclass
 */
public StompBrokerRelayMessageHandler(SubscribableChannel inboundChannel, MessageChannel outboundChannel,
        SubscribableChannel brokerChannel, Collection<String> destinationPrefixes) {

    super(inboundChannel, outboundChannel, brokerChannel, destinationPrefixes);
}
138
139
140
141
142
143 public void setRelayHost(String relayHost) {
144 Assert.hasText(relayHost, "relayHost must not be empty");
145 this.relayHost = relayHost;
146 }
147
148
149
150
151 public String getRelayHost() {
152 return this.relayHost;
153 }
154
155
156
157
158 public void setRelayPort(int relayPort) {
159 this.relayPort = relayPort;
160 }
161
162
163
164
165 public int getRelayPort() {
166 return this.relayPort;
167 }
168
169
170
171
172
173
174
175
176 public void setSystemHeartbeatSendInterval(long systemHeartbeatSendInterval) {
177 this.systemHeartbeatSendInterval = systemHeartbeatSendInterval;
178 }
179
180
181
182
183
184 public long getSystemHeartbeatSendInterval() {
185 return this.systemHeartbeatSendInterval;
186 }
187
188
189
190
191
192
193
194
195
196 public void setSystemHeartbeatReceiveInterval(long heartbeatReceiveInterval) {
197 this.systemHeartbeatReceiveInterval = heartbeatReceiveInterval;
198 }
199
200
201
202
203
204 public long getSystemHeartbeatReceiveInterval() {
205 return this.systemHeartbeatReceiveInterval;
206 }
207
208
209
210
211
212
213
214 public void setClientLogin(String clientLogin) {
215 Assert.hasText(clientLogin, "clientLogin must not be empty");
216 this.clientLogin = clientLogin;
217 }
218
219
220
221
222
223
224 public String getClientLogin() {
225 return this.clientLogin;
226 }
227
228
229
230
231
232
233
234 public void setClientPasscode(String clientPasscode) {
235 Assert.hasText(clientPasscode, "clientPasscode must not be empty");
236 this.clientPasscode = clientPasscode;
237 }
238
239
240
241
242
243
244 public String getClientPasscode() {
245 return this.clientPasscode;
246 }
247
248
249
250
251
252
253
254 public void setSystemLogin(String systemLogin) {
255 Assert.hasText(systemLogin, "systemLogin must not be empty");
256 this.systemLogin = systemLogin;
257 }
258
259
260
261
262 public String getSystemLogin() {
263 return this.systemLogin;
264 }
265
266
267
268
269
270
271
272 public void setSystemPasscode(String systemPasscode) {
273 this.systemPasscode = systemPasscode;
274 }
275
276
277
278
279 public String getSystemPasscode() {
280 return this.systemPasscode;
281 }
282
283
284
285
286
287
288
289
290
291 public void setVirtualHost(String virtualHost) {
292 this.virtualHost = virtualHost;
293 }
294
295
296
297
298 public String getVirtualHost() {
299 return this.virtualHost;
300 }
301
302
303
304
305
306 public void setTcpClient(TcpOperations<byte[]> tcpClient) {
307 this.tcpClient = tcpClient;
308 }
309
310
311
312
313
314
315 public TcpOperations<byte[]> getTcpClient() {
316 return this.tcpClient;
317 }
318
319
320
321
322 public int getConnectionCount() {
323 return this.connectionHandlers.size();
324 }
325
326
327
328
329
330
331
332 public void setHeaderInitializer(MessageHeaderInitializer headerInitializer) {
333 this.headerInitializer = headerInitializer;
334 }
335
336
337
338
339 public MessageHeaderInitializer getHeaderInitializer() {
340 return this.headerInitializer;
341 }
342
343
344
345
346 public String getStatsInfo() {
347 return this.stats.toString();
348 }
349
350
351 @Override
352 protected void startInternal() {
353 if (this.tcpClient == null) {
354 StompDecoder decoder = new StompDecoder();
355 decoder.setHeaderInitializer(getHeaderInitializer());
356 Reactor11StompCodec codec = new Reactor11StompCodec(new StompEncoder(), decoder);
357 this.tcpClient = new StompTcpClientFactory().create(this.relayHost, this.relayPort, codec);
358 }
359
360 if (logger.isInfoEnabled()) {
361 logger.info("Connecting \"system\" session to " + this.relayHost + ":" + this.relayPort);
362 }
363
364 StompHeaderAccessor accessor = StompHeaderAccessor.create(StompCommand.CONNECT);
365 accessor.setAcceptVersion("1.1,1.2");
366 accessor.setLogin(this.systemLogin);
367 accessor.setPasscode(this.systemPasscode);
368 accessor.setHeartbeat(this.systemHeartbeatSendInterval, this.systemHeartbeatReceiveInterval);
369 accessor.setHost(getVirtualHost());
370 accessor.setSessionId(SYSTEM_SESSION_ID);
371 if (logger.isDebugEnabled()) {
372 logger.debug("Forwarding " + accessor.getShortLogMessage(EMPTY_PAYLOAD));
373 }
374
375 SystemStompConnectionHandler handler = new SystemStompConnectionHandler(accessor);
376 this.connectionHandlers.put(handler.getSessionId(), handler);
377
378 this.stats.incrementConnectCount();
379 this.tcpClient.connect(handler, new FixedIntervalReconnectStrategy(5000));
380 }
381
382 @Override
383 protected void stopInternal() {
384 publishBrokerUnavailableEvent();
385 try {
386 this.tcpClient.shutdown().get(5000, TimeUnit.MILLISECONDS);
387 }
388 catch (Throwable ex) {
389 logger.error("Error in shutdown of TCP client", ex);
390 }
391 }
392
393 @Override
394 protected void handleMessageInternal(Message<?> message) {
395 String sessionId = SimpMessageHeaderAccessor.getSessionId(message.getHeaders());
396
397 if (!isBrokerAvailable()) {
398 if (sessionId == null || SYSTEM_SESSION_ID.equals(sessionId)) {
399 throw new MessageDeliveryException("Message broker not active. Consider subscribing to " +
400 "receive BrokerAvailabilityEvent's from an ApplicationListener Spring bean.");
401 }
402 SimpMessageType messageType = SimpMessageHeaderAccessor.getMessageType(message.getHeaders());
403 if (logger.isErrorEnabled() && SimpMessageType.CONNECT.equals(messageType)) {
404 logger.error("Broker not active. Ignoring " + message);
405 }
406 else if (logger.isDebugEnabled()) {
407 logger.debug("Broker not active. Ignoring " + message);
408 }
409 return;
410 }
411
412 StompHeaderAccessor stompAccessor;
413 StompCommand command;
414
415 MessageHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, MessageHeaderAccessor.class);
416 if (accessor == null) {
417 throw new IllegalStateException(
418 "No header accessor (not using the SimpMessagingTemplate?): " + message);
419 }
420 else if (accessor instanceof StompHeaderAccessor) {
421 stompAccessor = (StompHeaderAccessor) accessor;
422 command = stompAccessor.getCommand();
423 }
424 else if (accessor instanceof SimpMessageHeaderAccessor) {
425 stompAccessor = StompHeaderAccessor.wrap(message);
426 command = stompAccessor.getCommand();
427 if (command == null) {
428 command = stompAccessor.updateStompCommandAsClientMessage();
429 }
430 }
431 else {
432 throw new IllegalStateException(
433 "Unexpected header accessor type " + accessor.getClass() + " in " + message);
434 }
435
436 if (sessionId == null) {
437 if (!SimpMessageType.MESSAGE.equals(stompAccessor.getMessageType())) {
438 if (logger.isErrorEnabled()) {
439 logger.error("Only STOMP SEND supported from within the server side. Ignoring " + message);
440 }
441 return;
442 }
443 sessionId = SYSTEM_SESSION_ID;
444 stompAccessor.setSessionId(sessionId);
445 }
446
447 String destination = stompAccessor.getDestination();
448 if (command != null && command.requiresDestination() && !checkDestinationPrefix(destination)) {
449 return;
450 }
451
452 if (StompCommand.CONNECT.equals(command)) {
453 if (logger.isDebugEnabled()) {
454 logger.debug(stompAccessor.getShortLogMessage(EMPTY_PAYLOAD));
455 }
456 stompAccessor = (stompAccessor.isMutable() ? stompAccessor : StompHeaderAccessor.wrap(message));
457 stompAccessor.setLogin(this.clientLogin);
458 stompAccessor.setPasscode(this.clientPasscode);
459 if (getVirtualHost() != null) {
460 stompAccessor.setHost(getVirtualHost());
461 }
462 StompConnectionHandler handler = new StompConnectionHandler(sessionId, stompAccessor);
463 this.connectionHandlers.put(sessionId, handler);
464 this.stats.incrementConnectCount();
465 this.tcpClient.connect(handler);
466 }
467 else if (StompCommand.DISCONNECT.equals(command)) {
468 StompConnectionHandler handler = this.connectionHandlers.get(sessionId);
469 if (handler == null) {
470 if (logger.isDebugEnabled()) {
471 logger.debug("Ignoring DISCONNECT in session " + sessionId + ". Connection already cleaned up.");
472 }
473 return;
474 }
475 stats.incrementDisconnectCount();
476 handler.forward(message, stompAccessor);
477 }
478 else {
479 StompConnectionHandler handler = this.connectionHandlers.get(sessionId);
480 if (handler == null) {
481 if (logger.isDebugEnabled()) {
482 logger.debug("No TCP connection for session " + sessionId + " in " + message);
483 }
484 return;
485 }
486 handler.forward(message, stompAccessor);
487 }
488 }
489
490 @Override
491 public String toString() {
492 return "StompBrokerRelay[" + this.relayHost + ":" + this.relayPort + "]";
493 }
494
495
496 private class StompConnectionHandler implements TcpConnectionHandler<byte[]> {
497
498 private final String sessionId;
499
500 private final boolean isRemoteClientSession;
501
502 private final StompHeaderAccessor connectHeaders;
503
504 private volatile TcpConnection<byte[]> tcpConnection;
505
506 private volatile boolean isStompConnected;
507
508
509 private StompConnectionHandler(String sessionId, StompHeaderAccessor connectHeaders) {
510 this(sessionId, connectHeaders, true);
511 }
512
513 private StompConnectionHandler(String sessionId, StompHeaderAccessor connectHeaders, boolean isClientSession) {
514 Assert.notNull(sessionId, "'sessionId' must not be null");
515 Assert.notNull(connectHeaders, "'connectHeaders' must not be null");
516 this.sessionId = sessionId;
517 this.connectHeaders = connectHeaders;
518 this.isRemoteClientSession = isClientSession;
519 }
520
521 public String getSessionId() {
522 return this.sessionId;
523 }
524
525 @Override
526 public void afterConnected(TcpConnection<byte[]> connection) {
527 if (logger.isDebugEnabled()) {
528 logger.debug("TCP connection opened in session=" + getSessionId());
529 }
530 this.tcpConnection = connection;
531 connection.send(MessageBuilder.createMessage(EMPTY_PAYLOAD, this.connectHeaders.getMessageHeaders()));
532 }
533
534 @Override
535 public void afterConnectFailure(Throwable ex) {
536 handleTcpConnectionFailure("failed to establish TCP connection in session " + this.sessionId, ex);
537 }
538
539
540
541
542
543 protected void handleTcpConnectionFailure(String error, Throwable ex) {
544 if (logger.isErrorEnabled()) {
545 logger.error("TCP connection failure in session " + this.sessionId + ": " + error, ex);
546 }
547 try {
548 sendStompErrorFrameToClient(error);
549 }
550 finally {
551 try {
552 clearConnection();
553 }
554 catch (Throwable ex2) {
555 if (logger.isDebugEnabled()) {
556 logger.debug("Failure while clearing TCP connection state in session " + this.sessionId, ex2);
557 }
558 }
559 }
560 }
561
562 private void sendStompErrorFrameToClient(String errorText) {
563 if (this.isRemoteClientSession) {
564 StompHeaderAccessor headerAccessor = StompHeaderAccessor.create(StompCommand.ERROR);
565 if (getHeaderInitializer() != null) {
566 getHeaderInitializer().initHeaders(headerAccessor);
567 }
568 headerAccessor.setSessionId(this.sessionId);
569 headerAccessor.setUser(this.connectHeaders.getUser());
570 headerAccessor.setMessage(errorText);
571 Message<?> errorMessage = MessageBuilder.createMessage(EMPTY_PAYLOAD, headerAccessor.getMessageHeaders());
572 headerAccessor.setImmutable();
573 sendMessageToClient(errorMessage);
574 }
575 }
576
577 protected void sendMessageToClient(Message<?> message) {
578 if (this.isRemoteClientSession) {
579 StompBrokerRelayMessageHandler.this.getClientOutboundChannel().send(message);
580 }
581 }
582
583 @Override
584 public void handleMessage(Message<byte[]> message) {
585 StompHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class);
586 accessor.setSessionId(this.sessionId);
587 accessor.setUser(this.connectHeaders.getUser());
588
589 StompCommand command = accessor.getCommand();
590 if (StompCommand.CONNECTED.equals(command)) {
591 if (logger.isDebugEnabled()) {
592 logger.debug("Received " + accessor.getShortLogMessage(EMPTY_PAYLOAD));
593 }
594 afterStompConnected(accessor);
595 }
596 else if (logger.isErrorEnabled() && StompCommand.ERROR.equals(command)) {
597 logger.error("Received " + accessor.getShortLogMessage(message.getPayload()));
598 }
599 else if (logger.isTraceEnabled()) {
600 logger.trace("Received " + accessor.getDetailedLogMessage(message.getPayload()));
601 }
602
603 accessor.setImmutable();
604 sendMessageToClient(message);
605 }
606
607
608
609
610
611 protected void afterStompConnected(StompHeaderAccessor connectedHeaders) {
612 this.isStompConnected = true;
613 stats.incrementConnectedCount();
614 initHeartbeats(connectedHeaders);
615 }
616
617 private void initHeartbeats(StompHeaderAccessor connectedHeaders) {
618 if (this.isRemoteClientSession) {
619 return;
620 }
621
622 long clientSendInterval = this.connectHeaders.getHeartbeat()[0];
623 long clientReceiveInterval = this.connectHeaders.getHeartbeat()[1];
624 long serverSendInterval = connectedHeaders.getHeartbeat()[0];
625 long serverReceiveInterval = connectedHeaders.getHeartbeat()[1];
626
627 if (clientSendInterval > 0 && serverReceiveInterval > 0) {
628 long interval = Math.max(clientSendInterval, serverReceiveInterval);
629 this.tcpConnection.onWriteInactivity(new Runnable() {
630 @Override
631 public void run() {
632 TcpConnection<byte[]> conn = tcpConnection;
633 if (conn != null) {
634 conn.send(HEARTBEAT_MESSAGE).addCallback(
635 new ListenableFutureCallback<Void>() {
636 public void onSuccess(Void result) {
637 }
638 public void onFailure(Throwable ex) {
639 String error = "failed to forward heartbeat in \"system\" session.";
640 handleTcpConnectionFailure(error, ex);
641 }
642 });
643 }
644 }
645 }, interval);
646 }
647 if (clientReceiveInterval > 0 && serverSendInterval > 0) {
648 final long interval = Math.max(clientReceiveInterval, serverSendInterval) * HEARTBEAT_MULTIPLIER;
649 this.tcpConnection.onReadInactivity(new Runnable() {
650 @Override
651 public void run() {
652 handleTcpConnectionFailure("no messages received for more than " + interval + " ms.", null);
653 }
654 }, interval);
655 }
656 }
657
658 @Override
659 public void handleFailure(Throwable ex) {
660 if (this.tcpConnection != null) {
661 handleTcpConnectionFailure("transport failure.", ex);
662 }
663 else if (logger.isErrorEnabled()) {
664 logger.error("Transport failure: " + ex);
665 }
666 }
667
668 @Override
669 public void afterConnectionClosed() {
670 if (this.tcpConnection == null) {
671 return;
672 }
673 try {
674 if (logger.isDebugEnabled()) {
675 logger.debug("TCP connection to broker closed in session " + this.sessionId);
676 }
677 sendStompErrorFrameToClient("Connection to broker closed.");
678 }
679 finally {
680 try {
681
682 this.tcpConnection = null;
683 clearConnection();
684 }
685 catch (Throwable ex) {
686
687 }
688 }
689 }
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711 @SuppressWarnings("unchecked")
712 public ListenableFuture<Void> forward(final Message<?> message, final StompHeaderAccessor accessor) {
713 TcpConnection<byte[]> conn = this.tcpConnection;
714
715 if (!this.isStompConnected) {
716 if (this.isRemoteClientSession) {
717 if (logger.isDebugEnabled()) {
718 logger.debug("TCP connection closed already, ignoring " +
719 accessor.getShortLogMessage(message.getPayload()));
720 }
721 return EMPTY_TASK;
722 }
723 else {
724 throw new IllegalStateException("Cannot forward messages " +
725 (conn != null ? "before STOMP CONNECTED. " : "while inactive. ") +
726 "Consider subscribing to receive BrokerAvailabilityEvent's from " +
727 "an ApplicationListener Spring bean. Dropped " +
728 accessor.getShortLogMessage(message.getPayload()));
729 }
730 }
731
732 final Message<?> messageToSend = (accessor.isMutable() && accessor.isModified()) ?
733 MessageBuilder.createMessage(message.getPayload(), accessor.getMessageHeaders()) : message;
734
735 StompCommand command = accessor.getCommand();
736 if (logger.isDebugEnabled() && (StompCommand.SEND.equals(command) || StompCommand.SUBSCRIBE.equals(command) ||
737 StompCommand.UNSUBSCRIBE.equals(command) || StompCommand.DISCONNECT.equals(command))) {
738 logger.debug("Forwarding " + accessor.getShortLogMessage(message.getPayload()));
739 }
740 else if (logger.isTraceEnabled()) {
741 logger.trace("Forwarding " + accessor.getDetailedLogMessage(message.getPayload()));
742 }
743
744 ListenableFuture<Void> future = conn.send((Message<byte[]>) messageToSend);
745 future.addCallback(new ListenableFutureCallback<Void>() {
746 @Override
747 public void onSuccess(Void result) {
748 if (accessor.getCommand() == StompCommand.DISCONNECT) {
749 afterDisconnectSent(accessor);
750 }
751 }
752 @Override
753 public void onFailure(Throwable ex) {
754 if (tcpConnection != null) {
755 handleTcpConnectionFailure("failed to forward " +
756 accessor.getShortLogMessage(message.getPayload()), ex);
757 }
758 else if (logger.isErrorEnabled()) {
759 logger.error("Failed to forward " + accessor.getShortLogMessage(message.getPayload()));
760 }
761 }
762 });
763 return future;
764 }
765
766
767
768
769
770
771
772
773
774 private void afterDisconnectSent(StompHeaderAccessor accessor) {
775 if (accessor.getReceipt() == null) {
776 try {
777 clearConnection();
778 }
779 catch (Throwable ex) {
780 if (logger.isDebugEnabled()) {
781 logger.debug("Failure while clearing TCP connection state in session " + this.sessionId, ex);
782 }
783 }
784 }
785 }
786
787
788
789
790
791 public void clearConnection() {
792 if (logger.isDebugEnabled()) {
793 logger.debug("Cleaning up connection state for session " + this.sessionId);
794 }
795
796 if (this.isRemoteClientSession) {
797 StompBrokerRelayMessageHandler.this.connectionHandlers.remove(this.sessionId);
798 }
799
800 this.isStompConnected = false;
801
802 TcpConnection<byte[]> conn = this.tcpConnection;
803 this.tcpConnection = null;
804 if (conn != null) {
805 if (logger.isDebugEnabled()) {
806 logger.debug("Closing TCP connection in session " + this.sessionId);
807 }
808 conn.close();
809 }
810 }
811
812 @Override
813 public String toString() {
814 return "StompConnectionHandler[sessionId=" + this.sessionId + "]";
815 }
816 }
817
818
819 private class SystemStompConnectionHandler extends StompConnectionHandler {
820
821 public SystemStompConnectionHandler(StompHeaderAccessor connectHeaders) {
822 super(SYSTEM_SESSION_ID, connectHeaders, false);
823 }
824
825 @Override
826 protected void afterStompConnected(StompHeaderAccessor connectedHeaders) {
827 if (logger.isInfoEnabled()) {
828 logger.info("\"System\" session connected.");
829 }
830 super.afterStompConnected(connectedHeaders);
831 publishBrokerAvailableEvent();
832 }
833
834 @Override
835 protected void handleTcpConnectionFailure(String errorMessage, Throwable ex) {
836 super.handleTcpConnectionFailure(errorMessage, ex);
837 publishBrokerUnavailableEvent();
838 }
839
840 @Override
841 public void afterConnectionClosed() {
842 super.afterConnectionClosed();
843 publishBrokerUnavailableEvent();
844 }
845
846 @Override
847 public ListenableFuture<Void> forward(Message<?> message, StompHeaderAccessor accessor) {
848 try {
849 ListenableFuture<Void> future = super.forward(message, accessor);
850 future.get();
851 return future;
852 }
853 catch (Throwable ex) {
854 throw new MessageDeliveryException(message, ex);
855 }
856 }
857 }
858
859
860 private static class StompTcpClientFactory {
861
862 public TcpOperations<byte[]> create(String relayHost, int relayPort, Reactor11StompCodec codec) {
863 return new Reactor11TcpClient<byte[]>(relayHost, relayPort, codec);
864 }
865 }
866
867
868 private static class VoidCallable implements Callable<Void> {
869
870 @Override
871 public Void call() throws Exception {
872 return null;
873 }
874 }
875
876
877 private class Stats {
878
879 private final AtomicInteger connect = new AtomicInteger();
880
881 private final AtomicInteger connected = new AtomicInteger();
882
883 private final AtomicInteger disconnect = new AtomicInteger();
884
885 public void incrementConnectCount() {
886 this.connect.incrementAndGet();
887 }
888
889 public void incrementConnectedCount() {
890 this.connected.incrementAndGet();
891 }
892
893 public void incrementDisconnectCount() {
894 this.disconnect.incrementAndGet();
895 }
896
897 public String toString() {
898 return connectionHandlers.size() + " sessions, " + relayHost + ":" + relayPort +
899 (isBrokerAvailable() ? " (available)" : " (not available)") +
900 ", processed CONNECT(" + this.connect.get() + ")-CONNECTED(" +
901 this.connected.get() + ")-DISCONNECT(" + this.disconnect.get() + ")";
902 }
903 }
904
905 }