1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package org.springframework.jms.config;
18
19 import java.util.Collection;
20 import java.util.Collections;
21 import java.util.LinkedHashMap;
22 import java.util.Map;
23 import java.util.concurrent.atomic.AtomicInteger;
24
25 import org.apache.commons.logging.Log;
26 import org.apache.commons.logging.LogFactory;
27
28 import org.springframework.beans.factory.BeanInitializationException;
29 import org.springframework.beans.factory.DisposableBean;
30 import org.springframework.beans.factory.InitializingBean;
31 import org.springframework.context.SmartLifecycle;
32 import org.springframework.jms.listener.MessageListenerContainer;
33 import org.springframework.util.Assert;
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55 public class JmsListenerEndpointRegistry implements DisposableBean, SmartLifecycle {
56
57 protected final Log logger = LogFactory.getLog(getClass());
58
59 private final Map<String, MessageListenerContainer> listenerContainers =
60 new LinkedHashMap<String, MessageListenerContainer>();
61
62 private int phase = Integer.MAX_VALUE;
63
64
65
66
67
68
69
70
71
72 public MessageListenerContainer getListenerContainer(String id) {
73 Assert.notNull(id, "Container identifier must not be null");
74 return this.listenerContainers.get(id);
75 }
76
77
78
79
80 public Collection<MessageListenerContainer> getListenerContainers() {
81 return Collections.unmodifiableCollection(this.listenerContainers.values());
82 }
83
84
85
86
87
88
89
90
91
92
93 public void registerListenerContainer(JmsListenerEndpoint endpoint, JmsListenerContainerFactory<?> factory) {
94 Assert.notNull(endpoint, "Endpoint must not be null");
95 Assert.notNull(factory, "Factory must not be null");
96
97 String id = endpoint.getId();
98 Assert.notNull(id, "Endpoint id must not be null");
99 Assert.state(!this.listenerContainers.containsKey(id),
100 "Another endpoint is already registered with id '" + id + "'");
101
102 MessageListenerContainer container = createListenerContainer(endpoint, factory);
103 this.listenerContainers.put(id, container);
104 }
105
106
107
108
109 protected MessageListenerContainer createListenerContainer(JmsListenerEndpoint endpoint,
110 JmsListenerContainerFactory<?> factory) {
111
112 MessageListenerContainer listenerContainer = factory.createListenerContainer(endpoint);
113
114 if (listenerContainer instanceof InitializingBean) {
115 try {
116 ((InitializingBean) listenerContainer).afterPropertiesSet();
117 }
118 catch (Exception ex) {
119 throw new BeanInitializationException("Failed to initialize message listener container", ex);
120 }
121 }
122
123 int containerPhase = listenerContainer.getPhase();
124 if (containerPhase < Integer.MAX_VALUE) {
125 if (this.phase < Integer.MAX_VALUE && this.phase != containerPhase) {
126 throw new IllegalStateException("Encountered phase mismatch between container factory definitions: " +
127 this.phase + " vs " + containerPhase);
128 }
129 this.phase = listenerContainer.getPhase();
130 }
131
132 return listenerContainer;
133 }
134
135
136 @Override
137 public void destroy() {
138 for (MessageListenerContainer listenerContainer : getListenerContainers()) {
139 if (listenerContainer instanceof DisposableBean) {
140 try {
141 ((DisposableBean) listenerContainer).destroy();
142 }
143 catch (Throwable ex) {
144 logger.warn("Failed to destroy message listener container", ex);
145 }
146 }
147 }
148 }
149
150
151
152
153 @Override
154 public int getPhase() {
155 return this.phase;
156 }
157
158 @Override
159 public boolean isAutoStartup() {
160 return true;
161 }
162
163 @Override
164 public void start() {
165 for (MessageListenerContainer listenerContainer : getListenerContainers()) {
166 if (listenerContainer.isAutoStartup()) {
167 listenerContainer.start();
168 }
169 }
170 }
171
172 @Override
173 public void stop() {
174 for (MessageListenerContainer listenerContainer : getListenerContainers()) {
175 listenerContainer.stop();
176 }
177 }
178
179 @Override
180 public void stop(Runnable callback) {
181 Collection<MessageListenerContainer> listenerContainers = getListenerContainers();
182 AggregatingCallback aggregatingCallback = new AggregatingCallback(listenerContainers.size(), callback);
183 for (MessageListenerContainer listenerContainer : listenerContainers) {
184 listenerContainer.stop(aggregatingCallback);
185 }
186 }
187
188 @Override
189 public boolean isRunning() {
190 for (MessageListenerContainer listenerContainer : getListenerContainers()) {
191 if (listenerContainer.isRunning()) {
192 return true;
193 }
194 }
195 return false;
196 }
197
198
199 private static class AggregatingCallback implements Runnable {
200
201 private final AtomicInteger count;
202
203 private final Runnable finishCallback;
204
205 public AggregatingCallback(int count, Runnable finishCallback) {
206 this.count = new AtomicInteger(count);
207 this.finishCallback = finishCallback;
208 }
209
210 @Override
211 public void run() {
212 if (this.count.decrementAndGet() == 0) {
213 this.finishCallback.run();
214 }
215 }
216 }
217
218 }