/*
 * Copyright 2002-2014 the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.springframework.web.socket.sockjs.client;

import java.io.ByteArrayOutputStream;
import java.net.URI;
import java.nio.ByteBuffer;
import java.util.Enumeration;
import java.util.List;
import java.util.Map;

import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.client.api.ContentResponse;
import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.client.api.Response;
import org.eclipse.jetty.client.util.StringContentProvider;
import org.eclipse.jetty.http.HttpFields;
import org.eclipse.jetty.http.HttpMethod;

import org.springframework.context.Lifecycle;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.util.Assert;
import org.springframework.util.concurrent.SettableListenableFuture;
import org.springframework.web.client.HttpServerErrorException;
import org.springframework.web.socket.CloseStatus;
import org.springframework.web.socket.TextMessage;
import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.WebSocketSession;
import org.springframework.web.socket.sockjs.SockJsException;
import org.springframework.web.socket.sockjs.SockJsTransportFailureException;
import org.springframework.web.socket.sockjs.frame.SockJsFrame;


48  /**
49   * An XHR transport based on Jetty's {@link org.eclipse.jetty.client.HttpClient}.
50   *
51   * <p>When used for testing purposes (e.g. load testing) the {@code HttpClient}
52   * properties must be set to allow a larger than usual number of connections and
53   * threads. For example:
54   *
55   * <pre class="code">
56   * HttpClient httpClient = new HttpClient();
57   * httpClient.setMaxConnectionsPerDestination(1000);
58   * httpClient.setExecutor(new QueuedThreadPool(500));
59   * </pre>
60   *
61   * @author Rossen Stoyanchev
62   * @since 4.1
63   */
64  public class JettyXhrTransport extends AbstractXhrTransport implements XhrTransport, Lifecycle {
65  
66  	private final HttpClient httpClient;
67  
68  
69  	public JettyXhrTransport(HttpClient httpClient) {
70  		Assert.notNull(httpClient, "'httpClient' is required");
71  		this.httpClient = httpClient;
72  	}
73  
74  
75  	public HttpClient getHttpClient() {
76  		return this.httpClient;
77  	}
78  
79  	@Override
80  	public void start() {
81  		try {
82  			if (!this.httpClient.isRunning()) {
83  				this.httpClient.start();
84  			}
85  		}
86  		catch (Exception e) {
87  			throw new SockJsException("Failed to start " + this, e);
88  		}
89  	}
90  
91  	@Override
92  	public void stop() {
93  		try {
94  			if (this.httpClient.isRunning()) {
95  				this.httpClient.stop();
96  			}
97  		}
98  		catch (Exception e) {
99  			throw new SockJsException("Failed to stop " + this, e);
100 		}
101 	}
102 
103 	@Override
104 	public boolean isRunning() {
105 		return this.httpClient.isRunning();
106 	}
107 
108 	@Override
109 	protected ResponseEntity<String> executeInfoRequestInternal(URI infoUrl) {
110 		return executeRequest(infoUrl, HttpMethod.GET, getRequestHeaders(), null);
111 	}
112 
113 	@Override
114 	public ResponseEntity<String> executeSendRequestInternal(URI url, HttpHeaders headers, TextMessage message) {
115 		return executeRequest(url, HttpMethod.POST, headers, message.getPayload());
116 	}
117 
118 	protected ResponseEntity<String> executeRequest(URI url, HttpMethod method, HttpHeaders headers, String body) {
119 		Request httpRequest = this.httpClient.newRequest(url).method(method);
120 		addHttpHeaders(httpRequest, headers);
121 		if (body != null) {
122 			httpRequest.content(new StringContentProvider(body));
123 		}
124 		ContentResponse response;
125 		try {
126 			response = httpRequest.send();
127 		}
128 		catch (Exception ex) {
129 			throw new SockJsTransportFailureException("Failed to execute request to " + url, null, ex);
130 		}
131 		HttpStatus status = HttpStatus.valueOf(response.getStatus());
132 		HttpHeaders responseHeaders = toHttpHeaders(response.getHeaders());
133 		return (response.getContent() != null ?
134 			new ResponseEntity<String>(response.getContentAsString(), responseHeaders, status) :
135 			new ResponseEntity<String>(responseHeaders, status));
136 	}
137 
138 	private static void addHttpHeaders(Request request, HttpHeaders headers) {
139 		for (String name : headers.keySet()) {
140 			for (String value : headers.get(name)) {
141 				request.header(name, value);
142 			}
143 		}
144 	}
145 
146 	private static HttpHeaders toHttpHeaders(HttpFields httpFields) {
147 		HttpHeaders responseHeaders = new HttpHeaders();
148 		Enumeration<String> names = httpFields.getFieldNames();
149 		while (names.hasMoreElements()) {
150 			String name = names.nextElement();
151 			Enumeration<String> values = httpFields.getValues(name);
152 			while (values.hasMoreElements()) {
153 				String value = values.nextElement();
154 				responseHeaders.add(name, value);
155 			}
156 		}
157 		return responseHeaders;
158 	}
159 
160 	@Override
161 	protected void connectInternal(TransportRequest request, WebSocketHandler handler,
162 			URI url, HttpHeaders handshakeHeaders, XhrClientSockJsSession session,
163 			SettableListenableFuture<WebSocketSession> connectFuture) {
164 
165 		SockJsResponseListener listener = new SockJsResponseListener(url, getRequestHeaders(), session, connectFuture);
166 		executeReceiveRequest(url, handshakeHeaders, listener);
167 	}
168 
169 	private void executeReceiveRequest(URI url, HttpHeaders headers, SockJsResponseListener listener) {
170 		if (logger.isTraceEnabled()) {
171 			logger.trace("Starting XHR receive request, url=" + url);
172 		}
173 		Request httpRequest = this.httpClient.newRequest(url).method(HttpMethod.POST);
174 		addHttpHeaders(httpRequest, headers);
175 		httpRequest.send(listener);
176 	}
177 
178 
179 	/**
180 	 * Splits the body of an HTTP response into SockJS frames and delegates those
181 	 * to an {@link XhrClientSockJsSession}.
182 	 */
183 	private class SockJsResponseListener extends Response.Listener.Adapter {
184 
185 		private final URI transportUrl;
186 
187 		private final HttpHeaders receiveHeaders;
188 
189 		private final XhrClientSockJsSession sockJsSession;
190 
191 		private final SettableListenableFuture<WebSocketSession> connectFuture;
192 
193 		private final ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
194 
195 
196 		public SockJsResponseListener(URI url, HttpHeaders headers,	XhrClientSockJsSession sockJsSession,
197 				SettableListenableFuture<WebSocketSession> connectFuture) {
198 
199 			this.transportUrl = url;
200 			this.receiveHeaders = headers;
201 			this.connectFuture = connectFuture;
202 			this.sockJsSession = sockJsSession;
203 		}
204 
205 
206 		@Override
207 		public void onBegin(Response response) {
208 			if (response.getStatus() != 200) {
209 				HttpStatus status = HttpStatus.valueOf(response.getStatus());
210 				response.abort(new HttpServerErrorException(status, "Unexpected XHR receive status"));
211 			}
212 		}
213 
214 		@Override
215 		public void onHeaders(Response response) {
216 			if (logger.isTraceEnabled()) {
217 				// Convert to HttpHeaders to avoid "\n"
218 				logger.trace("XHR receive headers: " + toHttpHeaders(response.getHeaders()));
219 			}
220 		}
221 
222 		@Override
223 		public void onContent(Response response, ByteBuffer buffer) {
224 			while (true) {
225 				if (this.sockJsSession.isDisconnected()) {
226 					if (logger.isDebugEnabled()) {
227 						logger.debug("SockJS sockJsSession closed, closing response.");
228 					}
229 					response.abort(new SockJsException("Session closed.", this.sockJsSession.getId(), null));
230 					return;
231 				}
232 				if (buffer.remaining() == 0) {
233 					break;
234 				}
235 				int b = buffer.get();
236 				if (b == '\n') {
237 					handleFrame();
238 				}
239 				else {
240 					this.outputStream.write(b);
241 				}
242 			}
243 		}
244 
245 		private void handleFrame() {
246 			byte[] bytes = this.outputStream.toByteArray();
247 			this.outputStream.reset();
248 			String content = new String(bytes, SockJsFrame.CHARSET);
249 			if (logger.isTraceEnabled()) {
250 				logger.trace("XHR content received: " + content);
251 			}
252 			if (!PRELUDE.equals(content)) {
253 				this.sockJsSession.handleFrame(new String(bytes, SockJsFrame.CHARSET));
254 			}
255 		}
256 
257 		@Override
258 		public void onSuccess(Response response) {
259 			if (this.outputStream.size() > 0) {
260 				handleFrame();
261 			}
262 			if (logger.isTraceEnabled()) {
263 				logger.trace("XHR receive request completed.");
264 			}
265 			executeReceiveRequest(this.transportUrl, this.receiveHeaders, this);
266 		}
267 
268 		@Override
269 		public void onFailure(Response response, Throwable failure) {
270 			if (connectFuture.setException(failure)) {
271 				return;
272 			}
273 			if (this.sockJsSession.isDisconnected()) {
274 				this.sockJsSession.afterTransportClosed(null);
275 			}
276 			else {
277 				this.sockJsSession.handleTransportError(failure);
278 				this.sockJsSession.afterTransportClosed(new CloseStatus(1006, failure.getMessage()));
279 			}
280 		}
281 	}
282 
283 }