1 package org.metricshub.ipmi.core.connection.queue;
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25 import org.metricshub.ipmi.core.coding.PayloadCoder;
26 import org.metricshub.ipmi.core.connection.Connection;
27 import org.metricshub.ipmi.core.connection.ConnectionException;
28
29 import org.slf4j.Logger;
30 import org.slf4j.LoggerFactory;
31
32 import java.util.ArrayList;
33 import java.util.Date;
34 import java.util.List;
35 import java.util.Timer;
36 import java.util.TimerTask;
37
38
39
40
41 public class MessageQueue extends TimerTask {
42
43 private List<QueueElement> queue;
44 private int timeout;
45 private Timer timer;
46 private Connection connection;
47 private int lastSequenceNumber;
48 private Object lastSequenceNumberLock = new Object();
49 private int minSequenceNumber = 1;
50 private int sequenceNumberUpperBound = 64;
51
52 private static Logger logger = LoggerFactory.getLogger(MessageQueue.class);
53
54
55
56
57 private static int cleaningFrequency = 500;
58
59
60
61
62
63
64 private static final int QUEUE_SIZE = 8;
65
66 public int getTimeout() {
67 return timeout;
68 }
69
70 public void setTimeout(int timeout) {
71 this.timeout = timeout;
72 }
73
74 public MessageQueue(Connection connection, int timeout, int minSequenceNumber, int maxSequenceNumber) {
75 this.minSequenceNumber = minSequenceNumber;
76 sequenceNumberUpperBound = maxSequenceNumber + 1;
77 reservedTags = new ArrayList<Integer>();
78 lastSequenceNumber = minSequenceNumber - 1;
79 this.connection = connection;
80 queue = new ArrayList<QueueElement>();
81 setTimeout(timeout);
82 timer = new Timer();
83 timer.schedule(this, cleaningFrequency, cleaningFrequency);
84 }
85
86 private int incrementSequenceNumber(int currentSequenceNumber) {
87 int newSequenceNumber = (currentSequenceNumber + 1) % sequenceNumberUpperBound;
88
89 if (newSequenceNumber == 0) {
90 newSequenceNumber = minSequenceNumber;
91 }
92
93 return newSequenceNumber;
94 }
95
96
97
98
99 public void tearDown() {
100 timer.cancel();
101 }
102
103 private List<Integer> reservedTags;
104
105
106
107
108 private synchronized boolean isReserved(int tag) {
109 return reservedTags.contains(tag);
110 }
111
112
113
114
115
116
117
118
119 private synchronized boolean reserveTag(int tag) {
120 if (isReserved(tag)) {
121 reservedTags.add(tag);
122 return true;
123 }
124 return false;
125 }
126
127 private synchronized void releaseTag(int tag) {
128 reservedTags.remove((Integer) tag);
129 }
130
131
132
133
134
135
136
137
138 public int add(PayloadCoder request) {
139 run();
140 boolean first = true;
141 synchronized (queue) {
142 synchronized (lastSequenceNumberLock) {
143 if (queue.size() < QUEUE_SIZE) {
144 int sequenceNumber = incrementSequenceNumber(lastSequenceNumber);
145
146 while (isReserved(sequenceNumber)) {
147 sequenceNumber = incrementSequenceNumber(sequenceNumber);
148
149 if (!first) {
150 try {
151 lastSequenceNumberLock.wait(1);
152 } catch (InterruptedException e) {
153
154 }
155 }
156
157 if (sequenceNumber == lastSequenceNumber) {
158
159 return -1;
160 }
161
162 first = false;
163 }
164
165 reserveTag(sequenceNumber);
166
167 lastSequenceNumber = sequenceNumber;
168
169 QueueElement element = new QueueElement(sequenceNumber, request);
170
171 queue.add(element);
172 return sequenceNumber;
173 }
174 }
175 }
176 return -1;
177
178 }
179
180
181
182
183 public void remove(int tag) {
184 run();
185 synchronized (queue) {
186 int i = 0;
187 int index = -1;
188 for (QueueElement element : queue) {
189 if (element.getId() == tag) {
190 index = i;
191 break;
192 }
193 ++i;
194 }
195 if (index == 0) {
196 queue.remove(0);
197 releaseTag(tag);
198 while (!queue.isEmpty() && queue.get(0).getRequest() == null) {
199 int additionalTag = queue.get(0).getId();
200 queue.remove(0);
201 releaseTag(additionalTag);
202 }
203 } else if (index > 0) {
204 queue.get(index).setRequest(null);
205 }
206
207 }
208 }
209
210
211
212
213
214
215 public void removeAt(int index) {
216 if (index >= queue.size()) {
217 throw new IndexOutOfBoundsException("Index out of bounds : "
218 + index);
219 }
220
221 remove(queue.get(index).getId());
222 }
223
224
225
226
227 public boolean containsId(int sequenceNumber) {
228 synchronized (queue) {
229
230 for (QueueElement element : queue) {
231 if (element.getId() == sequenceNumber
232 && element.getRequest() != null) {
233 return true;
234 }
235 }
236
237 }
238 return false;
239 }
240
241
242
243
244 public int getSequenceNumber() {
245 synchronized (lastSequenceNumberLock) {
246 int sequenceNumber = incrementSequenceNumber(lastSequenceNumber);
247
248 lastSequenceNumber = sequenceNumber;
249
250 return sequenceNumber;
251 }
252 }
253
254
255
256
257
258 public PayloadCoder getMessageFromQueue(int tag) {
259 synchronized (queue) {
260 for (QueueElement element : queue) {
261 if (element.getId() == tag && element.getRequest() != null) {
262 return element.getRequest();
263 }
264 }
265 }
266 return null;
267 }
268
269
270
271
272
273 public int getMessageIndexFromQueue(int tag) {
274 synchronized (queue) {
275 int i = 0;
276 for (QueueElement element : queue) {
277 if (element.getId() == tag && element.getRequest() != null) {
278 return i;
279 }
280 ++i;
281 }
282 }
283 return -1;
284 }
285
286
287
288
289
290
291
292 @Deprecated
293 public int getMessageRetries(int tag) {
294 synchronized (queue) {
295 for (QueueElement element : queue) {
296 if (element.getId() == tag && element.getRequest() != null) {
297 return element.getRetries();
298 }
299 }
300 }
301 return -1;
302 }
303
304
305
306
307
308
309
310
311 public int getMessageSequenceNumber(int tag) {
312 synchronized (queue) {
313 for (QueueElement element : queue) {
314 if (element.getId() == tag && element.getRequest() != null) {
315 return element.getId();
316 }
317 }
318 }
319 return -1;
320 }
321
322
323
324
325
326 @Override
327 public void run() {
328 if (queue != null) {
329 synchronized (queue) {
330 boolean process = true;
331 while (process && !queue.isEmpty()) {
332 QueueElement oldestQueueElement = queue.get(0);
333 boolean done = oldestQueueElement.getRequest() == null;
334
335 if (messageJustTimedOut(oldestQueueElement) || done) {
336 processObsoleteMessage(oldestQueueElement, done);
337 } else {
338 process = false;
339 }
340 }
341 }
342 }
343 }
344
345 private boolean messageJustTimedOut(QueueElement oldestQueueElement) {
346 Date now = new Date();
347
348 return now.getTime() - oldestQueueElement.getTimestamp().getTime() > (long) timeout;
349 }
350
351 private void processObsoleteMessage(QueueElement message, boolean done) {
352 int tag = message.getId();
353 boolean previouslyTimedOut = message.isTimedOut();
354
355 if (previouslyTimedOut || done) {
356 queue.remove(0);
357 logger.info("Removing message after timeout, tag: " + tag);
358 releaseTag(tag);
359 } else {
360 message.makeTimedOut();
361 message.refreshTimestamp();
362 connection.notifyResponseListeners(connection.getHandle(), tag, null,
363 new ConnectionException("Message timed out"));
364 }
365 }
366
367 }