1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package org.springframework.web.socket.client.jetty;
18
19 import java.net.URI;
20 import java.security.Principal;
21 import java.util.List;
22 import java.util.Map;
23 import java.util.concurrent.Callable;
24 import java.util.concurrent.Future;
25
26 import org.eclipse.jetty.websocket.api.Session;
27 import org.eclipse.jetty.websocket.client.ClientUpgradeRequest;
28 import org.eclipse.jetty.websocket.client.WebSocketClient;
29
30 import org.springframework.context.Lifecycle;
31 import org.springframework.core.task.AsyncListenableTaskExecutor;
32 import org.springframework.core.task.SimpleAsyncTaskExecutor;
33 import org.springframework.core.task.TaskExecutor;
34 import org.springframework.http.HttpHeaders;
35 import org.springframework.util.concurrent.ListenableFuture;
36 import org.springframework.util.concurrent.ListenableFutureTask;
37 import org.springframework.web.socket.WebSocketExtension;
38 import org.springframework.web.socket.WebSocketHandler;
39 import org.springframework.web.socket.WebSocketSession;
40 import org.springframework.web.socket.adapter.jetty.JettyWebSocketHandlerAdapter;
41 import org.springframework.web.socket.adapter.jetty.JettyWebSocketSession;
42 import org.springframework.web.socket.adapter.jetty.WebSocketToJettyExtensionConfigAdapter;
43 import org.springframework.web.socket.client.AbstractWebSocketClient;
44 import org.springframework.web.util.UriComponents;
45 import org.springframework.web.util.UriComponentsBuilder;
46
47
48
49
50
51
52
53
54
55
56
57
58
59 public class JettyWebSocketClient extends AbstractWebSocketClient implements Lifecycle {
60
61 private final org.eclipse.jetty.websocket.client.WebSocketClient client;
62
63 private final Object lifecycleMonitor = new Object();
64
65 private AsyncListenableTaskExecutor taskExecutor = new SimpleAsyncTaskExecutor();
66
67
68
69
70
71
72 public JettyWebSocketClient() {
73 this.client = new org.eclipse.jetty.websocket.client.WebSocketClient();
74 }
75
76
77
78
79
80 public JettyWebSocketClient(WebSocketClient client) {
81 this.client = client;
82 }
83
84
85
86
87
88
89
90
91
92 public void setTaskExecutor(AsyncListenableTaskExecutor taskExecutor) {
93 this.taskExecutor = taskExecutor;
94 }
95
96
97
98
99 public AsyncListenableTaskExecutor getTaskExecutor() {
100 return this.taskExecutor;
101 }
102
103 @Override
104 public boolean isRunning() {
105 synchronized (this.lifecycleMonitor) {
106 return this.client.isStarted();
107 }
108 }
109
110 @Override
111 public void start() {
112 synchronized (this.lifecycleMonitor) {
113 if (!isRunning()) {
114 try {
115 if (logger.isInfoEnabled()) {
116 logger.info("Starting Jetty WebSocketClient");
117 }
118 this.client.start();
119 }
120 catch (Exception e) {
121 throw new IllegalStateException("Failed to start Jetty client", e);
122 }
123 }
124 }
125 }
126
127 @Override
128 public void stop() {
129 synchronized (this.lifecycleMonitor) {
130 if (isRunning()) {
131 try {
132 if (logger.isInfoEnabled()) {
133 logger.info("Stopping Jetty WebSocketClient");
134 }
135 this.client.stop();
136 }
137 catch (Exception e) {
138 logger.error("Error stopping Jetty WebSocketClient", e);
139 }
140 }
141 }
142 }
143
144 @Override
145 public ListenableFuture<WebSocketSession> doHandshake(WebSocketHandler webSocketHandler,
146 String uriTemplate, Object... uriVars) {
147
148 UriComponents uriComponents = UriComponentsBuilder.fromUriString(uriTemplate).buildAndExpand(uriVars).encode();
149 return doHandshake(webSocketHandler, null, uriComponents.toUri());
150 }
151
152 @Override
153 public ListenableFuture<WebSocketSession> doHandshakeInternal(WebSocketHandler wsHandler,
154 HttpHeaders headers, final URI uri, List<String> protocols,
155 List<WebSocketExtension> extensions, Map<String, Object> attributes) {
156
157 final ClientUpgradeRequest request = new ClientUpgradeRequest();
158 request.setSubProtocols(protocols);
159
160 for (WebSocketExtension e : extensions) {
161 request.addExtensions(new WebSocketToJettyExtensionConfigAdapter(e));
162 }
163
164 for (String header : headers.keySet()) {
165 request.setHeader(header, headers.get(header));
166 }
167
168 Principal user = getUser();
169 final JettyWebSocketSession wsSession = new JettyWebSocketSession(attributes, user);
170 final JettyWebSocketHandlerAdapter listener = new JettyWebSocketHandlerAdapter(wsHandler, wsSession);
171
172 Callable<WebSocketSession> connectTask = new Callable<WebSocketSession>() {
173 @Override
174 public WebSocketSession call() throws Exception {
175 Future<Session> future = client.connect(listener, uri, request);
176 future.get();
177 return wsSession;
178 }
179 };
180
181 if (this.taskExecutor != null) {
182 return this.taskExecutor.submitListenable(connectTask);
183 }
184 else {
185 ListenableFutureTask<WebSocketSession> task = new ListenableFutureTask<WebSocketSession>(connectTask);
186 task.run();
187 return task;
188 }
189 }
190
191
192
193
194
195 protected Principal getUser() {
196 return null;
197 }
198
199 }