1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package org.springframework.messaging.simp.stomp;
18
19 import java.nio.charset.Charset;
20 import java.util.Arrays;
21 import java.util.Collections;
22 import java.util.List;
23 import java.util.Map;
24 import java.util.Set;
25 import java.util.concurrent.atomic.AtomicLong;
26
27 import org.springframework.messaging.Message;
28 import org.springframework.messaging.simp.SimpMessageHeaderAccessor;
29 import org.springframework.messaging.simp.SimpMessageType;
30 import org.springframework.messaging.support.MessageHeaderAccessor;
31 import org.springframework.util.Assert;
32 import org.springframework.util.MimeType;
33 import org.springframework.util.MimeTypeUtils;
34 import org.springframework.util.StringUtils;
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60 public class StompHeaderAccessor extends SimpMessageHeaderAccessor {
61
62 private static final AtomicLong messageIdCounter = new AtomicLong();
63
64 private static final long[] DEFAULT_HEARTBEAT = new long[] {0, 0};
65
66
67
68
69 public static final String STOMP_ID_HEADER = "id";
70
71 public static final String STOMP_HOST_HEADER = "host";
72
73 public static final String STOMP_ACCEPT_VERSION_HEADER = "accept-version";
74
75 public static final String STOMP_MESSAGE_ID_HEADER = "message-id";
76
77 public static final String STOMP_RECEIPT_HEADER = "receipt";
78
79 public static final String STOMP_RECEIPT_ID_HEADER = "receipt-id";
80
81 public static final String STOMP_SUBSCRIPTION_HEADER = "subscription";
82
83 public static final String STOMP_VERSION_HEADER = "version";
84
85 public static final String STOMP_MESSAGE_HEADER = "message";
86
87 public static final String STOMP_ACK_HEADER = "ack";
88
89 public static final String STOMP_NACK_HEADER = "nack";
90
91 public static final String STOMP_LOGIN_HEADER = "login";
92
93 public static final String STOMP_PASSCODE_HEADER = "passcode";
94
95 public static final String STOMP_DESTINATION_HEADER = "destination";
96
97 public static final String STOMP_CONTENT_TYPE_HEADER = "content-type";
98
99 public static final String STOMP_CONTENT_LENGTH_HEADER = "content-length";
100
101 public static final String STOMP_HEARTBEAT_HEADER = "heart-beat";
102
103
104
105 private static final String COMMAND_HEADER = "stompCommand";
106
107 private static final String CREDENTIALS_HEADER = "stompCredentials";
108
109
110
111
112
113 StompHeaderAccessor(StompCommand command, Map<String, List<String>> externalSourceHeaders) {
114 super(command.getMessageType(), externalSourceHeaders);
115 setHeader(COMMAND_HEADER, command);
116 updateSimpMessageHeadersFromStompHeaders();
117 }
118
119
120
121
122
123
124
125 StompHeaderAccessor(Message<?> message) {
126 super(message);
127 updateStompHeadersFromSimpMessageHeaders();
128 }
129
130 StompHeaderAccessor() {
131 super(SimpMessageType.HEARTBEAT, null);
132 }
133
134
135 void updateSimpMessageHeadersFromStompHeaders() {
136 if (getNativeHeaders() == null) {
137 return;
138 }
139 String value = getFirstNativeHeader(STOMP_DESTINATION_HEADER);
140 if (value != null) {
141 super.setDestination(value);
142 }
143 value = getFirstNativeHeader(STOMP_CONTENT_TYPE_HEADER);
144 if (value != null) {
145 super.setContentType(MimeTypeUtils.parseMimeType(value));
146 }
147 StompCommand command = getCommand();
148 if (StompCommand.MESSAGE.equals(command)) {
149 value = getFirstNativeHeader(STOMP_SUBSCRIPTION_HEADER);
150 if (value != null) {
151 super.setSubscriptionId(value);
152 }
153 }
154 else if (StompCommand.SUBSCRIBE.equals(command) || StompCommand.UNSUBSCRIBE.equals(command)) {
155 value = getFirstNativeHeader(STOMP_ID_HEADER);
156 if (value != null) {
157 super.setSubscriptionId(value);
158 }
159 }
160 else if (StompCommand.CONNECT.equals(command)) {
161 protectPasscode();
162 }
163 }
164
165 private void updateStompHeadersFromSimpMessageHeaders() {
166 if (getDestination() != null) {
167 setNativeHeader(STOMP_DESTINATION_HEADER, getDestination());
168 }
169 if (getContentType() != null) {
170 setNativeHeader(STOMP_CONTENT_TYPE_HEADER, getContentType().toString());
171 }
172 trySetStompHeaderForSubscriptionId();
173 }
174
175
176 @Override
177 protected MessageHeaderAccessor createAccessor(Message<?> message) {
178 return wrap(message);
179 }
180
181 Map<String, List<String>> getNativeHeaders() {
182 @SuppressWarnings("unchecked")
183 Map<String, List<String>> map = (Map<String, List<String>>) getHeader(NATIVE_HEADERS);
184 return (map != null ? map : Collections.<String, List<String>>emptyMap());
185 }
186
187 public StompCommand updateStompCommandAsClientMessage() {
188 Assert.state(SimpMessageType.MESSAGE.equals(getMessageType()), "Unexpected message type " + getMessage());
189 if (getCommand() == null) {
190 setHeader(COMMAND_HEADER, StompCommand.SEND);
191 }
192 else if (!getCommand().equals(StompCommand.SEND)) {
193 throw new IllegalStateException("Unexpected STOMP command " + getCommand());
194 }
195 return getCommand();
196 }
197
198 public void updateStompCommandAsServerMessage() {
199 Assert.state(SimpMessageType.MESSAGE.equals(getMessageType()), "Unexpected message type " + getMessage());
200 StompCommand command = getCommand();
201 if ((command == null) || StompCommand.SEND.equals(command)) {
202 setHeader(COMMAND_HEADER, StompCommand.MESSAGE);
203 }
204 else if (!StompCommand.MESSAGE.equals(command)) {
205 throw new IllegalStateException("Unexpected STOMP command " + command);
206 }
207 trySetStompHeaderForSubscriptionId();
208 if (getMessageId() == null) {
209 String messageId = getSessionId() + "-" + messageIdCounter.getAndIncrement();
210 setNativeHeader(STOMP_MESSAGE_ID_HEADER, messageId);
211 }
212 }
213
214
215
216
217 public StompCommand getCommand() {
218 return (StompCommand) getHeader(COMMAND_HEADER);
219 }
220
221 public Set<String> getAcceptVersion() {
222 String rawValue = getFirstNativeHeader(STOMP_ACCEPT_VERSION_HEADER);
223 return (rawValue != null ? StringUtils.commaDelimitedListToSet(rawValue) : Collections.<String>emptySet());
224 }
225
226 public boolean isHeartbeat() {
227 return (SimpMessageType.HEARTBEAT == getMessageType());
228 }
229
230 public void setAcceptVersion(String acceptVersion) {
231 setNativeHeader(STOMP_ACCEPT_VERSION_HEADER, acceptVersion);
232 }
233
234 public void setHost(String host) {
235 setNativeHeader(STOMP_HOST_HEADER, host);
236 }
237
238 public String getHost() {
239 return getFirstNativeHeader(STOMP_HOST_HEADER);
240 }
241
242 @Override
243 public void setDestination(String destination) {
244 super.setDestination(destination);
245 setNativeHeader(STOMP_DESTINATION_HEADER, destination);
246 }
247
248 public long[] getHeartbeat() {
249 String rawValue = getFirstNativeHeader(STOMP_HEARTBEAT_HEADER);
250 if (!StringUtils.hasText(rawValue)) {
251 return Arrays.copyOf(DEFAULT_HEARTBEAT, 2);
252 }
253 String[] rawValues = StringUtils.commaDelimitedListToStringArray(rawValue);
254 return new long[] { Long.valueOf(rawValues[0]), Long.valueOf(rawValues[1])};
255 }
256
257 public void setContentType(MimeType contentType) {
258 super.setContentType(contentType);
259 setNativeHeader(STOMP_CONTENT_TYPE_HEADER, contentType.toString());
260 }
261
262 @Override
263 public void setSubscriptionId(String subscriptionId) {
264 super.setSubscriptionId(subscriptionId);
265 trySetStompHeaderForSubscriptionId();
266 }
267
268 private void trySetStompHeaderForSubscriptionId() {
269 String subscriptionId = getSubscriptionId();
270 if (subscriptionId != null) {
271 if (getCommand() != null && StompCommand.MESSAGE.equals(getCommand())) {
272 setNativeHeader(STOMP_SUBSCRIPTION_HEADER, subscriptionId);
273 }
274 else {
275 SimpMessageType messageType = getMessageType();
276 if (SimpMessageType.SUBSCRIBE.equals(messageType) || SimpMessageType.UNSUBSCRIBE.equals(messageType)) {
277 setNativeHeader(STOMP_ID_HEADER, subscriptionId);
278 }
279 }
280 }
281 }
282
283 public Integer getContentLength() {
284 if (containsNativeHeader(STOMP_CONTENT_LENGTH_HEADER)) {
285 return Integer.valueOf(getFirstNativeHeader(STOMP_CONTENT_LENGTH_HEADER));
286 }
287 return null;
288 }
289
290 public void setContentLength(int contentLength) {
291 setNativeHeader(STOMP_CONTENT_LENGTH_HEADER, String.valueOf(contentLength));
292 }
293
294 public void setHeartbeat(long cx, long cy) {
295 setNativeHeader(STOMP_HEARTBEAT_HEADER, StringUtils.arrayToCommaDelimitedString(new Object[]{cx, cy}));
296 }
297
298 public void setAck(String ack) {
299 setNativeHeader(STOMP_ACK_HEADER, ack);
300 }
301
302 public String getAck() {
303 return getFirstNativeHeader(STOMP_ACK_HEADER);
304 }
305
306 public void setNack(String nack) {
307 setNativeHeader(STOMP_NACK_HEADER, nack);
308 }
309
310 public String getNack() {
311 return getFirstNativeHeader(STOMP_NACK_HEADER);
312 }
313
314 public void setLogin(String login) {
315 setNativeHeader(STOMP_LOGIN_HEADER, login);
316 }
317
318 public String getLogin() {
319 return getFirstNativeHeader(STOMP_LOGIN_HEADER);
320 }
321
322 public void setPasscode(String passcode) {
323 setNativeHeader(STOMP_PASSCODE_HEADER, passcode);
324 protectPasscode();
325 }
326
327 private void protectPasscode() {
328 String value = getFirstNativeHeader(STOMP_PASSCODE_HEADER);
329 if (value != null && !"PROTECTED".equals(value)) {
330 setHeader(CREDENTIALS_HEADER, new StompPasscode(value));
331 setNativeHeader(STOMP_PASSCODE_HEADER, "PROTECTED");
332 }
333 }
334
335
336
337
338 public String getPasscode() {
339 StompPasscode credentials = (StompPasscode) getHeader(CREDENTIALS_HEADER);
340 return (credentials != null ? credentials.passcode : null);
341 }
342
343 public void setReceiptId(String receiptId) {
344 setNativeHeader(STOMP_RECEIPT_ID_HEADER, receiptId);
345 }
346
347 public String getReceiptId() {
348 return getFirstNativeHeader(STOMP_RECEIPT_ID_HEADER);
349 }
350
351 public void setReceipt(String receiptId) {
352 setNativeHeader(STOMP_RECEIPT_HEADER, receiptId);
353 }
354
355 public String getReceipt() {
356 return getFirstNativeHeader(STOMP_RECEIPT_HEADER);
357 }
358
359 public String getMessage() {
360 return getFirstNativeHeader(STOMP_MESSAGE_HEADER);
361 }
362
363 public void setMessage(String content) {
364 setNativeHeader(STOMP_MESSAGE_HEADER, content);
365 }
366
367 public String getMessageId() {
368 return getFirstNativeHeader(STOMP_MESSAGE_ID_HEADER);
369 }
370
371 public void setMessageId(String id) {
372 setNativeHeader(STOMP_MESSAGE_ID_HEADER, id);
373 }
374
375 public String getVersion() {
376 return getFirstNativeHeader(STOMP_VERSION_HEADER);
377 }
378
379 public void setVersion(String version) {
380 setNativeHeader(STOMP_VERSION_HEADER, version);
381 }
382
383
384
385
386 @Override
387 public String getShortLogMessage(Object payload) {
388 if (StompCommand.SUBSCRIBE.equals(getCommand())) {
389 return "SUBSCRIBE " + getDestination() + " id=" + getSubscriptionId() + appendSession();
390 }
391 else if (StompCommand.UNSUBSCRIBE.equals(getCommand())) {
392 return "UNSUBSCRIBE id=" + getSubscriptionId() + appendSession();
393 }
394 else if (StompCommand.SEND.equals(getCommand())) {
395 return "SEND " + getDestination() + appendSession() + appendPayload(payload);
396 }
397 else if (StompCommand.CONNECT.equals(getCommand())) {
398 return "CONNECT" + (getUser() != null ? " user=" + getUser().getName() : "") + appendSession();
399 }
400 else if (StompCommand.CONNECTED.equals(getCommand())) {
401 return "CONNECTED heart-beat=" + Arrays.toString(getHeartbeat()) + appendSession();
402 }
403 else if (StompCommand.DISCONNECT.equals(getCommand())) {
404 return "DISCONNECT" + (getReceipt() != null ? " receipt=" + getReceipt() : "") + appendSession();
405 }
406 else {
407 return getDetailedLogMessage(payload);
408 }
409 }
410
411 @Override
412 public String getDetailedLogMessage(Object payload) {
413 if (isHeartbeat()) {
414 return "heart-beat in session " + getSessionId();
415 }
416 StompCommand command = getCommand();
417 if (command == null) {
418 return super.getDetailedLogMessage(payload);
419 }
420 StringBuilder sb = new StringBuilder();
421 sb.append(command.name()).append(" ").append(getNativeHeaders()).append(appendSession());
422 if (getUser() != null) {
423 sb.append(", user=").append(getUser().getName());
424 }
425 if (command.isBodyAllowed()) {
426 sb.append(appendPayload(payload));
427 }
428 return sb.toString();
429 }
430
431 private String appendSession() {
432 return " session=" + getSessionId();
433 }
434
435 private String appendPayload(Object payload) {
436 Assert.isInstanceOf(byte[].class, payload);
437 byte[] bytes = (byte[]) payload;
438 String contentType = (getContentType() != null ? " " + getContentType().toString() : "");
439 if (bytes.length == 0 || getContentType() == null || !isReadableContentType()) {
440 return contentType;
441 }
442 Charset charset = getContentType().getCharSet();
443 charset = (charset != null ? charset : StompDecoder.UTF8_CHARSET);
444 return (bytes.length < 80) ?
445 contentType + " payload=" + new String(bytes, charset) :
446 contentType + " payload=" + new String(Arrays.copyOf(bytes, 80), charset) + "...(truncated)";
447 }
448
449
450
451
452
453
454
455 public static StompHeaderAccessor create(StompCommand command) {
456 return new StompHeaderAccessor(command, null);
457 }
458
459
460
461
462 public static StompHeaderAccessor create(StompCommand command, Map<String, List<String>> headers) {
463 return new StompHeaderAccessor(command, headers);
464 }
465
466
467
468
469
470 public static StompHeaderAccessor createForHeartbeat() {
471 return new StompHeaderAccessor();
472 }
473
474
475
476
477 public static StompHeaderAccessor wrap(Message<?> message) {
478 return new StompHeaderAccessor(message);
479 }
480
481
482
483
484 public static StompCommand getCommand(Map<String, Object> headers) {
485 return (StompCommand) headers.get(COMMAND_HEADER);
486 }
487
488
489
490
491 public static String getPasscode(Map<String, Object> headers) {
492 StompPasscode credentials = (StompPasscode) headers.get(CREDENTIALS_HEADER);
493 return (credentials != null ? credentials.passcode : null);
494 }
495
496 public static Integer getContentLength(Map<String, List<String>> nativeHeaders) {
497 if (nativeHeaders.containsKey(STOMP_CONTENT_LENGTH_HEADER)) {
498 List<String> values = nativeHeaders.get(STOMP_CONTENT_LENGTH_HEADER);
499 String value = (values != null ? values.get(0) : null);
500 return Integer.valueOf(value);
501 }
502 return null;
503 }
504
505
506 private static class StompPasscode {
507
508 private final String passcode;
509
510 public StompPasscode(String passcode) {
511 this.passcode = passcode;
512 }
513
514 @Override
515 public String toString() {
516 return "[PROTECTED]";
517 }
518 }
519
520 }