View Javadoc
1   /*
2    * Copyright 2002-2014 the original author or authors.
3    *
4    * Licensed under the Apache License, Version 2.0 (the "License");
5    * you may not use this file except in compliance with the License.
6    * You may obtain a copy of the License at
7    *
8    *      http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS,
12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   * See the License for the specific language governing permissions and
14   * limitations under the License.
15   */
16  
17  package org.springframework.messaging.simp.stomp;
18  
19  import java.io.UnsupportedEncodingException;
20  import java.util.ArrayList;
21  import java.util.List;
22  
23  import org.junit.Test;
24  import reactor.function.Consumer;
25  import reactor.function.Function;
26  import reactor.io.Buffer;
27  
28  import org.springframework.messaging.Message;
29  import org.springframework.messaging.simp.SimpMessageType;
30  import org.springframework.messaging.support.MessageBuilder;
31  import org.springframework.util.InvalidMimeTypeException;
32  
33  import static org.junit.Assert.*;
34  
35  /**
36   * Test fixture for {@link Reactor11StompCodec}.
37   *
38   * @author Andy Wilkinson
39   */
40  public class StompCodecTests {
41  
42  	private final ArgumentCapturingConsumer<Message<byte[]>> consumer = new ArgumentCapturingConsumer<Message<byte[]>>();
43  
44  	private final Function<Buffer, Message<byte[]>> decoder = new Reactor11StompCodec().decoder(consumer);
45  
46  	@Test
47  	public void decodeFrameWithCrLfEols() {
48  		Message<byte[]> frame = decode("DISCONNECT\r\n\r\n\0");
49  		StompHeaderAccessor headers = StompHeaderAccessor.wrap(frame);
50  
51  		assertEquals(StompCommand.DISCONNECT, headers.getCommand());
52  		assertEquals(0, headers.toNativeHeaderMap().size());
53  		assertEquals(0, frame.getPayload().length);
54  	}
55  
56  	@Test
57  	public void decodeFrameWithNoHeadersAndNoBody() {
58  		Message<byte[]> frame = decode("DISCONNECT\n\n\0");
59  		StompHeaderAccessor headers = StompHeaderAccessor.wrap(frame);
60  
61  		assertEquals(StompCommand.DISCONNECT, headers.getCommand());
62  		assertEquals(0, headers.toNativeHeaderMap().size());
63  		assertEquals(0, frame.getPayload().length);
64  	}
65  
66  	@Test
67  	public void decodeFrameWithNoBody() {
68  		String accept = "accept-version:1.1\n";
69  		String host = "host:github.org\n";
70  
71  		Message<byte[]> frame = decode("CONNECT\n" + accept + host + "\n\0");
72  		StompHeaderAccessor headers = StompHeaderAccessor.wrap(frame);
73  
74  		assertEquals(StompCommand.CONNECT, headers.getCommand());
75  
76  		assertEquals(2, headers.toNativeHeaderMap().size());
77  		assertEquals("1.1", headers.getFirstNativeHeader("accept-version"));
78  		assertEquals("github.org", headers.getHost());
79  
80  		assertEquals(0, frame.getPayload().length);
81  	}
82  
83  	@Test
84  	public void decodeFrame() throws UnsupportedEncodingException {
85  		Message<byte[]> frame = decode("SEND\ndestination:test\n\nThe body of the message\0");
86  		StompHeaderAccessor headers = StompHeaderAccessor.wrap(frame);
87  
88  		assertEquals(StompCommand.SEND, headers.getCommand());
89  
90  		assertEquals(headers.toNativeHeaderMap().toString(), 1, headers.toNativeHeaderMap().size());
91  		assertEquals("test", headers.getDestination());
92  
93  		String bodyText = new String(frame.getPayload());
94  		assertEquals("The body of the message", bodyText);
95  	}
96  
97  	@Test
98  	public void decodeFrameWithContentLength() {
99  		Message<byte[]> message = decode("SEND\ncontent-length:23\n\nThe body of the message\0");
100 		StompHeaderAccessor headers = StompHeaderAccessor.wrap(message);
101 
102 		assertEquals(StompCommand.SEND, headers.getCommand());
103 
104 		assertEquals(1, headers.toNativeHeaderMap().size());
105 		assertEquals(Integer.valueOf(23), headers.getContentLength());
106 
107 		String bodyText = new String(message.getPayload());
108 		assertEquals("The body of the message", bodyText);
109 	}
110 
111 	// SPR-11528
112 
113 	@Test
114 	public void decodeFrameWithInvalidContentLength() {
115 		Message<byte[]> message = decode("SEND\ncontent-length:-1\n\nThe body of the message\0");
116 		StompHeaderAccessor headers = StompHeaderAccessor.wrap(message);
117 
118 		assertEquals(StompCommand.SEND, headers.getCommand());
119 
120 		assertEquals(1, headers.toNativeHeaderMap().size());
121 		assertEquals(Integer.valueOf(-1), headers.getContentLength());
122 
123 		String bodyText = new String(message.getPayload());
124 		assertEquals("The body of the message", bodyText);
125 	}
126 
127 	@Test
128 	public void decodeFrameWithContentLengthZero() {
129 		Message<byte[]> frame = decode("SEND\ncontent-length:0\n\n\0");
130 		StompHeaderAccessor headers = StompHeaderAccessor.wrap(frame);
131 
132 		assertEquals(StompCommand.SEND, headers.getCommand());
133 
134 		assertEquals(1, headers.toNativeHeaderMap().size());
135 		assertEquals(Integer.valueOf(0), headers.getContentLength());
136 
137 		String bodyText = new String(frame.getPayload());
138 		assertEquals("", bodyText);
139 	}
140 
141 	@Test
142 	public void decodeFrameWithNullOctectsInTheBody() {
143 		Message<byte[]> frame = decode("SEND\ncontent-length:23\n\nThe b\0dy \0f the message\0");
144 		StompHeaderAccessor headers = StompHeaderAccessor.wrap(frame);
145 
146 		assertEquals(StompCommand.SEND, headers.getCommand());
147 
148 		assertEquals(1, headers.toNativeHeaderMap().size());
149 		assertEquals(Integer.valueOf(23), headers.getContentLength());
150 
151 		String bodyText = new String(frame.getPayload());
152 		assertEquals("The b\0dy \0f the message", bodyText);
153 	}
154 
155 	@Test
156 	public void decodeFrameWithEscapedHeaders() {
157 		Message<byte[]> frame = decode("DISCONNECT\na\\c\\r\\n\\\\b:alpha\\cbravo\\r\\n\\\\\n\n\0");
158 		StompHeaderAccessor headers = StompHeaderAccessor.wrap(frame);
159 
160 		assertEquals(StompCommand.DISCONNECT, headers.getCommand());
161 
162 		assertEquals(1, headers.toNativeHeaderMap().size());
163 		assertEquals("alpha:bravo\r\n\\", headers.getFirstNativeHeader("a:\r\n\\b"));
164 	}
165 
166 	@Test(expected=StompConversionException.class)
167 	public void decodeFrameBodyNotAllowed() {
168 		decode("CONNECT\naccept-version:1.2\n\nThe body of the message\0");
169 	}
170 
171 	@Test
172 	public void decodeMultipleFramesFromSameBuffer() {
173 		String frame1 = "SEND\ndestination:test\n\nThe body of the message\0";
174 		String frame2 = "DISCONNECT\n\n\0";
175 
176 		Buffer buffer = Buffer.wrap(frame1 + frame2);
177 
178 		final List<Message<byte[]>> messages = new ArrayList<Message<byte[]>>();
179 		new Reactor11StompCodec().decoder(new Consumer<Message<byte[]>>() {
180 			@Override
181 			public void accept(Message<byte[]> message) {
182 				messages.add(message);
183 			}
184 		}).apply(buffer);
185 
186 		assertEquals(2, messages.size());
187 		assertEquals(StompCommand.SEND, StompHeaderAccessor.wrap(messages.get(0)).getCommand());
188 		assertEquals(StompCommand.DISCONNECT, StompHeaderAccessor.wrap(messages.get(1)).getCommand());
189 	}
190 
191 	@Test
192 	public void decodeFrameWithIncompleteCommand() {
193 		assertIncompleteDecode("MESSAG");
194 	}
195 
196 	@Test
197 	public void decodeFrameWithIncompleteHeader() {
198 		assertIncompleteDecode("SEND\ndestination");
199 		assertIncompleteDecode("SEND\ndestination:");
200 		assertIncompleteDecode("SEND\ndestination:test");
201 	}
202 
203 	@Test
204 	public void decodeFrameWithoutNullOctetTerminator() {
205 		assertIncompleteDecode("SEND\ndestination:test\n");
206 		assertIncompleteDecode("SEND\ndestination:test\n\n");
207 		assertIncompleteDecode("SEND\ndestination:test\n\nThe body");
208 	}
209 
210 	@Test
211 	public void decodeFrameWithInsufficientContent() {
212 		assertIncompleteDecode("SEND\ncontent-length:23\n\nThe body of the mess");
213 	}
214 
215 	@Test
216 	public void decodeFrameWithIncompleteContentType() {
217 		assertIncompleteDecode("SEND\ncontent-type:text/plain;charset=U");
218 	}
219 
220 	@Test(expected = InvalidMimeTypeException.class)
221 	public void decodeFrameWithInvalidContentType() {
222 		assertIncompleteDecode("SEND\ncontent-type:text/plain;charset=U\n\nThe body\0");
223 	}
224 
225 	@Test(expected=StompConversionException.class)
226 	public void decodeFrameWithIncorrectTerminator() {
227 		decode("SEND\ncontent-length:23\n\nThe body of the message*");
228 	}
229 
230 	@Test
231 	public void decodeHeartbeat() {
232 		String frame = "\n";
233 
234 		Buffer buffer = Buffer.wrap(frame);
235 
236 		final List<Message<byte[]>> messages = new ArrayList<Message<byte[]>>();
237 		new Reactor11StompCodec().decoder(new Consumer<Message<byte[]>>() {
238 			@Override
239 			public void accept(Message<byte[]> message) {
240 				messages.add(message);
241 			}
242 		}).apply(buffer);
243 
244 		assertEquals(1, messages.size());
245 		assertEquals(SimpMessageType.HEARTBEAT, StompHeaderAccessor.wrap(messages.get(0)).getMessageType());
246 	}
247 
248 	@Test
249 	public void encodeFrameWithNoHeadersAndNoBody() {
250 		StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.DISCONNECT);
251 
252 		Message<byte[]> frame = MessageBuilder.createMessage(new byte[0], headers.getMessageHeaders());
253 
254 		assertEquals("DISCONNECT\n\n\0", new Reactor11StompCodec().encoder().apply(frame).asString());
255 	}
256 
257 	@Test
258 	public void encodeFrameWithHeaders() {
259 		StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.CONNECT);
260 		headers.setAcceptVersion("1.2");
261 		headers.setHost("github.org");
262 
263 		Message<byte[]> frame = MessageBuilder.createMessage(new byte[0], headers.getMessageHeaders());
264 
265 		String frameString = new Reactor11StompCodec().encoder().apply(frame).asString();
266 
267 		assertTrue(frameString.equals("CONNECT\naccept-version:1.2\nhost:github.org\n\n\0") ||
268 				frameString.equals("CONNECT\nhost:github.org\naccept-version:1.2\n\n\0"));
269 	}
270 
271 	@Test
272 	public void encodeFrameWithHeadersThatShouldBeEscaped() {
273 		StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.DISCONNECT);
274 		headers.addNativeHeader("a:\r\n\\b",  "alpha:bravo\r\n\\");
275 
276 		Message<byte[]> frame = MessageBuilder.createMessage(new byte[0], headers.getMessageHeaders());
277 
278 		assertEquals("DISCONNECT\na\\c\\r\\n\\\\b:alpha\\cbravo\\r\\n\\\\\n\n\0",
279 				new Reactor11StompCodec().encoder().apply(frame).asString());
280 	}
281 
282 	@Test
283 	public void encodeFrameWithHeadersBody() {
284 		StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.SEND);
285 		headers.addNativeHeader("a", "alpha");
286 
287 		Message<byte[]> frame = MessageBuilder.createMessage("Message body".getBytes(), headers.getMessageHeaders());
288 
289 		assertEquals("SEND\na:alpha\ncontent-length:12\n\nMessage body\0",
290 				new Reactor11StompCodec().encoder().apply(frame).asString());
291 	}
292 
293 	@Test
294 	public void encodeFrameWithContentLengthPresent() {
295 		StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.SEND);
296 		headers.setContentLength(12);
297 
298 		Message<byte[]> frame = MessageBuilder.createMessage("Message body".getBytes(), headers.getMessageHeaders());
299 
300 		assertEquals("SEND\ncontent-length:12\n\nMessage body\0",
301 				new Reactor11StompCodec().encoder().apply(frame).asString());
302 	}
303 
304 	private void assertIncompleteDecode(String partialFrame) {
305 		Buffer buffer = Buffer.wrap(partialFrame);
306 		assertNull(decode(buffer));
307 		assertEquals(0, buffer.position());
308 	}
309 
310 	private Message<byte[]> decode(String stompFrame) {
311 		Buffer buffer = Buffer.wrap(stompFrame);
312 		return decode(buffer);
313 	}
314 
315 	private Message<byte[]> decode(Buffer buffer) {
316 		this.decoder.apply(buffer);
317 		if (consumer.arguments.isEmpty()) {
318 			return null;
319 		} else {
320 			return consumer.arguments.get(0);
321 		}
322 	}
323 
324 
325 
326 	private static final class ArgumentCapturingConsumer<T> implements Consumer<T> {
327 
328 		private final List<T> arguments = new ArrayList<T>();
329 
330 		@Override
331 		public void accept(T t) {
332 			arguments.add(t);
333 		}
334 
335 	}
336 }