1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package org.springframework.messaging.simp.stomp;
18
19 import java.nio.charset.Charset;
20 import java.util.ArrayList;
21 import java.util.Arrays;
22 import java.util.List;
23 import java.util.concurrent.BlockingQueue;
24 import java.util.concurrent.CountDownLatch;
25 import java.util.concurrent.LinkedBlockingQueue;
26 import java.util.concurrent.TimeUnit;
27
28 import org.apache.activemq.broker.BrokerService;
29 import org.apache.commons.logging.Log;
30 import org.apache.commons.logging.LogFactory;
31 import org.junit.After;
32 import org.junit.Before;
33 import org.junit.Rule;
34 import org.junit.Test;
35 import org.junit.rules.TestName;
36
37 import org.springframework.context.ApplicationEvent;
38 import org.springframework.context.ApplicationEventPublisher;
39 import org.springframework.messaging.Message;
40 import org.springframework.messaging.MessageDeliveryException;
41 import org.springframework.messaging.MessageHandler;
42 import org.springframework.messaging.MessagingException;
43 import org.springframework.messaging.StubMessageChannel;
44 import org.springframework.messaging.simp.SimpMessageHeaderAccessor;
45 import org.springframework.messaging.simp.SimpMessageType;
46 import org.springframework.messaging.simp.broker.BrokerAvailabilityEvent;
47 import org.springframework.messaging.support.ExecutorSubscribableChannel;
48 import org.springframework.messaging.support.MessageBuilder;
49 import org.springframework.util.Assert;
50 import org.springframework.util.SocketUtils;
51
52 import static org.junit.Assert.*;
53
54
55
56
57
58
59 public class StompBrokerRelayMessageHandlerIntegrationTests {
60
61 @Rule
62 public final TestName testName = new TestName();
63
64 private static final Log logger = LogFactory.getLog(StompBrokerRelayMessageHandlerIntegrationTests.class);
65
66 private static final Charset UTF_8 = Charset.forName("UTF-8");
67
68 private StompBrokerRelayMessageHandler relay;
69
70 private BrokerService activeMQBroker;
71
72 private ExecutorSubscribableChannel responseChannel;
73
74 private TestMessageHandler responseHandler;
75
76 private TestEventPublisher eventPublisher;
77
78 private int port;
79
80
81 @Before
82 public void setUp() throws Exception {
83 logger.debug("Setting up before '" + this.testName.getMethodName() + "'");
84 this.port = SocketUtils.findAvailableTcpPort(61613);
85 this.responseChannel = new ExecutorSubscribableChannel();
86 this.responseHandler = new TestMessageHandler();
87 this.responseChannel.subscribe(this.responseHandler);
88 this.eventPublisher = new TestEventPublisher();
89 startActiveMqBroker();
90 createAndStartRelay();
91 }
92
93 private void startActiveMqBroker() throws Exception {
94 this.activeMQBroker = new BrokerService();
95 this.activeMQBroker.addConnector("stomp://localhost:" + this.port);
96 this.activeMQBroker.setStartAsync(false);
97 this.activeMQBroker.setPersistent(false);
98 this.activeMQBroker.setUseJmx(false);
99 this.activeMQBroker.getSystemUsage().getMemoryUsage().setLimit(1024 * 1024 * 5);
100 this.activeMQBroker.getSystemUsage().getTempUsage().setLimit(1024 * 1024 * 5);
101 this.activeMQBroker.start();
102 }
103
104 private void createAndStartRelay() throws InterruptedException {
105 this.relay = new StompBrokerRelayMessageHandler(new StubMessageChannel(),
106 this.responseChannel, new StubMessageChannel(), Arrays.asList("/queue/", "/topic/"));
107 this.relay.setRelayPort(this.port);
108 this.relay.setApplicationEventPublisher(this.eventPublisher);
109 this.relay.setSystemHeartbeatReceiveInterval(0);
110 this.relay.setSystemHeartbeatSendInterval(0);
111
112 this.relay.start();
113 this.eventPublisher.expectBrokerAvailabilityEvent(true);
114 }
115
116 @After
117 public void tearDown() throws Exception {
118 try {
119 logger.debug("STOMP broker relay stats: " + this.relay.getStatsInfo());
120 this.relay.stop();
121 }
122 finally {
123 stopActiveMqBrokerAndAwait();
124 }
125 }
126
127 private void stopActiveMqBrokerAndAwait() throws Exception {
128 logger.debug("Stopping ActiveMQ broker and will await shutdown");
129 if (!this.activeMQBroker.isStarted()) {
130 logger.debug("Broker not running");
131 return;
132 }
133 final CountDownLatch latch = new CountDownLatch(1);
134 this.activeMQBroker.addShutdownHook(new Runnable() {
135 public void run() {
136 latch.countDown();
137 }
138 });
139 this.activeMQBroker.stop();
140 assertTrue("Broker did not stop", latch.await(5, TimeUnit.SECONDS));
141 logger.debug("Broker stopped");
142 }
143
144 @Test
145 public void publishSubscribe() throws Exception {
146
147 logger.debug("Starting test publishSubscribe()");
148
149 String sess1 = "sess1";
150 String sess2 = "sess2";
151 String subs1 = "subs1";
152 String destination = "/topic/test";
153
154 MessageExchange conn1 = MessageExchangeBuilder.connect(sess1).build();
155 MessageExchange conn2 = MessageExchangeBuilder.connect(sess2).build();
156 this.relay.handleMessage(conn1.message);
157 this.relay.handleMessage(conn2.message);
158 this.responseHandler.expectMessages(conn1, conn2);
159
160 MessageExchange subscribe = MessageExchangeBuilder.subscribeWithReceipt(sess1, subs1, destination, "r1").build();
161 this.relay.handleMessage(subscribe.message);
162 this.responseHandler.expectMessages(subscribe);
163
164 MessageExchange send = MessageExchangeBuilder.send(destination, "foo").andExpectMessage(sess1, subs1).build();
165 this.relay.handleMessage(send.message);
166 this.responseHandler.expectMessages(send);
167 }
168
169 @Test(expected=MessageDeliveryException.class)
170 public void messageDeliveryExceptionIfSystemSessionForwardFails() throws Exception {
171
172 logger.debug("Starting test messageDeliveryExceptionIfSystemSessionForwardFails()");
173
174 stopActiveMqBrokerAndAwait();
175 this.eventPublisher.expectBrokerAvailabilityEvent(false);
176
177 StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.SEND);
178 this.relay.handleMessage(MessageBuilder.createMessage("test".getBytes(), headers.getMessageHeaders()));
179 }
180
181 @Test
182 public void brokerBecomingUnvailableTriggersErrorFrame() throws Exception {
183
184 logger.debug("Starting test brokerBecomingUnvailableTriggersErrorFrame()");
185
186 String sess1 = "sess1";
187 MessageExchange connect = MessageExchangeBuilder.connect(sess1).build();
188 this.relay.handleMessage(connect.message);
189 this.responseHandler.expectMessages(connect);
190
191 MessageExchange error = MessageExchangeBuilder.error(sess1).build();
192 stopActiveMqBrokerAndAwait();
193 this.eventPublisher.expectBrokerAvailabilityEvent(false);
194 this.responseHandler.expectMessages(error);
195 }
196
197 @Test
198 public void brokerAvailabilityEventWhenStopped() throws Exception {
199
200 logger.debug("Starting test brokerAvailabilityEventWhenStopped()");
201
202 stopActiveMqBrokerAndAwait();
203 this.eventPublisher.expectBrokerAvailabilityEvent(false);
204 }
205
206 @Test
207 public void relayReconnectsIfBrokerComesBackUp() throws Exception {
208
209 logger.debug("Starting test relayReconnectsIfBrokerComesBackUp()");
210
211 String sess1 = "sess1";
212 MessageExchange conn1 = MessageExchangeBuilder.connect(sess1).build();
213 this.relay.handleMessage(conn1.message);
214 this.responseHandler.expectMessages(conn1);
215
216 String subs1 = "subs1";
217 String destination = "/topic/test";
218 MessageExchange subscribe = MessageExchangeBuilder.subscribeWithReceipt(sess1, subs1, destination, "r1").build();
219 this.relay.handleMessage(subscribe.message);
220 this.responseHandler.expectMessages(subscribe);
221
222 MessageExchange error = MessageExchangeBuilder.error(sess1).build();
223 stopActiveMqBrokerAndAwait();
224 this.responseHandler.expectMessages(error);
225
226 this.eventPublisher.expectBrokerAvailabilityEvent(false);
227
228 startActiveMqBroker();
229 this.eventPublisher.expectBrokerAvailabilityEvent(true);
230 }
231
232 @Test
233 public void disconnectClosesRelaySessionCleanly() throws Exception {
234
235 logger.debug("Starting test disconnectClosesRelaySessionCleanly()");
236
237 MessageExchange connect = MessageExchangeBuilder.connect("sess1").build();
238 this.relay.handleMessage(connect.message);
239 this.responseHandler.expectMessages(connect);
240
241 StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.DISCONNECT);
242 headers.setSessionId("sess1");
243 this.relay.handleMessage(MessageBuilder.createMessage(new byte[0], headers.getMessageHeaders()));
244
245 Thread.sleep(2000);
246
247
248 assertTrue("Unexpected messages: " + this.responseHandler.queue, this.responseHandler.queue.isEmpty());
249 }
250
251 @Test
252 public void disconnectWithReceipt() throws Exception {
253
254 logger.debug("Starting test disconnectWithReceipt()");
255
256 MessageExchange connect = MessageExchangeBuilder.connect("sess1").build();
257 this.relay.handleMessage(connect.message);
258 this.responseHandler.expectMessages(connect);
259
260 MessageExchange disconnect = MessageExchangeBuilder.disconnectWithReceipt("sess1", "r123").build();
261 this.relay.handleMessage(disconnect.message);
262
263 this.responseHandler.expectMessages(disconnect);
264 }
265
266
267 private static class TestEventPublisher implements ApplicationEventPublisher {
268
269 private final BlockingQueue<BrokerAvailabilityEvent> eventQueue = new LinkedBlockingQueue<>();
270
271 @Override
272 public void publishEvent(ApplicationEvent event) {
273 logger.debug("Processing ApplicationEvent " + event);
274 if (event instanceof BrokerAvailabilityEvent) {
275 this.eventQueue.add((BrokerAvailabilityEvent) event);
276 }
277 }
278
279 public void expectBrokerAvailabilityEvent(boolean isBrokerAvailable) throws InterruptedException {
280 BrokerAvailabilityEvent event = this.eventQueue.poll(20000, TimeUnit.MILLISECONDS);
281 assertNotNull("Times out waiting for BrokerAvailabilityEvent[" + isBrokerAvailable + "]", event);
282 assertEquals(isBrokerAvailable, event.isBrokerAvailable());
283 }
284 }
285
286 private static class TestMessageHandler implements MessageHandler {
287
288 private final BlockingQueue<Message<?>> queue = new LinkedBlockingQueue<>();
289
290 @Override
291 public void handleMessage(Message<?> message) throws MessagingException {
292 if (SimpMessageType.HEARTBEAT == SimpMessageHeaderAccessor.getMessageType(message.getHeaders())) {
293 return;
294 }
295 this.queue.add(message);
296 }
297
298 public void expectMessages(MessageExchange... messageExchanges) throws InterruptedException {
299
300 List<MessageExchange> expectedMessages =
301 new ArrayList<MessageExchange>(Arrays.<MessageExchange>asList(messageExchanges));
302
303 while (expectedMessages.size() > 0) {
304 Message<?> message = this.queue.poll(10000, TimeUnit.MILLISECONDS);
305 assertNotNull("Timed out waiting for messages, expected [" + expectedMessages + "]", message);
306
307 MessageExchange match = findMatch(expectedMessages, message);
308 assertNotNull("Unexpected message=" + message + ", expected [" + expectedMessages + "]", match);
309
310 expectedMessages.remove(match);
311 }
312 }
313
314 private MessageExchange findMatch(List<MessageExchange> expectedMessages, Message<?> message) {
315 for (MessageExchange exchange : expectedMessages) {
316 if (exchange.matchMessage(message)) {
317 return exchange;
318 }
319 }
320 return null;
321 }
322 }
323
324
325
326
327 private static class MessageExchange {
328
329 private final Message<?> message;
330
331 private final MessageMatcher[] expected;
332
333 private final Message<?>[] actual;
334
335 public MessageExchange(Message<?> message, MessageMatcher... expected) {
336 this.message = message;
337 this.expected = expected;
338 this.actual = new Message<?>[expected.length];
339 }
340
341 public boolean matchMessage(Message<?> message) {
342 for (int i=0 ; i < this.expected.length; i++) {
343 if (this.expected[i].match(message)) {
344 this.actual[i] = message;
345 return true;
346 }
347 }
348 return false;
349 }
350
351 @Override
352 public String toString() {
353 return "Forwarded message:\n" + this.message + "\n" +
354 "Should receive back:\n" + Arrays.toString(this.expected) + "\n" +
355 "Actually received:\n" + Arrays.toString(this.actual) + "\n";
356 }
357 }
358
359 private static class MessageExchangeBuilder {
360
361 private final Message<?> message;
362
363 private final StompHeaderAccessor headers;
364
365 private final List<MessageMatcher> expected = new ArrayList<>();
366
367
368 private MessageExchangeBuilder(Message<?> message) {
369 this.message = message;
370 this.headers = StompHeaderAccessor.wrap(message);
371 }
372
373 public static MessageExchangeBuilder error(String sessionId) {
374 return new MessageExchangeBuilder(null).andExpectError(sessionId);
375 }
376
377 public static MessageExchangeBuilder connect(String sessionId) {
378 StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.CONNECT);
379 headers.setSessionId(sessionId);
380 headers.setAcceptVersion("1.1,1.2");
381 headers.setHeartbeat(0, 0);
382 Message<?> message = MessageBuilder.createMessage(new byte[0], headers.getMessageHeaders());
383
384 MessageExchangeBuilder builder = new MessageExchangeBuilder(message);
385 builder.expected.add(new StompConnectedFrameMessageMatcher(sessionId));
386 return builder;
387 }
388
389
390 @SuppressWarnings("unused")
391 public static MessageExchangeBuilder connectWithError(String sessionId) {
392 StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.CONNECT);
393 headers.setSessionId(sessionId);
394 headers.setAcceptVersion("1.1,1.2");
395 Message<?> message = MessageBuilder.createMessage(new byte[0], headers.getMessageHeaders());
396 MessageExchangeBuilder builder = new MessageExchangeBuilder(message);
397 return builder.andExpectError();
398 }
399
400 public static MessageExchangeBuilder subscribeWithReceipt(String sessionId, String subscriptionId,
401 String destination, String receiptId) {
402
403 StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.SUBSCRIBE);
404 headers.setSessionId(sessionId);
405 headers.setSubscriptionId(subscriptionId);
406 headers.setDestination(destination);
407 headers.setReceipt(receiptId);
408 Message<?> message = MessageBuilder.createMessage(new byte[0], headers.getMessageHeaders());
409
410 MessageExchangeBuilder builder = new MessageExchangeBuilder(message);
411 builder.expected.add(new StompReceiptFrameMessageMatcher(sessionId, receiptId));
412 return builder;
413 }
414
415 public static MessageExchangeBuilder send(String destination, String payload) {
416 SimpMessageHeaderAccessor headers = SimpMessageHeaderAccessor.create(SimpMessageType.MESSAGE);
417 headers.setDestination(destination);
418 Message<?> message = MessageBuilder.createMessage(payload.getBytes(UTF_8), headers.getMessageHeaders());
419 return new MessageExchangeBuilder(message);
420 }
421
422 public static MessageExchangeBuilder disconnectWithReceipt(String sessionId, String receiptId) {
423
424 StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.DISCONNECT);
425 headers.setSessionId(sessionId);
426 headers.setReceipt(receiptId);
427 Message<?> message = MessageBuilder.createMessage(new byte[0], headers.getMessageHeaders());
428
429 MessageExchangeBuilder builder = new MessageExchangeBuilder(message);
430 builder.expected.add(new StompReceiptFrameMessageMatcher(sessionId, receiptId));
431 return builder;
432 }
433
434 public MessageExchangeBuilder andExpectMessage(String sessionId, String subscriptionId) {
435 Assert.isTrue(SimpMessageType.MESSAGE.equals(headers.getMessageType()));
436 String destination = this.headers.getDestination();
437 Object payload = this.message.getPayload();
438 this.expected.add(new StompMessageFrameMessageMatcher(sessionId, subscriptionId, destination, payload));
439 return this;
440 }
441
442 public MessageExchangeBuilder andExpectError() {
443 String sessionId = this.headers.getSessionId();
444 Assert.notNull(sessionId, "No sessionId to match the ERROR frame to");
445 return andExpectError(sessionId);
446 }
447
448 public MessageExchangeBuilder andExpectError(String sessionId) {
449 this.expected.add(new StompFrameMessageMatcher(StompCommand.ERROR, sessionId));
450 return this;
451 }
452
453 public MessageExchange build() {
454 return new MessageExchange(this.message, this.expected.toArray(new MessageMatcher[this.expected.size()]));
455 }
456 }
457
458 private static interface MessageMatcher {
459
460 boolean match(Message<?> message);
461
462 }
463
464 private static class StompFrameMessageMatcher implements MessageMatcher {
465
466 private final StompCommand command;
467
468 private final String sessionId;
469
470
471 public StompFrameMessageMatcher(StompCommand command, String sessionId) {
472 this.command = command;
473 this.sessionId = sessionId;
474 }
475
476
477 @Override
478 public final boolean match(Message<?> message) {
479 StompHeaderAccessor headers = StompHeaderAccessor.wrap(message);
480 if (!this.command.equals(headers.getCommand()) || (this.sessionId != headers.getSessionId())) {
481 return false;
482 }
483 return matchInternal(headers, message.getPayload());
484 }
485
486 protected boolean matchInternal(StompHeaderAccessor headers, Object payload) {
487 return true;
488 }
489
490 @Override
491 public String toString() {
492 return "command=" + this.command + ", session=\"" + this.sessionId + "\"";
493 }
494 }
495
496 private static class StompReceiptFrameMessageMatcher extends StompFrameMessageMatcher {
497
498 private final String receiptId;
499
500 public StompReceiptFrameMessageMatcher(String sessionId, String receipt) {
501 super(StompCommand.RECEIPT, sessionId);
502 this.receiptId = receipt;
503 }
504
505 @Override
506 protected boolean matchInternal(StompHeaderAccessor headers, Object payload) {
507 return (this.receiptId.equals(headers.getReceiptId()));
508 }
509
510 @Override
511 public String toString() {
512 return super.toString() + ", receiptId=\"" + this.receiptId + "\"";
513 }
514 }
515
516 private static class StompMessageFrameMessageMatcher extends StompFrameMessageMatcher {
517
518 private final String subscriptionId;
519
520 private final String destination;
521
522 private final Object payload;
523
524
525 public StompMessageFrameMessageMatcher(String sessionId, String subscriptionId, String destination, Object payload) {
526 super(StompCommand.MESSAGE, sessionId);
527 this.subscriptionId = subscriptionId;
528 this.destination = destination;
529 this.payload = payload;
530 }
531
532 @Override
533 protected boolean matchInternal(StompHeaderAccessor headers, Object payload) {
534 if (!this.subscriptionId.equals(headers.getSubscriptionId()) || !this.destination.equals(headers.getDestination())) {
535 return false;
536 }
537 if (payload instanceof byte[] && this.payload instanceof byte[]) {
538 return Arrays.equals((byte[]) payload, (byte[]) this.payload);
539 }
540 else {
541 return this.payload.equals(payload);
542 }
543 }
544
545 @Override
546 public String toString() {
547 return super.toString() + ", subscriptionId=\"" + this.subscriptionId
548 + "\", destination=\"" + this.destination + "\", payload=\"" + getPayloadAsText() + "\"";
549 }
550
551 protected String getPayloadAsText() {
552 return (this.payload instanceof byte[])
553 ? new String((byte[]) this.payload, UTF_8) : payload.toString();
554 }
555 }
556
557 private static class StompConnectedFrameMessageMatcher extends StompFrameMessageMatcher {
558
559
560 public StompConnectedFrameMessageMatcher(String sessionId) {
561 super(StompCommand.CONNECTED, sessionId);
562 }
563
564 }
565
566 }