View Javadoc
1   /*
2    * Copyright 2002-2014 the original author or authors.
3    *
4    * Licensed under the Apache License, Version 2.0 (the "License");
5    * you may not use this file except in compliance with the License.
6    * You may obtain a copy of the License at
7    *
8    *      http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS,
12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   * See the License for the specific language governing permissions and
14   * limitations under the License.
15   */
16  
17  package org.springframework.messaging.simp.stomp;
18  
19  import java.nio.charset.Charset;
20  import java.util.Arrays;
21  import java.util.Collections;
22  import java.util.List;
23  import java.util.Map;
24  import java.util.Set;
25  import java.util.concurrent.atomic.AtomicLong;
26  
27  import org.springframework.messaging.Message;
28  import org.springframework.messaging.simp.SimpMessageHeaderAccessor;
29  import org.springframework.messaging.simp.SimpMessageType;
30  import org.springframework.messaging.support.MessageHeaderAccessor;
31  import org.springframework.util.Assert;
32  import org.springframework.util.MimeType;
33  import org.springframework.util.MimeTypeUtils;
34  import org.springframework.util.StringUtils;
35  
36  /**
37   * A {@code MessageHeaderAccessor} to use when creating a {@code Message} from a
38   * decoded STOMP frame, or when encoding a {@code Message} to a STOMP frame.
39   *
40   * <p>When created from STOMP frame content, the actual STOMP headers are stored
41   * in the native header sub-map managed by the parent class
42   * {@link org.springframework.messaging.support.NativeMessageHeaderAccessor}
43   * while the parent class
44   * {@link org.springframework.messaging.simp.SimpMessageHeaderAccessor} manages
45   * common processing headers some of which are based on STOMP headers (e.g.
46   * destination, content-type, etc).
47   *
48   * <p>An instance of this class can also be created by wrapping an existing
49   * {@code Message}. That message may have been created with the more generic
50   * {@link org.springframework.messaging.simp.SimpMessageHeaderAccessor} in
51   * which case STOMP headers are created from common processing headers.
52   * In this case it is also necessary to invoke either
53   * {@link #updateStompCommandAsClientMessage()} or
54   * {@link #updateStompCommandAsServerMessage()} if sending a message and
55   * depending on whether a message is sent to a client or the message broker.
56   *
57   * @author Rossen Stoyanchev
58   * @since 4.0
59   */
60  public class StompHeaderAccessor extends SimpMessageHeaderAccessor {
61  
62  	private static final AtomicLong messageIdCounter = new AtomicLong();
63  
64  	private static final long[] DEFAULT_HEARTBEAT = new long[] {0, 0};
65  
66  
67  	// STOMP header names
68  
69  	public static final String STOMP_ID_HEADER = "id";
70  
71  	public static final String STOMP_HOST_HEADER = "host";
72  
73  	public static final String STOMP_ACCEPT_VERSION_HEADER = "accept-version";
74  
75  	public static final String STOMP_MESSAGE_ID_HEADER = "message-id";
76  
77  	public static final String STOMP_RECEIPT_HEADER = "receipt"; // any client frame except CONNECT
78  
79  	public static final String STOMP_RECEIPT_ID_HEADER = "receipt-id"; // RECEIPT frame
80  
81  	public static final String STOMP_SUBSCRIPTION_HEADER = "subscription";
82  
83  	public static final String STOMP_VERSION_HEADER = "version";
84  
85  	public static final String STOMP_MESSAGE_HEADER = "message";
86  
87  	public static final String STOMP_ACK_HEADER = "ack";
88  
89  	public static final String STOMP_NACK_HEADER = "nack";
90  
91  	public static final String STOMP_LOGIN_HEADER = "login";
92  
93  	public static final String STOMP_PASSCODE_HEADER = "passcode";
94  
95  	public static final String STOMP_DESTINATION_HEADER = "destination";
96  
97  	public static final String STOMP_CONTENT_TYPE_HEADER = "content-type";
98  
99  	public static final String STOMP_CONTENT_LENGTH_HEADER = "content-length";
100 
101 	public static final String STOMP_HEARTBEAT_HEADER = "heart-beat";
102 
103 	// Other header names
104 
105 	private static final String COMMAND_HEADER = "stompCommand";
106 
107 	private static final String CREDENTIALS_HEADER = "stompCredentials";
108 
109 
110 	/**
111 	 * A constructor for creating message headers from a parsed STOMP frame.
112 	 */
113 	StompHeaderAccessor(StompCommand command, Map<String, List<String>> externalSourceHeaders) {
114 		super(command.getMessageType(), externalSourceHeaders);
115 		setHeader(COMMAND_HEADER, command);
116 		updateSimpMessageHeadersFromStompHeaders();
117 	}
118 
119 	/**
120 	 * A constructor for accessing and modifying existing message headers.
121 	 * Note that the message headers may not have been created from a STOMP frame
122 	 * but may have rather originated from using the more generic
123 	 * {@link org.springframework.messaging.simp.SimpMessageHeaderAccessor}.
124 	 */
125 	StompHeaderAccessor(Message<?> message) {
126 		super(message);
127 		updateStompHeadersFromSimpMessageHeaders();
128 	}
129 
130 	StompHeaderAccessor() {
131 		super(SimpMessageType.HEARTBEAT, null);
132 	}
133 
134 
135 	void updateSimpMessageHeadersFromStompHeaders() {
136 		if (getNativeHeaders() == null) {
137 			return;
138 		}
139 		String value = getFirstNativeHeader(STOMP_DESTINATION_HEADER);
140 		if (value != null) {
141 			super.setDestination(value);
142 		}
143 		value = getFirstNativeHeader(STOMP_CONTENT_TYPE_HEADER);
144 		if (value != null) {
145 			super.setContentType(MimeTypeUtils.parseMimeType(value));
146 		}
147 		StompCommand command = getCommand();
148 		if (StompCommand.MESSAGE.equals(command)) {
149 			value = getFirstNativeHeader(STOMP_SUBSCRIPTION_HEADER);
150 			if (value != null) {
151 				super.setSubscriptionId(value);
152 			}
153 		}
154 		else if (StompCommand.SUBSCRIBE.equals(command) || StompCommand.UNSUBSCRIBE.equals(command)) {
155 			value = getFirstNativeHeader(STOMP_ID_HEADER);
156 			if (value != null) {
157 				super.setSubscriptionId(value);
158 			}
159 		}
160 		else if (StompCommand.CONNECT.equals(command)) {
161 			protectPasscode();
162 		}
163 	}
164 
165 	private void updateStompHeadersFromSimpMessageHeaders() {
166 		if (getDestination() != null) {
167 			setNativeHeader(STOMP_DESTINATION_HEADER, getDestination());
168 		}
169 		if (getContentType() != null) {
170 			setNativeHeader(STOMP_CONTENT_TYPE_HEADER, getContentType().toString());
171 		}
172 		trySetStompHeaderForSubscriptionId();
173 	}
174 
175 
176 	@Override
177 	protected MessageHeaderAccessor createAccessor(Message<?> message) {
178 		return wrap(message);
179 	}
180 
181 	Map<String, List<String>> getNativeHeaders() {
182 		@SuppressWarnings("unchecked")
183 		Map<String, List<String>> map = (Map<String, List<String>>) getHeader(NATIVE_HEADERS);
184 		return (map != null ? map : Collections.<String, List<String>>emptyMap());
185 	}
186 
187 	public StompCommand updateStompCommandAsClientMessage() {
188 		Assert.state(SimpMessageType.MESSAGE.equals(getMessageType()), "Unexpected message type " + getMessage());
189 		if (getCommand() == null) {
190 			setHeader(COMMAND_HEADER, StompCommand.SEND);
191 		}
192 		else if (!getCommand().equals(StompCommand.SEND)) {
193 			throw new IllegalStateException("Unexpected STOMP command " + getCommand());
194 		}
195 		return getCommand();
196 	}
197 
198 	public void updateStompCommandAsServerMessage() {
199 		Assert.state(SimpMessageType.MESSAGE.equals(getMessageType()), "Unexpected message type " + getMessage());
200 		StompCommand command = getCommand();
201 		if ((command == null) || StompCommand.SEND.equals(command)) {
202 			setHeader(COMMAND_HEADER, StompCommand.MESSAGE);
203 		}
204 		else if (!StompCommand.MESSAGE.equals(command)) {
205 			throw new IllegalStateException("Unexpected STOMP command " + command);
206 		}
207 		trySetStompHeaderForSubscriptionId();
208 		if (getMessageId() == null) {
209 			String messageId = getSessionId() + "-" + messageIdCounter.getAndIncrement();
210 			setNativeHeader(STOMP_MESSAGE_ID_HEADER, messageId);
211 		}
212 	}
213 
214 	/**
215 	 * Return the STOMP command, or {@code null} if not yet set.
216 	 */
217 	public StompCommand getCommand() {
218 		return (StompCommand) getHeader(COMMAND_HEADER);
219 	}
220 
221 	public Set<String> getAcceptVersion() {
222 		String rawValue = getFirstNativeHeader(STOMP_ACCEPT_VERSION_HEADER);
223 		return (rawValue != null ? StringUtils.commaDelimitedListToSet(rawValue) : Collections.<String>emptySet());
224 	}
225 
226 	public boolean isHeartbeat() {
227 		return (SimpMessageType.HEARTBEAT == getMessageType());
228 	}
229 
230 	public void setAcceptVersion(String acceptVersion) {
231 		setNativeHeader(STOMP_ACCEPT_VERSION_HEADER, acceptVersion);
232 	}
233 
234 	public void setHost(String host) {
235 		setNativeHeader(STOMP_HOST_HEADER, host);
236 	}
237 
238 	public String getHost() {
239 		return getFirstNativeHeader(STOMP_HOST_HEADER);
240 	}
241 
242 	@Override
243 	public void setDestination(String destination) {
244 		super.setDestination(destination);
245 		setNativeHeader(STOMP_DESTINATION_HEADER, destination);
246 	}
247 
248 	public long[] getHeartbeat() {
249 		String rawValue = getFirstNativeHeader(STOMP_HEARTBEAT_HEADER);
250 		if (!StringUtils.hasText(rawValue)) {
251 			return Arrays.copyOf(DEFAULT_HEARTBEAT, 2);
252 		}
253 		String[] rawValues = StringUtils.commaDelimitedListToStringArray(rawValue);
254 		return new long[] { Long.valueOf(rawValues[0]), Long.valueOf(rawValues[1])};
255 	}
256 
257 	public void setContentType(MimeType contentType) {
258 		super.setContentType(contentType);
259 		setNativeHeader(STOMP_CONTENT_TYPE_HEADER, contentType.toString());
260 	}
261 
262 	@Override
263 	public void setSubscriptionId(String subscriptionId) {
264 		super.setSubscriptionId(subscriptionId);
265 		trySetStompHeaderForSubscriptionId();
266 	}
267 
268 	private void trySetStompHeaderForSubscriptionId() {
269 		String subscriptionId = getSubscriptionId();
270 		if (subscriptionId != null) {
271 			if (getCommand() != null && StompCommand.MESSAGE.equals(getCommand())) {
272 				setNativeHeader(STOMP_SUBSCRIPTION_HEADER, subscriptionId);
273 			}
274 			else {
275 				SimpMessageType messageType = getMessageType();
276 				if (SimpMessageType.SUBSCRIBE.equals(messageType) || SimpMessageType.UNSUBSCRIBE.equals(messageType)) {
277 					setNativeHeader(STOMP_ID_HEADER, subscriptionId);
278 				}
279 			}
280 		}
281 	}
282 
283 	public Integer getContentLength() {
284 		if (containsNativeHeader(STOMP_CONTENT_LENGTH_HEADER)) {
285 			return Integer.valueOf(getFirstNativeHeader(STOMP_CONTENT_LENGTH_HEADER));
286 		}
287 		return null;
288 	}
289 
290 	public void setContentLength(int contentLength) {
291 		setNativeHeader(STOMP_CONTENT_LENGTH_HEADER, String.valueOf(contentLength));
292 	}
293 
294 	public void setHeartbeat(long cx, long cy) {
295 		setNativeHeader(STOMP_HEARTBEAT_HEADER, StringUtils.arrayToCommaDelimitedString(new Object[]{cx, cy}));
296 	}
297 
298 	public void setAck(String ack) {
299 		setNativeHeader(STOMP_ACK_HEADER, ack);
300 	}
301 
302 	public String getAck() {
303 		return getFirstNativeHeader(STOMP_ACK_HEADER);
304 	}
305 
306 	public void setNack(String nack) {
307 		setNativeHeader(STOMP_NACK_HEADER, nack);
308 	}
309 
310 	public String getNack() {
311 		return getFirstNativeHeader(STOMP_NACK_HEADER);
312 	}
313 
314 	public void setLogin(String login) {
315 		setNativeHeader(STOMP_LOGIN_HEADER, login);
316 	}
317 
318 	public String getLogin() {
319 		return getFirstNativeHeader(STOMP_LOGIN_HEADER);
320 	}
321 
322 	public void setPasscode(String passcode) {
323 		setNativeHeader(STOMP_PASSCODE_HEADER, passcode);
324 		protectPasscode();
325 	}
326 
327 	private void protectPasscode() {
328 		String value = getFirstNativeHeader(STOMP_PASSCODE_HEADER);
329 		if (value != null && !"PROTECTED".equals(value)) {
330 			setHeader(CREDENTIALS_HEADER, new StompPasscode(value));
331 			setNativeHeader(STOMP_PASSCODE_HEADER, "PROTECTED");
332 		}
333 	}
334 
335 	/**
336 	 * Return the passcode header value, or {@code null} if not set.
337 	 */
338 	public String getPasscode() {
339 		StompPasscode credentials = (StompPasscode) getHeader(CREDENTIALS_HEADER);
340 		return (credentials != null ? credentials.passcode : null);
341 	}
342 
343 	public void setReceiptId(String receiptId) {
344 		setNativeHeader(STOMP_RECEIPT_ID_HEADER, receiptId);
345 	}
346 
347 	public String getReceiptId() {
348 		return getFirstNativeHeader(STOMP_RECEIPT_ID_HEADER);
349 	}
350 
351 	public void setReceipt(String receiptId) {
352 		setNativeHeader(STOMP_RECEIPT_HEADER, receiptId);
353 	}
354 
355 	public String getReceipt() {
356 		return getFirstNativeHeader(STOMP_RECEIPT_HEADER);
357 	}
358 
359 	public String getMessage() {
360 		return getFirstNativeHeader(STOMP_MESSAGE_HEADER);
361 	}
362 
363 	public void setMessage(String content) {
364 		setNativeHeader(STOMP_MESSAGE_HEADER, content);
365 	}
366 
367 	public String getMessageId() {
368 		return getFirstNativeHeader(STOMP_MESSAGE_ID_HEADER);
369 	}
370 
371 	public void setMessageId(String id) {
372 		setNativeHeader(STOMP_MESSAGE_ID_HEADER, id);
373 	}
374 
375 	public String getVersion() {
376 		return getFirstNativeHeader(STOMP_VERSION_HEADER);
377 	}
378 
379 	public void setVersion(String version) {
380 		setNativeHeader(STOMP_VERSION_HEADER, version);
381 	}
382 
383 
384 	// Logging related
385 
386 	@Override
387 	public String getShortLogMessage(Object payload) {
388 		if (StompCommand.SUBSCRIBE.equals(getCommand())) {
389 			return "SUBSCRIBE " + getDestination() + " id=" + getSubscriptionId() + appendSession();
390 		}
391 		else if (StompCommand.UNSUBSCRIBE.equals(getCommand())) {
392 			return "UNSUBSCRIBE id=" + getSubscriptionId() + appendSession();
393 		}
394 		else if (StompCommand.SEND.equals(getCommand())) {
395 			return "SEND " + getDestination() + appendSession() + appendPayload(payload);
396 		}
397 		else if (StompCommand.CONNECT.equals(getCommand())) {
398 			return "CONNECT" + (getUser() != null ? " user=" + getUser().getName() : "") + appendSession();
399 		}
400 		else if (StompCommand.CONNECTED.equals(getCommand())) {
401 			return "CONNECTED heart-beat=" + Arrays.toString(getHeartbeat()) + appendSession();
402 		}
403 		else if (StompCommand.DISCONNECT.equals(getCommand())) {
404 			return "DISCONNECT" + (getReceipt() != null ? " receipt=" + getReceipt() : "") + appendSession();
405 		}
406 		else {
407 			return getDetailedLogMessage(payload);
408 		}
409 	}
410 
411 	@Override
412 	public String getDetailedLogMessage(Object payload) {
413 		if (isHeartbeat()) {
414 			return "heart-beat in session " + getSessionId();
415 		}
416 		StompCommand command = getCommand();
417 		if (command == null) {
418 			return super.getDetailedLogMessage(payload);
419 		}
420 		StringBuilder sb = new StringBuilder();
421 		sb.append(command.name()).append(" ").append(getNativeHeaders()).append(appendSession());
422 		if (getUser() != null) {
423 			sb.append(", user=").append(getUser().getName());
424 		}
425 		if (command.isBodyAllowed()) {
426 			sb.append(appendPayload(payload));
427 		}
428 		return sb.toString();
429 	}
430 
431 	private String appendSession() {
432 		return " session=" + getSessionId();
433 	}
434 
435 	private String appendPayload(Object payload) {
436 		Assert.isInstanceOf(byte[].class, payload);
437 		byte[] bytes = (byte[]) payload;
438 		String contentType = (getContentType() != null ? " " + getContentType().toString() : "");
439 		if (bytes.length == 0 || getContentType() == null || !isReadableContentType()) {
440 			return contentType;
441 		}
442 		Charset charset = getContentType().getCharSet();
443 		charset = (charset != null ? charset : StompDecoder.UTF8_CHARSET);
444 		return (bytes.length < 80) ?
445 				contentType + " payload=" + new String(bytes, charset) :
446 				contentType + " payload=" + new String(Arrays.copyOf(bytes, 80), charset) + "...(truncated)";
447 	}
448 
449 
450 	// Static factory methods and accessors
451 
452 	/**
453 	 * Create an instance for the given STOMP command.
454 	 */
455 	public static StompHeaderAccessor create(StompCommand command) {
456 		return new StompHeaderAccessor(command, null);
457 	}
458 
459 	/**
460 	 * Create an instance for the given STOMP command and headers.
461 	 */
462 	public static StompHeaderAccessor create(StompCommand command, Map<String, List<String>> headers) {
463 		return new StompHeaderAccessor(command, headers);
464 	}
465 
466 	/**
467 	 * Create headers for a heartbeat. While a STOMP heartbeat frame does not
468 	 * have headers, a session id is needed for processing purposes at a minimum.
469 	 */
470 	public static StompHeaderAccessor createForHeartbeat() {
471 		return new StompHeaderAccessor();
472 	}
473 
474 	/**
475 	 * Create an instance from the payload and headers of the given Message.
476 	 */
477 	public static StompHeaderAccessor wrap(Message<?> message) {
478 		return new StompHeaderAccessor(message);
479 	}
480 
481 	/**
482 	 * Return the STOMP command from the given headers, or {@code null} if not set.
483 	 */
484 	public static StompCommand getCommand(Map<String, Object> headers) {
485 		return (StompCommand) headers.get(COMMAND_HEADER);
486 	}
487 
488 	/**
489 	 * Return the passcode header value, or {@code null} if not set.
490 	 */
491 	public static String getPasscode(Map<String, Object> headers) {
492 		StompPasscode credentials = (StompPasscode) headers.get(CREDENTIALS_HEADER);
493 		return (credentials != null ? credentials.passcode : null);
494 	}
495 
496 	public static Integer getContentLength(Map<String, List<String>> nativeHeaders) {
497 		if (nativeHeaders.containsKey(STOMP_CONTENT_LENGTH_HEADER)) {
498 			List<String> values = nativeHeaders.get(STOMP_CONTENT_LENGTH_HEADER);
499 			String value = (values != null ? values.get(0) : null);
500 			return Integer.valueOf(value);
501 		}
502 		return null;
503 	}
504 
505 
506 	private static class StompPasscode {
507 
508 		private final String passcode;
509 
510 		public StompPasscode(String passcode) {
511 			this.passcode = passcode;
512 		}
513 
514 		@Override
515 		public String toString() {
516 			return "[PROTECTED]";
517 		}
518 	}
519 
520 }