View Javadoc
1   package org.metricshub.ipmi.core.connection.queue;
2   
3   /*-
4    * ╱╲╱╲╱╲╱╲╱╲╱╲╱╲╱╲╱╲╱╲╱╲╱╲╱╲╱╲╱╲╱╲╱╲╱╲╱╲╱╲
5    * IPMI Java Client
6    * ჻჻჻჻჻჻
7    * Copyright 2023 Verax Systems, MetricsHub
8    * ჻჻჻჻჻჻
9    * This program is free software: you can redistribute it and/or modify
10   * it under the terms of the GNU Lesser General Public License as
11   * published by the Free Software Foundation, either version 3 of the
12   * License, or (at your option) any later version.
13   *
14   * This program is distributed in the hope that it will be useful,
15   * but WITHOUT ANY WARRANTY; without even the implied warranty of
16   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17   * GNU General Lesser Public License for more details.
18   *
19   * You should have received a copy of the GNU General Lesser Public
20   * License along with this program.  If not, see
21   * <http://www.gnu.org/licenses/lgpl-3.0.html>.
22   * ╲╱╲╱╲╱╲╱╲╱╲╱╲╱╲╱╲╱╲╱╲╱╲╱╲╱╲╱╲╱╲╱╲╱╲╱╲╱╲╱
23   */
24  
25  import org.metricshub.ipmi.core.coding.PayloadCoder;
26  import org.metricshub.ipmi.core.connection.Connection;
27  import org.metricshub.ipmi.core.connection.ConnectionException;
28  
29  import org.slf4j.Logger;
30  import org.slf4j.LoggerFactory;
31  
32  import java.util.ArrayList;
33  import java.util.Date;
34  import java.util.List;
35  import java.util.Timer;
36  import java.util.TimerTask;
37  
38  /**
39   * Queues messages to send and checks for timeouts.
40   */
41  public class MessageQueue extends TimerTask {
42  
43      private List<QueueElement> queue;
44      private int timeout;
45      private Timer timer;
46      private Connection connection;
47      private int lastSequenceNumber;
48      private Object lastSequenceNumberLock = new Object();
49      private int minSequenceNumber = 1;
50      private int sequenceNumberUpperBound = 64;
51  
52      private static Logger logger = LoggerFactory.getLogger(MessageQueue.class);
53  
54      /**
55       * Frequency of checking messages for timeouts in ms.
56       */
57      private static int cleaningFrequency = 500;
58  
59      /**
60       * Size of the queue determined by IPMI sliding window algorithm
61       * specification. <br>
62       * When queue size is 16, BMC drops some of the messages under heavy load.
63       */
64      private static final int QUEUE_SIZE = 8;
65  
66      public int getTimeout() {
67          return timeout;
68      }
69  
70      public void setTimeout(int timeout) {
71          this.timeout = timeout;
72      }
73  
74      public MessageQueue(Connection connection, int timeout, int minSequenceNumber, int maxSequenceNumber) {
75          this.minSequenceNumber = minSequenceNumber;
76          sequenceNumberUpperBound = maxSequenceNumber + 1;
77          reservedTags = new ArrayList<Integer>();
78          lastSequenceNumber = minSequenceNumber - 1;
79          this.connection = connection;
80          queue = new ArrayList<QueueElement>();
81          setTimeout(timeout);
82          timer = new Timer();
83          timer.schedule(this, cleaningFrequency, cleaningFrequency);
84      }
85  
86      private int incrementSequenceNumber(int currentSequenceNumber) {
87          int newSequenceNumber = (currentSequenceNumber + 1) % sequenceNumberUpperBound;
88  
89          if (newSequenceNumber == 0) {
90              newSequenceNumber = minSequenceNumber;
91          }
92  
93          return newSequenceNumber;
94      }
95  
96      /**
97       * Stops the MessageQueue
98       */
99      public void tearDown() {
100         timer.cancel();
101     }
102 
103     private List<Integer> reservedTags;
104 
105     /**
106      * Check if the tag is reserved.
107      */
108     private synchronized boolean isReserved(int tag) {
109         return reservedTags.contains(tag);
110     }
111 
112     /**
113      * Reserves given tag for the use of the invoker.
114      *
115      * @param tag
116      *            - tag to reserve
117      * @return true if tag was reserved successfully, false otherwise
118      */
119     private synchronized boolean reserveTag(int tag) {
120         if (isReserved(tag)) {
121             reservedTags.add(tag);
122             return true;
123         }
124         return false;
125     }
126 
127     private synchronized void releaseTag(int tag) {
128         reservedTags.remove((Integer) tag);
129     }
130 
131     /**
132      * Adds request to the queue and generates the tag.
133      *
134      * @return Sequence number of the message if it was added to the
135      *         queue, -1 otherwise. The tag used to identify message is equal to
136      *         that value.
137      */
138     public int add(PayloadCoder request) {
139         run();
140         boolean first = true;
141         synchronized (queue) {
142             synchronized (lastSequenceNumberLock) {
143                 if (queue.size() < QUEUE_SIZE) {
144                     int sequenceNumber = incrementSequenceNumber(lastSequenceNumber);
145 
146                     while (isReserved(sequenceNumber)) {
147                         sequenceNumber = incrementSequenceNumber(sequenceNumber);
148 
149                         if (!first) {
150                             try {
151                                 lastSequenceNumberLock.wait(1);
152                             } catch (InterruptedException e) {
153                                 // TODO log
154                             }
155                         }
156 
157                         if (sequenceNumber == lastSequenceNumber) {
158                             //we checked all available sequence numbers, so return -1 (no available sequence numbers)
159                             return -1;
160                         }
161 
162                         first = false;
163                     }
164 
165                     reserveTag(sequenceNumber);
166 
167                     lastSequenceNumber = sequenceNumber;
168 
169                     QueueElement element = new QueueElement(sequenceNumber, request);
170 
171                     queue.add(element);
172                     return sequenceNumber;
173                 }
174             }
175         }
176         return -1;
177 
178     }
179 
180     /**
181      * Removes message with the given tag from the queue.
182      */
183     public void remove(int tag) {
184         run();
185         synchronized (queue) {
186             int i = 0;
187             int index = -1;
188             for (QueueElement element : queue) {
189                 if (element.getId() == tag) {
190                     index = i;
191                     break;
192                 }
193                 ++i;
194             }
195             if (index == 0) {
196                 queue.remove(0);
197                 releaseTag(tag);
198                 while (!queue.isEmpty() && queue.get(0).getRequest() == null) {
199                     int additionalTag = queue.get(0).getId();
200                     queue.remove(0);
201                     releaseTag(additionalTag);
202                 }
203             } else if (index > 0) {
204                 queue.get(index).setRequest(null);
205             }
206 
207         }
208     }
209 
210     /**
211      * Removes message from queue at given index.
212      *
213      * @param index
214      */
215     public void removeAt(int index) {
216         if (index >= queue.size()) {
217             throw new IndexOutOfBoundsException("Index out of bounds : "
218                     + index);
219         }
220 
221         remove(queue.get(index).getId());
222     }
223 
224     /**
225      * Checks if queue contains message with the given sequence number.
226      */
227     public boolean containsId(int sequenceNumber) {
228         synchronized (queue) {
229 
230             for (QueueElement element : queue) {
231                 if (element.getId() == sequenceNumber
232                         && element.getRequest() != null) {
233                     return true;
234                 }
235             }
236 
237         }
238         return false;
239     }
240 
241     /**
242      * Returns valid session sequence number that cannot be used as a tag though
243      */
244     public int getSequenceNumber() {
245         synchronized (lastSequenceNumberLock) {
246             int sequenceNumber = incrementSequenceNumber(lastSequenceNumber);
247 
248             lastSequenceNumber = sequenceNumber;
249 
250             return sequenceNumber;
251         }
252     }
253 
254     /**
255      * Returns message with the given sequence number from the queue or null if
256      * no message with the given tag is currently in the queue.
257      */
258     public PayloadCoder getMessageFromQueue(int tag) {
259         synchronized (queue) {
260             for (QueueElement element : queue) {
261                 if (element.getId() == tag && element.getRequest() != null) {
262                     return element.getRequest();
263                 }
264             }
265         }
266         return null;
267     }
268 
269     /**
270      * Returns index of the message with the given sequence number from the
271      * queue or -1 if no message with the given tag is currently in the queue.
272      */
273     public int getMessageIndexFromQueue(int tag) {
274         synchronized (queue) {
275             int i = 0;
276             for (QueueElement element : queue) {
277                 if (element.getId() == tag && element.getRequest() != null) {
278                     return i;
279                 }
280                 ++i;
281             }
282         }
283         return -1;
284     }
285 
286     /**
287      * Returns number of retries that were performed on message tagged with tag
288      * or -1 if no such message can be found in the queue.
289      *
290      * @deprecated retries are no longer supported on the message level
291      */
292     @Deprecated
293     public int getMessageRetries(int tag) {
294         synchronized (queue) {
295             for (QueueElement element : queue) {
296                 if (element.getId() == tag && element.getRequest() != null) {
297                     return element.getRetries();
298                 }
299             }
300         }
301         return -1;
302     }
303 
304     /**
305      * Returns the ID of the {@link QueueElement} in the queue with the given
306      * tag.
307      *
308      * @param tag
309      *            Tag of the message to find
310      */
311     public int getMessageSequenceNumber(int tag) {
312         synchronized (queue) {
313             for (QueueElement element : queue) {
314                 if (element.getId() == tag && element.getRequest() != null) {
315                     return element.getId();
316                 }
317             }
318         }
319         return -1;
320     }
321 
322     /**
323      * {@link TimerTask} runner - periodically checks queue for timed out
324      * messages.
325      */
326     @Override
327     public void run() {
328         if (queue != null) {
329             synchronized (queue) {
330                 boolean process = true;
331                 while (process && !queue.isEmpty()) {
332                     QueueElement oldestQueueElement = queue.get(0);
333                     boolean done = oldestQueueElement.getRequest() == null;
334 
335                     if (messageJustTimedOut(oldestQueueElement) || done) {
336                         processObsoleteMessage(oldestQueueElement, done);
337                     } else {
338                         process = false;
339                     }
340                 }
341             }
342         }
343     }
344 
345     private boolean messageJustTimedOut(QueueElement oldestQueueElement) {
346         Date now = new Date();
347 
348         return now.getTime() - oldestQueueElement.getTimestamp().getTime() > (long) timeout;
349     }
350 
351     private void processObsoleteMessage(QueueElement message, boolean done) {
352         int tag = message.getId();
353         boolean previouslyTimedOut = message.isTimedOut();
354 
355         if (previouslyTimedOut || done) {
356             queue.remove(0);
357             logger.info("Removing message after timeout, tag: " + tag);
358             releaseTag(tag);
359         } else {
360             message.makeTimedOut();
361             message.refreshTimestamp();
362             connection.notifyResponseListeners(connection.getHandle(), tag, null,
363                     new ConnectionException("Message timed out"));
364         }
365     }
366 
367 }