1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package org.springframework.messaging.simp.stomp;
18
19 import java.io.UnsupportedEncodingException;
20 import java.util.ArrayList;
21 import java.util.List;
22
23 import org.junit.Test;
24 import reactor.function.Consumer;
25 import reactor.function.Function;
26 import reactor.io.Buffer;
27
28 import org.springframework.messaging.Message;
29 import org.springframework.messaging.simp.SimpMessageType;
30 import org.springframework.messaging.support.MessageBuilder;
31 import org.springframework.util.InvalidMimeTypeException;
32
33 import static org.junit.Assert.*;
34
35
36
37
38
39
40 public class StompCodecTests {
41
42 private final ArgumentCapturingConsumer<Message<byte[]>> consumer = new ArgumentCapturingConsumer<Message<byte[]>>();
43
44 private final Function<Buffer, Message<byte[]>> decoder = new Reactor11StompCodec().decoder(consumer);
45
46 @Test
47 public void decodeFrameWithCrLfEols() {
48 Message<byte[]> frame = decode("DISCONNECT\r\n\r\n\0");
49 StompHeaderAccessor headers = StompHeaderAccessor.wrap(frame);
50
51 assertEquals(StompCommand.DISCONNECT, headers.getCommand());
52 assertEquals(0, headers.toNativeHeaderMap().size());
53 assertEquals(0, frame.getPayload().length);
54 }
55
56 @Test
57 public void decodeFrameWithNoHeadersAndNoBody() {
58 Message<byte[]> frame = decode("DISCONNECT\n\n\0");
59 StompHeaderAccessor headers = StompHeaderAccessor.wrap(frame);
60
61 assertEquals(StompCommand.DISCONNECT, headers.getCommand());
62 assertEquals(0, headers.toNativeHeaderMap().size());
63 assertEquals(0, frame.getPayload().length);
64 }
65
66 @Test
67 public void decodeFrameWithNoBody() {
68 String accept = "accept-version:1.1\n";
69 String host = "host:github.org\n";
70
71 Message<byte[]> frame = decode("CONNECT\n" + accept + host + "\n\0");
72 StompHeaderAccessor headers = StompHeaderAccessor.wrap(frame);
73
74 assertEquals(StompCommand.CONNECT, headers.getCommand());
75
76 assertEquals(2, headers.toNativeHeaderMap().size());
77 assertEquals("1.1", headers.getFirstNativeHeader("accept-version"));
78 assertEquals("github.org", headers.getHost());
79
80 assertEquals(0, frame.getPayload().length);
81 }
82
83 @Test
84 public void decodeFrame() throws UnsupportedEncodingException {
85 Message<byte[]> frame = decode("SEND\ndestination:test\n\nThe body of the message\0");
86 StompHeaderAccessor headers = StompHeaderAccessor.wrap(frame);
87
88 assertEquals(StompCommand.SEND, headers.getCommand());
89
90 assertEquals(headers.toNativeHeaderMap().toString(), 1, headers.toNativeHeaderMap().size());
91 assertEquals("test", headers.getDestination());
92
93 String bodyText = new String(frame.getPayload());
94 assertEquals("The body of the message", bodyText);
95 }
96
97 @Test
98 public void decodeFrameWithContentLength() {
99 Message<byte[]> message = decode("SEND\ncontent-length:23\n\nThe body of the message\0");
100 StompHeaderAccessor headers = StompHeaderAccessor.wrap(message);
101
102 assertEquals(StompCommand.SEND, headers.getCommand());
103
104 assertEquals(1, headers.toNativeHeaderMap().size());
105 assertEquals(Integer.valueOf(23), headers.getContentLength());
106
107 String bodyText = new String(message.getPayload());
108 assertEquals("The body of the message", bodyText);
109 }
110
111
112
113 @Test
114 public void decodeFrameWithInvalidContentLength() {
115 Message<byte[]> message = decode("SEND\ncontent-length:-1\n\nThe body of the message\0");
116 StompHeaderAccessor headers = StompHeaderAccessor.wrap(message);
117
118 assertEquals(StompCommand.SEND, headers.getCommand());
119
120 assertEquals(1, headers.toNativeHeaderMap().size());
121 assertEquals(Integer.valueOf(-1), headers.getContentLength());
122
123 String bodyText = new String(message.getPayload());
124 assertEquals("The body of the message", bodyText);
125 }
126
127 @Test
128 public void decodeFrameWithContentLengthZero() {
129 Message<byte[]> frame = decode("SEND\ncontent-length:0\n\n\0");
130 StompHeaderAccessor headers = StompHeaderAccessor.wrap(frame);
131
132 assertEquals(StompCommand.SEND, headers.getCommand());
133
134 assertEquals(1, headers.toNativeHeaderMap().size());
135 assertEquals(Integer.valueOf(0), headers.getContentLength());
136
137 String bodyText = new String(frame.getPayload());
138 assertEquals("", bodyText);
139 }
140
141 @Test
142 public void decodeFrameWithNullOctectsInTheBody() {
143 Message<byte[]> frame = decode("SEND\ncontent-length:23\n\nThe b\0dy \0f the message\0");
144 StompHeaderAccessor headers = StompHeaderAccessor.wrap(frame);
145
146 assertEquals(StompCommand.SEND, headers.getCommand());
147
148 assertEquals(1, headers.toNativeHeaderMap().size());
149 assertEquals(Integer.valueOf(23), headers.getContentLength());
150
151 String bodyText = new String(frame.getPayload());
152 assertEquals("The b\0dy \0f the message", bodyText);
153 }
154
155 @Test
156 public void decodeFrameWithEscapedHeaders() {
157 Message<byte[]> frame = decode("DISCONNECT\na\\c\\r\\n\\\\b:alpha\\cbravo\\r\\n\\\\\n\n\0");
158 StompHeaderAccessor headers = StompHeaderAccessor.wrap(frame);
159
160 assertEquals(StompCommand.DISCONNECT, headers.getCommand());
161
162 assertEquals(1, headers.toNativeHeaderMap().size());
163 assertEquals("alpha:bravo\r\n\\", headers.getFirstNativeHeader("a:\r\n\\b"));
164 }
165
166 @Test(expected=StompConversionException.class)
167 public void decodeFrameBodyNotAllowed() {
168 decode("CONNECT\naccept-version:1.2\n\nThe body of the message\0");
169 }
170
171 @Test
172 public void decodeMultipleFramesFromSameBuffer() {
173 String frame1 = "SEND\ndestination:test\n\nThe body of the message\0";
174 String frame2 = "DISCONNECT\n\n\0";
175
176 Buffer buffer = Buffer.wrap(frame1 + frame2);
177
178 final List<Message<byte[]>> messages = new ArrayList<Message<byte[]>>();
179 new Reactor11StompCodec().decoder(new Consumer<Message<byte[]>>() {
180 @Override
181 public void accept(Message<byte[]> message) {
182 messages.add(message);
183 }
184 }).apply(buffer);
185
186 assertEquals(2, messages.size());
187 assertEquals(StompCommand.SEND, StompHeaderAccessor.wrap(messages.get(0)).getCommand());
188 assertEquals(StompCommand.DISCONNECT, StompHeaderAccessor.wrap(messages.get(1)).getCommand());
189 }
190
191 @Test
192 public void decodeFrameWithIncompleteCommand() {
193 assertIncompleteDecode("MESSAG");
194 }
195
196 @Test
197 public void decodeFrameWithIncompleteHeader() {
198 assertIncompleteDecode("SEND\ndestination");
199 assertIncompleteDecode("SEND\ndestination:");
200 assertIncompleteDecode("SEND\ndestination:test");
201 }
202
203 @Test
204 public void decodeFrameWithoutNullOctetTerminator() {
205 assertIncompleteDecode("SEND\ndestination:test\n");
206 assertIncompleteDecode("SEND\ndestination:test\n\n");
207 assertIncompleteDecode("SEND\ndestination:test\n\nThe body");
208 }
209
210 @Test
211 public void decodeFrameWithInsufficientContent() {
212 assertIncompleteDecode("SEND\ncontent-length:23\n\nThe body of the mess");
213 }
214
215 @Test
216 public void decodeFrameWithIncompleteContentType() {
217 assertIncompleteDecode("SEND\ncontent-type:text/plain;charset=U");
218 }
219
220 @Test(expected = InvalidMimeTypeException.class)
221 public void decodeFrameWithInvalidContentType() {
222 assertIncompleteDecode("SEND\ncontent-type:text/plain;charset=U\n\nThe body\0");
223 }
224
225 @Test(expected=StompConversionException.class)
226 public void decodeFrameWithIncorrectTerminator() {
227 decode("SEND\ncontent-length:23\n\nThe body of the message*");
228 }
229
230 @Test
231 public void decodeHeartbeat() {
232 String frame = "\n";
233
234 Buffer buffer = Buffer.wrap(frame);
235
236 final List<Message<byte[]>> messages = new ArrayList<Message<byte[]>>();
237 new Reactor11StompCodec().decoder(new Consumer<Message<byte[]>>() {
238 @Override
239 public void accept(Message<byte[]> message) {
240 messages.add(message);
241 }
242 }).apply(buffer);
243
244 assertEquals(1, messages.size());
245 assertEquals(SimpMessageType.HEARTBEAT, StompHeaderAccessor.wrap(messages.get(0)).getMessageType());
246 }
247
248 @Test
249 public void encodeFrameWithNoHeadersAndNoBody() {
250 StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.DISCONNECT);
251
252 Message<byte[]> frame = MessageBuilder.createMessage(new byte[0], headers.getMessageHeaders());
253
254 assertEquals("DISCONNECT\n\n\0", new Reactor11StompCodec().encoder().apply(frame).asString());
255 }
256
257 @Test
258 public void encodeFrameWithHeaders() {
259 StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.CONNECT);
260 headers.setAcceptVersion("1.2");
261 headers.setHost("github.org");
262
263 Message<byte[]> frame = MessageBuilder.createMessage(new byte[0], headers.getMessageHeaders());
264
265 String frameString = new Reactor11StompCodec().encoder().apply(frame).asString();
266
267 assertTrue(frameString.equals("CONNECT\naccept-version:1.2\nhost:github.org\n\n\0") ||
268 frameString.equals("CONNECT\nhost:github.org\naccept-version:1.2\n\n\0"));
269 }
270
271 @Test
272 public void encodeFrameWithHeadersThatShouldBeEscaped() {
273 StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.DISCONNECT);
274 headers.addNativeHeader("a:\r\n\\b", "alpha:bravo\r\n\\");
275
276 Message<byte[]> frame = MessageBuilder.createMessage(new byte[0], headers.getMessageHeaders());
277
278 assertEquals("DISCONNECT\na\\c\\r\\n\\\\b:alpha\\cbravo\\r\\n\\\\\n\n\0",
279 new Reactor11StompCodec().encoder().apply(frame).asString());
280 }
281
282 @Test
283 public void encodeFrameWithHeadersBody() {
284 StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.SEND);
285 headers.addNativeHeader("a", "alpha");
286
287 Message<byte[]> frame = MessageBuilder.createMessage("Message body".getBytes(), headers.getMessageHeaders());
288
289 assertEquals("SEND\na:alpha\ncontent-length:12\n\nMessage body\0",
290 new Reactor11StompCodec().encoder().apply(frame).asString());
291 }
292
293 @Test
294 public void encodeFrameWithContentLengthPresent() {
295 StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.SEND);
296 headers.setContentLength(12);
297
298 Message<byte[]> frame = MessageBuilder.createMessage("Message body".getBytes(), headers.getMessageHeaders());
299
300 assertEquals("SEND\ncontent-length:12\n\nMessage body\0",
301 new Reactor11StompCodec().encoder().apply(frame).asString());
302 }
303
304 private void assertIncompleteDecode(String partialFrame) {
305 Buffer buffer = Buffer.wrap(partialFrame);
306 assertNull(decode(buffer));
307 assertEquals(0, buffer.position());
308 }
309
310 private Message<byte[]> decode(String stompFrame) {
311 Buffer buffer = Buffer.wrap(stompFrame);
312 return decode(buffer);
313 }
314
315 private Message<byte[]> decode(Buffer buffer) {
316 this.decoder.apply(buffer);
317 if (consumer.arguments.isEmpty()) {
318 return null;
319 } else {
320 return consumer.arguments.get(0);
321 }
322 }
323
324
325
326 private static final class ArgumentCapturingConsumer<T> implements Consumer<T> {
327
328 private final List<T> arguments = new ArrayList<T>();
329
330 @Override
331 public void accept(T t) {
332 arguments.add(t);
333 }
334
335 }
336 }