1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package org.springframework.web.socket.sockjs.client;
18
19 import java.io.ByteArrayOutputStream;
20 import java.net.URI;
21 import java.nio.ByteBuffer;
22 import java.util.Enumeration;
23
24 import org.eclipse.jetty.client.HttpClient;
25 import org.eclipse.jetty.client.api.ContentResponse;
26 import org.eclipse.jetty.client.api.Request;
27 import org.eclipse.jetty.client.api.Response;
28 import org.eclipse.jetty.client.util.StringContentProvider;
29 import org.eclipse.jetty.http.HttpFields;
30 import org.eclipse.jetty.http.HttpMethod;
31
32 import org.springframework.context.Lifecycle;
33 import org.springframework.http.HttpHeaders;
34 import org.springframework.http.HttpStatus;
35 import org.springframework.http.ResponseEntity;
36 import org.springframework.util.Assert;
37 import org.springframework.util.concurrent.SettableListenableFuture;
38 import org.springframework.web.client.HttpServerErrorException;
39 import org.springframework.web.socket.CloseStatus;
40 import org.springframework.web.socket.TextMessage;
41 import org.springframework.web.socket.WebSocketHandler;
42 import org.springframework.web.socket.WebSocketSession;
43 import org.springframework.web.socket.sockjs.SockJsException;
44 import org.springframework.web.socket.sockjs.SockJsTransportFailureException;
45 import org.springframework.web.socket.sockjs.frame.SockJsFrame;
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64 public class JettyXhrTransport extends AbstractXhrTransport implements XhrTransport, Lifecycle {
65
66 private final HttpClient httpClient;
67
68
69 public JettyXhrTransport(HttpClient httpClient) {
70 Assert.notNull(httpClient, "'httpClient' is required");
71 this.httpClient = httpClient;
72 }
73
74
75 public HttpClient getHttpClient() {
76 return this.httpClient;
77 }
78
79 @Override
80 public void start() {
81 try {
82 if (!this.httpClient.isRunning()) {
83 this.httpClient.start();
84 }
85 }
86 catch (Exception e) {
87 throw new SockJsException("Failed to start " + this, e);
88 }
89 }
90
91 @Override
92 public void stop() {
93 try {
94 if (this.httpClient.isRunning()) {
95 this.httpClient.stop();
96 }
97 }
98 catch (Exception e) {
99 throw new SockJsException("Failed to stop " + this, e);
100 }
101 }
102
103 @Override
104 public boolean isRunning() {
105 return this.httpClient.isRunning();
106 }
107
108 @Override
109 protected ResponseEntity<String> executeInfoRequestInternal(URI infoUrl) {
110 return executeRequest(infoUrl, HttpMethod.GET, getRequestHeaders(), null);
111 }
112
113 @Override
114 public ResponseEntity<String> executeSendRequestInternal(URI url, HttpHeaders headers, TextMessage message) {
115 return executeRequest(url, HttpMethod.POST, headers, message.getPayload());
116 }
117
118 protected ResponseEntity<String> executeRequest(URI url, HttpMethod method, HttpHeaders headers, String body) {
119 Request httpRequest = this.httpClient.newRequest(url).method(method);
120 addHttpHeaders(httpRequest, headers);
121 if (body != null) {
122 httpRequest.content(new StringContentProvider(body));
123 }
124 ContentResponse response;
125 try {
126 response = httpRequest.send();
127 }
128 catch (Exception ex) {
129 throw new SockJsTransportFailureException("Failed to execute request to " + url, null, ex);
130 }
131 HttpStatus status = HttpStatus.valueOf(response.getStatus());
132 HttpHeaders responseHeaders = toHttpHeaders(response.getHeaders());
133 return (response.getContent() != null ?
134 new ResponseEntity<String>(response.getContentAsString(), responseHeaders, status) :
135 new ResponseEntity<String>(responseHeaders, status));
136 }
137
138 private static void addHttpHeaders(Request request, HttpHeaders headers) {
139 for (String name : headers.keySet()) {
140 for (String value : headers.get(name)) {
141 request.header(name, value);
142 }
143 }
144 }
145
146 private static HttpHeaders toHttpHeaders(HttpFields httpFields) {
147 HttpHeaders responseHeaders = new HttpHeaders();
148 Enumeration<String> names = httpFields.getFieldNames();
149 while (names.hasMoreElements()) {
150 String name = names.nextElement();
151 Enumeration<String> values = httpFields.getValues(name);
152 while (values.hasMoreElements()) {
153 String value = values.nextElement();
154 responseHeaders.add(name, value);
155 }
156 }
157 return responseHeaders;
158 }
159
160 @Override
161 protected void connectInternal(TransportRequest request, WebSocketHandler handler,
162 URI url, HttpHeaders handshakeHeaders, XhrClientSockJsSession session,
163 SettableListenableFuture<WebSocketSession> connectFuture) {
164
165 SockJsResponseListener listener = new SockJsResponseListener(url, getRequestHeaders(), session, connectFuture);
166 executeReceiveRequest(url, handshakeHeaders, listener);
167 }
168
169 private void executeReceiveRequest(URI url, HttpHeaders headers, SockJsResponseListener listener) {
170 if (logger.isTraceEnabled()) {
171 logger.trace("Starting XHR receive request, url=" + url);
172 }
173 Request httpRequest = this.httpClient.newRequest(url).method(HttpMethod.POST);
174 addHttpHeaders(httpRequest, headers);
175 httpRequest.send(listener);
176 }
177
178
179
180
181
182
183 private class SockJsResponseListener extends Response.Listener.Adapter {
184
185 private final URI transportUrl;
186
187 private final HttpHeaders receiveHeaders;
188
189 private final XhrClientSockJsSession sockJsSession;
190
191 private final SettableListenableFuture<WebSocketSession> connectFuture;
192
193 private final ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
194
195
196 public SockJsResponseListener(URI url, HttpHeaders headers, XhrClientSockJsSession sockJsSession,
197 SettableListenableFuture<WebSocketSession> connectFuture) {
198
199 this.transportUrl = url;
200 this.receiveHeaders = headers;
201 this.connectFuture = connectFuture;
202 this.sockJsSession = sockJsSession;
203 }
204
205
206 @Override
207 public void onBegin(Response response) {
208 if (response.getStatus() != 200) {
209 HttpStatus status = HttpStatus.valueOf(response.getStatus());
210 response.abort(new HttpServerErrorException(status, "Unexpected XHR receive status"));
211 }
212 }
213
214 @Override
215 public void onHeaders(Response response) {
216 if (logger.isTraceEnabled()) {
217
218 logger.trace("XHR receive headers: " + toHttpHeaders(response.getHeaders()));
219 }
220 }
221
222 @Override
223 public void onContent(Response response, ByteBuffer buffer) {
224 while (true) {
225 if (this.sockJsSession.isDisconnected()) {
226 if (logger.isDebugEnabled()) {
227 logger.debug("SockJS sockJsSession closed, closing response.");
228 }
229 response.abort(new SockJsException("Session closed.", this.sockJsSession.getId(), null));
230 return;
231 }
232 if (buffer.remaining() == 0) {
233 break;
234 }
235 int b = buffer.get();
236 if (b == '\n') {
237 handleFrame();
238 }
239 else {
240 this.outputStream.write(b);
241 }
242 }
243 }
244
245 private void handleFrame() {
246 byte[] bytes = this.outputStream.toByteArray();
247 this.outputStream.reset();
248 String content = new String(bytes, SockJsFrame.CHARSET);
249 if (logger.isTraceEnabled()) {
250 logger.trace("XHR content received: " + content);
251 }
252 if (!PRELUDE.equals(content)) {
253 this.sockJsSession.handleFrame(new String(bytes, SockJsFrame.CHARSET));
254 }
255 }
256
257 @Override
258 public void onSuccess(Response response) {
259 if (this.outputStream.size() > 0) {
260 handleFrame();
261 }
262 if (logger.isTraceEnabled()) {
263 logger.trace("XHR receive request completed.");
264 }
265 executeReceiveRequest(this.transportUrl, this.receiveHeaders, this);
266 }
267
268 @Override
269 public void onFailure(Response response, Throwable failure) {
270 if (connectFuture.setException(failure)) {
271 return;
272 }
273 if (this.sockJsSession.isDisconnected()) {
274 this.sockJsSession.afterTransportClosed(null);
275 }
276 else {
277 this.sockJsSession.handleTransportError(failure);
278 this.sockJsSession.afterTransportClosed(new CloseStatus(1006, failure.getMessage()));
279 }
280 }
281 }
282
283 }