View Javadoc
1   /*
2    * Copyright 2002-2014 the original author or authors.
3    *
4    * Licensed under the Apache License, Version 2.0 (the "License");
5    * you may not use this file except in compliance with the License.
6    * You may obtain a copy of the License at
7    *
8    *      http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS,
12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   * See the License for the specific language governing permissions and
14   * limitations under the License.
15   */
16  
17  package org.springframework.web.socket.client.jetty;
18  
19  import java.net.URI;
20  import java.security.Principal;
21  import java.util.List;
22  import java.util.Map;
23  import java.util.concurrent.Callable;
24  import java.util.concurrent.Future;
25  
26  import org.eclipse.jetty.websocket.api.Session;
27  import org.eclipse.jetty.websocket.client.ClientUpgradeRequest;
28  import org.eclipse.jetty.websocket.client.WebSocketClient;
29  
30  import org.springframework.context.Lifecycle;
31  import org.springframework.core.task.AsyncListenableTaskExecutor;
32  import org.springframework.core.task.SimpleAsyncTaskExecutor;
33  import org.springframework.core.task.TaskExecutor;
34  import org.springframework.http.HttpHeaders;
35  import org.springframework.util.concurrent.ListenableFuture;
36  import org.springframework.util.concurrent.ListenableFutureTask;
37  import org.springframework.web.socket.WebSocketExtension;
38  import org.springframework.web.socket.WebSocketHandler;
39  import org.springframework.web.socket.WebSocketSession;
40  import org.springframework.web.socket.adapter.jetty.JettyWebSocketHandlerAdapter;
41  import org.springframework.web.socket.adapter.jetty.JettyWebSocketSession;
42  import org.springframework.web.socket.adapter.jetty.WebSocketToJettyExtensionConfigAdapter;
43  import org.springframework.web.socket.client.AbstractWebSocketClient;
44  import org.springframework.web.util.UriComponents;
45  import org.springframework.web.util.UriComponentsBuilder;
46  
47  /**
48   * Initiates WebSocket requests to a WebSocket server programmatically
49   * through the Jetty WebSocket API.
50   *
51   * <p>As of 4.1 this class implements {@link Lifecycle} rather than
52   * {@link org.springframework.context.SmartLifecycle}. Use
53   * {@link org.springframework.web.socket.client.WebSocketConnectionManager
54   * WebSocketConnectionManager} instead to auto-start a WebSocket connection.
55   *
56   * @author Rossen Stoyanchev
57   * @since 4.0
58   */
59  public class JettyWebSocketClient extends AbstractWebSocketClient implements Lifecycle {
60  
61  	private final org.eclipse.jetty.websocket.client.WebSocketClient client;
62  
63  	private final Object lifecycleMonitor = new Object();
64  
65  	private AsyncListenableTaskExecutor taskExecutor = new SimpleAsyncTaskExecutor();
66  
67  
68  	/**
69  	 * Default constructor that creates an instance of
70  	 * {@link org.eclipse.jetty.websocket.client.WebSocketClient}.
71  	 */
72  	public JettyWebSocketClient() {
73  		this.client = new org.eclipse.jetty.websocket.client.WebSocketClient();
74  	}
75  
76  	/**
77  	 * Constructor that accepts an existing
78  	 * {@link org.eclipse.jetty.websocket.client.WebSocketClient} instance.
79  	 */
80  	public JettyWebSocketClient(WebSocketClient client) {
81  		this.client = client;
82  	}
83  
84  
85  	/**
86  	 * Set an {@link AsyncListenableTaskExecutor} to use when opening connections.
87  	 * If this property is set to {@code null}, calls to  any of the
88  	 * {@code doHandshake} methods will block until the connection is established.
89  	 *
90  	 * <p>By default an instance of {@code SimpleAsyncTaskExecutor} is used.
91  	 */
92  	public void setTaskExecutor(AsyncListenableTaskExecutor taskExecutor) {
93  		this.taskExecutor = taskExecutor;
94  	}
95  
96  	/**
97  	 * Return the configured {@link TaskExecutor}.
98  	 */
99  	public AsyncListenableTaskExecutor getTaskExecutor() {
100 		return this.taskExecutor;
101 	}
102 
103 	@Override
104 	public boolean isRunning() {
105 		synchronized (this.lifecycleMonitor) {
106 			return this.client.isStarted();
107 		}
108 	}
109 
110 	@Override
111 	public void start() {
112 		synchronized (this.lifecycleMonitor) {
113 			if (!isRunning()) {
114 				try {
115 					if (logger.isInfoEnabled()) {
116 						logger.info("Starting Jetty WebSocketClient");
117 					}
118 					this.client.start();
119 				}
120 				catch (Exception e) {
121 					throw new IllegalStateException("Failed to start Jetty client", e);
122 				}
123 			}
124 		}
125 	}
126 
127 	@Override
128 	public void stop() {
129 		synchronized (this.lifecycleMonitor) {
130 			if (isRunning()) {
131 				try {
132 					if (logger.isInfoEnabled()) {
133 						logger.info("Stopping Jetty WebSocketClient");
134 					}
135 					this.client.stop();
136 				}
137 				catch (Exception e) {
138 					logger.error("Error stopping Jetty WebSocketClient", e);
139 				}
140 			}
141 		}
142 	}
143 
144 	@Override
145 	public ListenableFuture<WebSocketSession> doHandshake(WebSocketHandler webSocketHandler,
146 			String uriTemplate, Object... uriVars) {
147 
148 		UriComponents uriComponents = UriComponentsBuilder.fromUriString(uriTemplate).buildAndExpand(uriVars).encode();
149 		return doHandshake(webSocketHandler, null, uriComponents.toUri());
150 	}
151 
152 	@Override
153 	public ListenableFuture<WebSocketSession> doHandshakeInternal(WebSocketHandler wsHandler,
154 			HttpHeaders headers, final URI uri, List<String> protocols,
155 			List<WebSocketExtension> extensions,  Map<String, Object> attributes) {
156 
157 		final ClientUpgradeRequest request = new ClientUpgradeRequest();
158 		request.setSubProtocols(protocols);
159 
160 		for (WebSocketExtension e : extensions) {
161 			request.addExtensions(new WebSocketToJettyExtensionConfigAdapter(e));
162 		}
163 
164 		for (String header : headers.keySet()) {
165 			request.setHeader(header, headers.get(header));
166 		}
167 
168 		Principal user = getUser();
169 		final JettyWebSocketSession wsSession = new JettyWebSocketSession(attributes, user);
170 		final JettyWebSocketHandlerAdapter listener = new JettyWebSocketHandlerAdapter(wsHandler, wsSession);
171 
172 		Callable<WebSocketSession> connectTask = new Callable<WebSocketSession>() {
173 			@Override
174 			public WebSocketSession call() throws Exception {
175 				Future<Session> future = client.connect(listener, uri, request);
176 				future.get();
177 				return wsSession;
178 			}
179 		};
180 
181 		if (this.taskExecutor != null) {
182 			return this.taskExecutor.submitListenable(connectTask);
183 		}
184 		else {
185 			ListenableFutureTask<WebSocketSession> task = new ListenableFutureTask<WebSocketSession>(connectTask);
186 			task.run();
187 			return task;
188 		}
189 	}
190 
191 	/**
192 	 * @return the user to make available through {@link WebSocketSession#getPrincipal()};
193 	 * 	by default this method returns {@code null}
194 	 */
195 	protected Principal getUser() {
196 		return null;
197 	}
198 
199 }