View Javadoc
1   /*
2    * Copyright 2002-2014 the original author or authors.
3    *
4    * Licensed under the Apache License, Version 2.0 (the "License");
5    * you may not use this file except in compliance with the License.
6    * You may obtain a copy of the License at
7    *
8    *      http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS,
12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   * See the License for the specific language governing permissions and
14   * limitations under the License.
15   */
16  
17  package org.springframework.messaging.simp.stomp;
18  
19  import java.nio.charset.Charset;
20  import java.util.ArrayList;
21  import java.util.Arrays;
22  import java.util.List;
23  import java.util.concurrent.BlockingQueue;
24  import java.util.concurrent.CountDownLatch;
25  import java.util.concurrent.LinkedBlockingQueue;
26  import java.util.concurrent.TimeUnit;
27  
28  import org.apache.activemq.broker.BrokerService;
29  import org.apache.commons.logging.Log;
30  import org.apache.commons.logging.LogFactory;
31  import org.junit.After;
32  import org.junit.Before;
33  import org.junit.Rule;
34  import org.junit.Test;
35  import org.junit.rules.TestName;
36  
37  import org.springframework.context.ApplicationEvent;
38  import org.springframework.context.ApplicationEventPublisher;
39  import org.springframework.messaging.Message;
40  import org.springframework.messaging.MessageDeliveryException;
41  import org.springframework.messaging.MessageHandler;
42  import org.springframework.messaging.MessagingException;
43  import org.springframework.messaging.StubMessageChannel;
44  import org.springframework.messaging.simp.SimpMessageHeaderAccessor;
45  import org.springframework.messaging.simp.SimpMessageType;
46  import org.springframework.messaging.simp.broker.BrokerAvailabilityEvent;
47  import org.springframework.messaging.support.ExecutorSubscribableChannel;
48  import org.springframework.messaging.support.MessageBuilder;
49  import org.springframework.util.Assert;
50  import org.springframework.util.SocketUtils;
51  
52  import static org.junit.Assert.*;
53  
54  /**
55   * Integration tests for {@link StompBrokerRelayMessageHandler} running against ActiveMQ.
56   *
57   * @author Rossen Stoyanchev
58   */
59  public class StompBrokerRelayMessageHandlerIntegrationTests {
60  
61  	@Rule
62  	public final TestName testName = new TestName();
63  
64  	private static final Log logger = LogFactory.getLog(StompBrokerRelayMessageHandlerIntegrationTests.class);
65  
66  	private static final Charset UTF_8 = Charset.forName("UTF-8");
67  
68  	private StompBrokerRelayMessageHandler relay;
69  
70  	private BrokerService activeMQBroker;
71  
72  	private ExecutorSubscribableChannel responseChannel;
73  
74  	private TestMessageHandler responseHandler;
75  
76  	private TestEventPublisher eventPublisher;
77  
78  	private int port;
79  
80  
81  	@Before
82  	public void setUp() throws Exception {
83  		logger.debug("Setting up before '" + this.testName.getMethodName() + "'");
84  		this.port = SocketUtils.findAvailableTcpPort(61613);
85  		this.responseChannel = new ExecutorSubscribableChannel();
86  		this.responseHandler = new TestMessageHandler();
87  		this.responseChannel.subscribe(this.responseHandler);
88  		this.eventPublisher = new TestEventPublisher();
89  		startActiveMqBroker();
90  		createAndStartRelay();
91  	}
92  
93  	private void startActiveMqBroker() throws Exception {
94  		this.activeMQBroker = new BrokerService();
95  		this.activeMQBroker.addConnector("stomp://localhost:" + this.port);
96  		this.activeMQBroker.setStartAsync(false);
97  		this.activeMQBroker.setPersistent(false);
98  		this.activeMQBroker.setUseJmx(false);
99  		this.activeMQBroker.getSystemUsage().getMemoryUsage().setLimit(1024 * 1024 * 5);
100 		this.activeMQBroker.getSystemUsage().getTempUsage().setLimit(1024 * 1024 * 5);
101 		this.activeMQBroker.start();
102 	}
103 
104 	private void createAndStartRelay() throws InterruptedException {
105 		this.relay = new StompBrokerRelayMessageHandler(new StubMessageChannel(),
106 				this.responseChannel, new StubMessageChannel(), Arrays.asList("/queue/", "/topic/"));
107 		this.relay.setRelayPort(this.port);
108 		this.relay.setApplicationEventPublisher(this.eventPublisher);
109 		this.relay.setSystemHeartbeatReceiveInterval(0);
110 		this.relay.setSystemHeartbeatSendInterval(0);
111 
112 		this.relay.start();
113 		this.eventPublisher.expectBrokerAvailabilityEvent(true);
114 	}
115 
116 	@After
117 	public void tearDown() throws Exception {
118 		try {
119 			logger.debug("STOMP broker relay stats: " + this.relay.getStatsInfo());
120 			this.relay.stop();
121 		}
122 		finally {
123 			stopActiveMqBrokerAndAwait();
124 		}
125 	}
126 
127 	private void stopActiveMqBrokerAndAwait() throws Exception {
128 		logger.debug("Stopping ActiveMQ broker and will await shutdown");
129 		if (!this.activeMQBroker.isStarted()) {
130 			logger.debug("Broker not running");
131 			return;
132 		}
133 		final CountDownLatch latch = new CountDownLatch(1);
134 		this.activeMQBroker.addShutdownHook(new Runnable() {
135 			public void run() {
136 				latch.countDown();
137 			}
138 		});
139 		this.activeMQBroker.stop();
140 		assertTrue("Broker did not stop", latch.await(5, TimeUnit.SECONDS));
141 		logger.debug("Broker stopped");
142 	}
143 
144 	@Test
145 	public void publishSubscribe() throws Exception {
146 
147 		logger.debug("Starting test publishSubscribe()");
148 
149 		String sess1 = "sess1";
150 		String sess2 = "sess2";
151 		String subs1 = "subs1";
152 		String destination = "/topic/test";
153 
154 		MessageExchange conn1 = MessageExchangeBuilder.connect(sess1).build();
155 		MessageExchange conn2 = MessageExchangeBuilder.connect(sess2).build();
156 		this.relay.handleMessage(conn1.message);
157 		this.relay.handleMessage(conn2.message);
158 		this.responseHandler.expectMessages(conn1, conn2);
159 
160 		MessageExchange subscribe = MessageExchangeBuilder.subscribeWithReceipt(sess1, subs1, destination, "r1").build();
161 		this.relay.handleMessage(subscribe.message);
162 		this.responseHandler.expectMessages(subscribe);
163 
164 		MessageExchange send = MessageExchangeBuilder.send(destination, "foo").andExpectMessage(sess1, subs1).build();
165 		this.relay.handleMessage(send.message);
166 		this.responseHandler.expectMessages(send);
167 	}
168 
169 	@Test(expected=MessageDeliveryException.class)
170 	public void messageDeliveryExceptionIfSystemSessionForwardFails() throws Exception {
171 
172 		logger.debug("Starting test messageDeliveryExceptionIfSystemSessionForwardFails()");
173 
174 		stopActiveMqBrokerAndAwait();
175 		this.eventPublisher.expectBrokerAvailabilityEvent(false);
176 
177 		StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.SEND);
178 		this.relay.handleMessage(MessageBuilder.createMessage("test".getBytes(), headers.getMessageHeaders()));
179 	}
180 
181 	@Test
182 	public void brokerBecomingUnvailableTriggersErrorFrame() throws Exception {
183 
184 		logger.debug("Starting test brokerBecomingUnvailableTriggersErrorFrame()");
185 
186 		String sess1 = "sess1";
187 		MessageExchange connect = MessageExchangeBuilder.connect(sess1).build();
188 		this.relay.handleMessage(connect.message);
189 		this.responseHandler.expectMessages(connect);
190 
191 		MessageExchange error = MessageExchangeBuilder.error(sess1).build();
192 		stopActiveMqBrokerAndAwait();
193 		this.eventPublisher.expectBrokerAvailabilityEvent(false);
194 		this.responseHandler.expectMessages(error);
195 	}
196 
197 	@Test
198 	public void brokerAvailabilityEventWhenStopped() throws Exception {
199 
200 		logger.debug("Starting test brokerAvailabilityEventWhenStopped()");
201 
202 		stopActiveMqBrokerAndAwait();
203 		this.eventPublisher.expectBrokerAvailabilityEvent(false);
204 	}
205 
206 	@Test
207 	public void relayReconnectsIfBrokerComesBackUp() throws Exception {
208 
209 		logger.debug("Starting test relayReconnectsIfBrokerComesBackUp()");
210 
211 		String sess1 = "sess1";
212 		MessageExchange conn1 = MessageExchangeBuilder.connect(sess1).build();
213 		this.relay.handleMessage(conn1.message);
214 		this.responseHandler.expectMessages(conn1);
215 
216 		String subs1 = "subs1";
217 		String destination = "/topic/test";
218 		MessageExchange subscribe = MessageExchangeBuilder.subscribeWithReceipt(sess1, subs1, destination, "r1").build();
219 		this.relay.handleMessage(subscribe.message);
220 		this.responseHandler.expectMessages(subscribe);
221 
222 		MessageExchange error = MessageExchangeBuilder.error(sess1).build();
223 		stopActiveMqBrokerAndAwait();
224 		this.responseHandler.expectMessages(error);
225 
226 		this.eventPublisher.expectBrokerAvailabilityEvent(false);
227 
228 		startActiveMqBroker();
229 		this.eventPublisher.expectBrokerAvailabilityEvent(true);
230 	}
231 
232 	@Test
233 	public void disconnectClosesRelaySessionCleanly() throws Exception {
234 
235 		logger.debug("Starting test disconnectClosesRelaySessionCleanly()");
236 
237 		MessageExchange connect = MessageExchangeBuilder.connect("sess1").build();
238 		this.relay.handleMessage(connect.message);
239 		this.responseHandler.expectMessages(connect);
240 
241 		StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.DISCONNECT);
242 		headers.setSessionId("sess1");
243 		this.relay.handleMessage(MessageBuilder.createMessage(new byte[0], headers.getMessageHeaders()));
244 
245 		Thread.sleep(2000);
246 
247 		// Check that we have not received an ERROR as a result of the connection closing
248 		assertTrue("Unexpected messages: " + this.responseHandler.queue, this.responseHandler.queue.isEmpty());
249 	}
250 
251 	@Test
252 	public void disconnectWithReceipt() throws Exception {
253 
254 		logger.debug("Starting test disconnectWithReceipt()");
255 
256 		MessageExchange connect = MessageExchangeBuilder.connect("sess1").build();
257 		this.relay.handleMessage(connect.message);
258 		this.responseHandler.expectMessages(connect);
259 
260 		MessageExchange disconnect = MessageExchangeBuilder.disconnectWithReceipt("sess1", "r123").build();
261 		this.relay.handleMessage(disconnect.message);
262 
263 		this.responseHandler.expectMessages(disconnect);
264 	}
265 
266 
267 	private static class TestEventPublisher implements ApplicationEventPublisher {
268 
269 		private final BlockingQueue<BrokerAvailabilityEvent> eventQueue = new LinkedBlockingQueue<>();
270 
271 		@Override
272 		public void publishEvent(ApplicationEvent event) {
273 			logger.debug("Processing ApplicationEvent " + event);
274 			if (event instanceof BrokerAvailabilityEvent) {
275 				this.eventQueue.add((BrokerAvailabilityEvent) event);
276 			}
277 		}
278 
279 		public void expectBrokerAvailabilityEvent(boolean isBrokerAvailable) throws InterruptedException {
280 			BrokerAvailabilityEvent event = this.eventQueue.poll(20000, TimeUnit.MILLISECONDS);
281 			assertNotNull("Times out waiting for BrokerAvailabilityEvent[" + isBrokerAvailable + "]", event);
282 			assertEquals(isBrokerAvailable, event.isBrokerAvailable());
283 		}
284 	}
285 
286 	private static class TestMessageHandler implements MessageHandler {
287 
288 		private final BlockingQueue<Message<?>> queue = new LinkedBlockingQueue<>();
289 
290 		@Override
291 		public void handleMessage(Message<?> message) throws MessagingException {
292 			if (SimpMessageType.HEARTBEAT == SimpMessageHeaderAccessor.getMessageType(message.getHeaders())) {
293 				return;
294 			}
295 			this.queue.add(message);
296 		}
297 
298 		public void expectMessages(MessageExchange... messageExchanges) throws InterruptedException {
299 
300 			List<MessageExchange> expectedMessages =
301 					new ArrayList<MessageExchange>(Arrays.<MessageExchange>asList(messageExchanges));
302 
303 			while (expectedMessages.size() > 0) {
304 				Message<?> message = this.queue.poll(10000, TimeUnit.MILLISECONDS);
305 				assertNotNull("Timed out waiting for messages, expected [" + expectedMessages + "]", message);
306 
307 				MessageExchange match = findMatch(expectedMessages, message);
308 				assertNotNull("Unexpected message=" + message + ", expected [" + expectedMessages + "]", match);
309 
310 				expectedMessages.remove(match);
311 			}
312 		}
313 
314 		private MessageExchange findMatch(List<MessageExchange> expectedMessages, Message<?> message) {
315 			for (MessageExchange exchange : expectedMessages) {
316 				if (exchange.matchMessage(message)) {
317 					return exchange;
318 				}
319 			}
320 			return null;
321 		}
322 	}
323 
324 	/**
325 	 * Holds a message as well as expected and actual messages matched against expectations.
326 	 */
327 	private static class MessageExchange {
328 
329 		private final Message<?> message;
330 
331 		private final MessageMatcher[] expected;
332 
333 		private final Message<?>[] actual;
334 
335 		public MessageExchange(Message<?> message, MessageMatcher... expected) {
336 			this.message = message;
337 			this.expected = expected;
338 			this.actual = new Message<?>[expected.length];
339 		}
340 
341 		public boolean matchMessage(Message<?> message) {
342 			for (int i=0 ; i < this.expected.length; i++) {
343 				if (this.expected[i].match(message)) {
344 					this.actual[i] = message;
345 					return true;
346 				}
347 			}
348 			return false;
349 		}
350 
351 		@Override
352 		public String toString() {
353 			return "Forwarded message:\n" + this.message + "\n" +
354 					"Should receive back:\n" + Arrays.toString(this.expected) + "\n" +
355 					"Actually received:\n" + Arrays.toString(this.actual) + "\n";
356 		}
357 	}
358 
359 	private static class MessageExchangeBuilder {
360 
361 		private final Message<?> message;
362 
363 		private final StompHeaderAccessor headers;
364 
365 		private final List<MessageMatcher> expected = new ArrayList<>();
366 
367 
368 		private MessageExchangeBuilder(Message<?> message) {
369 			this.message = message;
370 			this.headers = StompHeaderAccessor.wrap(message);
371 		}
372 
373 		public static MessageExchangeBuilder error(String sessionId) {
374 			return new MessageExchangeBuilder(null).andExpectError(sessionId);
375 		}
376 
377 		public static MessageExchangeBuilder connect(String sessionId) {
378 			StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.CONNECT);
379 			headers.setSessionId(sessionId);
380 			headers.setAcceptVersion("1.1,1.2");
381 			headers.setHeartbeat(0, 0);
382 			Message<?> message = MessageBuilder.createMessage(new byte[0], headers.getMessageHeaders());
383 
384 			MessageExchangeBuilder builder = new MessageExchangeBuilder(message);
385 			builder.expected.add(new StompConnectedFrameMessageMatcher(sessionId));
386 			return builder;
387 		}
388 
389 		// TODO Determine why connectWithError() is unused.
390 		@SuppressWarnings("unused")
391 		public static MessageExchangeBuilder connectWithError(String sessionId) {
392 			StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.CONNECT);
393 			headers.setSessionId(sessionId);
394 			headers.setAcceptVersion("1.1,1.2");
395 			Message<?> message = MessageBuilder.createMessage(new byte[0], headers.getMessageHeaders());
396 			MessageExchangeBuilder builder = new MessageExchangeBuilder(message);
397 			return builder.andExpectError();
398 		}
399 
400 		public static MessageExchangeBuilder subscribeWithReceipt(String sessionId, String subscriptionId,
401 				String destination, String receiptId) {
402 
403 			StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.SUBSCRIBE);
404 			headers.setSessionId(sessionId);
405 			headers.setSubscriptionId(subscriptionId);
406 			headers.setDestination(destination);
407 			headers.setReceipt(receiptId);
408 			Message<?> message = MessageBuilder.createMessage(new byte[0], headers.getMessageHeaders());
409 
410 			MessageExchangeBuilder builder = new MessageExchangeBuilder(message);
411 			builder.expected.add(new StompReceiptFrameMessageMatcher(sessionId, receiptId));
412 			return builder;
413 		}
414 
415 		public static MessageExchangeBuilder send(String destination, String payload) {
416 			SimpMessageHeaderAccessor headers = SimpMessageHeaderAccessor.create(SimpMessageType.MESSAGE);
417 			headers.setDestination(destination);
418 			Message<?> message = MessageBuilder.createMessage(payload.getBytes(UTF_8), headers.getMessageHeaders());
419 			return new MessageExchangeBuilder(message);
420 		}
421 
422 		public static MessageExchangeBuilder disconnectWithReceipt(String sessionId, String receiptId) {
423 
424 			StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.DISCONNECT);
425 			headers.setSessionId(sessionId);
426 			headers.setReceipt(receiptId);
427 			Message<?> message = MessageBuilder.createMessage(new byte[0], headers.getMessageHeaders());
428 
429 			MessageExchangeBuilder builder = new MessageExchangeBuilder(message);
430 			builder.expected.add(new StompReceiptFrameMessageMatcher(sessionId, receiptId));
431 			return builder;
432 		}
433 
434 		public MessageExchangeBuilder andExpectMessage(String sessionId, String subscriptionId) {
435 			Assert.isTrue(SimpMessageType.MESSAGE.equals(headers.getMessageType()));
436 			String destination = this.headers.getDestination();
437 			Object payload = this.message.getPayload();
438 			this.expected.add(new StompMessageFrameMessageMatcher(sessionId, subscriptionId, destination, payload));
439 			return this;
440 		}
441 
442 		public MessageExchangeBuilder andExpectError() {
443 			String sessionId = this.headers.getSessionId();
444 			Assert.notNull(sessionId, "No sessionId to match the ERROR frame to");
445 			return andExpectError(sessionId);
446 		}
447 
448 		public MessageExchangeBuilder andExpectError(String sessionId) {
449 			this.expected.add(new StompFrameMessageMatcher(StompCommand.ERROR, sessionId));
450 			return this;
451 		}
452 
453 		public MessageExchange build() {
454 			return new MessageExchange(this.message, this.expected.toArray(new MessageMatcher[this.expected.size()]));
455 		}
456 	}
457 
458 	private static interface MessageMatcher {
459 
460 		boolean match(Message<?> message);
461 
462 	}
463 
464 	private static class StompFrameMessageMatcher implements MessageMatcher {
465 
466 		private final StompCommand command;
467 
468 		private final String sessionId;
469 
470 
471 		public StompFrameMessageMatcher(StompCommand command, String sessionId) {
472 			this.command = command;
473 			this.sessionId = sessionId;
474 		}
475 
476 
477 		@Override
478 		public final boolean match(Message<?> message) {
479 			StompHeaderAccessor headers = StompHeaderAccessor.wrap(message);
480 			if (!this.command.equals(headers.getCommand()) || (this.sessionId != headers.getSessionId())) {
481 				return false;
482 			}
483 			return matchInternal(headers, message.getPayload());
484 		}
485 
486 		protected boolean matchInternal(StompHeaderAccessor headers, Object payload) {
487 			return true;
488 		}
489 
490 		@Override
491 		public String toString() {
492 			return "command=" + this.command  + ", session=\"" + this.sessionId + "\"";
493 		}
494 	}
495 
496 	private static class StompReceiptFrameMessageMatcher extends StompFrameMessageMatcher {
497 
498 		private final String receiptId;
499 
500 		public StompReceiptFrameMessageMatcher(String sessionId, String receipt) {
501 			super(StompCommand.RECEIPT, sessionId);
502 			this.receiptId = receipt;
503 		}
504 
505 		@Override
506 		protected boolean matchInternal(StompHeaderAccessor headers, Object payload) {
507 			return (this.receiptId.equals(headers.getReceiptId()));
508 		}
509 
510 		@Override
511 		public String toString() {
512 			return super.toString() + ", receiptId=\"" + this.receiptId + "\"";
513 		}
514 	}
515 
516 	private static class StompMessageFrameMessageMatcher extends StompFrameMessageMatcher {
517 
518 		private final String subscriptionId;
519 
520 		private final String destination;
521 
522 		private final Object payload;
523 
524 
525 		public StompMessageFrameMessageMatcher(String sessionId, String subscriptionId, String destination, Object payload) {
526 			super(StompCommand.MESSAGE, sessionId);
527 			this.subscriptionId = subscriptionId;
528 			this.destination = destination;
529 			this.payload = payload;
530 		}
531 
532 		@Override
533 		protected boolean matchInternal(StompHeaderAccessor headers, Object payload) {
534 			if (!this.subscriptionId.equals(headers.getSubscriptionId()) ||  !this.destination.equals(headers.getDestination())) {
535 				return false;
536 			}
537 			if (payload instanceof byte[] && this.payload instanceof byte[]) {
538 				return Arrays.equals((byte[]) payload, (byte[]) this.payload);
539 			}
540 			else {
541 				return this.payload.equals(payload);
542 			}
543 		}
544 
545 		@Override
546 		public String toString() {
547 			return super.toString() + ", subscriptionId=\"" + this.subscriptionId
548 					+ "\", destination=\"" + this.destination + "\", payload=\"" + getPayloadAsText() + "\"";
549 		}
550 
551 		protected String getPayloadAsText() {
552 			return (this.payload instanceof byte[])
553 					? new String((byte[]) this.payload, UTF_8) : payload.toString();
554 		}
555 	}
556 
557 	private static class StompConnectedFrameMessageMatcher extends StompFrameMessageMatcher {
558 
559 
560 		public StompConnectedFrameMessageMatcher(String sessionId) {
561 			super(StompCommand.CONNECTED, sessionId);
562 		}
563 
564 	}
565 
566 }