1 /*
2 ThreadPool.java
3
4 (C) Copyright IBM Corp. 2005, 2011
5
6 THIS FILE IS PROVIDED UNDER THE TERMS OF THE ECLIPSE PUBLIC LICENSE
7 ("AGREEMENT"). ANY USE, REPRODUCTION OR DISTRIBUTION OF THIS FILE
8 CONSTITUTES RECIPIENTS ACCEPTANCE OF THE AGREEMENT.
9
10 You can obtain a current copy of the Eclipse Public License from
11 http://www.opensource.org/licenses/eclipse-1.0.php
12
13 @author : Roberto Pineiro, IBM, roberto.pineiro@us.ibm.com
14 * @author : Chung-hao Tan, IBM, chungtan@us.ibm.com
15 *
16 *
17 * Change History
18 * Flag Date Prog Description
19 *-------------------------------------------------------------------------------
20 * 1535756 2006-08-07 lupusalex Make code warning free
21 * 1565892 2006-11-28 lupusalex Make SBLIM client JSR48 compliant
22 * 1649779 2007-02-01 lupusalex Indication listener threads freeze
23 * 2003590 2008-06-30 blaschke-oss Change licensing from CPL to EPL
24 * 2524131 2009-01-21 raman_arora Upgrade client to JDK 1.5 (Phase 1)
25 * 2531371 2009-02-10 raman_arora Upgrade client to JDK 1.5 (Phase 2)
26 * 3206904 2011-03-11 lupusalex Indication listener deadlock causes JVM to run out sockets
27 */
28 package org.metricshub.wbem.sblim.cimclient.internal.util;
29
30 /*-
31 * ╱╲╱╲╱╲╱╲╱╲╱╲╱╲╱╲╱╲╱╲╱╲╱╲╱╲╱╲╱╲╱╲╱╲╱╲╱╲╱╲
32 * WBEM Java Client
33 * ჻჻჻჻჻჻
34 * Copyright 2023 - 2025 MetricsHub
35 * ჻჻჻჻჻჻
36 * Licensed under the Apache License, Version 2.0 (the "License");
37 * you may not use this file except in compliance with the License.
38 * You may obtain a copy of the License at
39 *
40 * http://www.apache.org/licenses/LICENSE-2.0
41 *
42 * Unless required by applicable law or agreed to in writing, software
43 * distributed under the License is distributed on an "AS IS" BASIS,
44 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
45 * See the License for the specific language governing permissions and
46 * limitations under the License.
47 * ╲╱╲╱╲╱╲╱╲╱╲╱╲╱╲╱╲╱╲╱╲╱╲╱╲╱╲╱╲╱╲╱╲╱╲╱╲╱╲╱
48 */
49
50 import java.util.LinkedList;
51 import java.util.List;
52 import java.util.concurrent.BlockingQueue;
53 import java.util.concurrent.LinkedBlockingQueue;
54 import java.util.concurrent.TimeUnit;
55 import java.util.concurrent.atomic.AtomicInteger;
56 import java.util.logging.Level;
57 import org.metricshub.wbem.sblim.cimclient.internal.logging.LogAndTraceBroker;
58
59 /**
60 * Class ThreadPool implements a pool that manages threads and executes
61 * submitted tasks using this threads.
62 *
63 */
64 public class ThreadPool {
65
66 /**
67 * Class Worker implements a worker thread used by the thread pool
68 *
69 */
70 private static class Worker extends Thread {
71 private volatile boolean iAlive;
72
73 private final ThreadPool iPool;
74
75 private Runnable iTask;
76
77 private Thread iRunThread;
78
79 /**
80 * Ctor.
81 *
82 * @param pool
83 * The owning pool
84 * @param name
85 * The name of the thread
86 */
87 public Worker(ThreadPool pool, String name) {
88 super(pool.getGroup(), name);
89 this.iPool = pool;
90 setDaemon(true);
91 }
92
93 @Override
94 public void start() {
95 this.iAlive = true;
96 super.start();
97 }
98
99 /**
100 * Kills the thread
101 */
102 public void kill() {
103 this.iAlive = false;
104 if (this.iRunThread != null) {
105 this.iRunThread.interrupt();
106 }
107 }
108
109 /**
110 * Waits for a new task execute
111 *
112 * @return The task or <code>null</code>
113 * @throws InterruptedException
114 */
115 private Runnable waitForTask() throws InterruptedException {
116 if (this.iTask != null) {
117 Runnable tsk = this.iTask;
118 this.iTask = null;
119 return tsk;
120 }
121 if (this.iAlive && (this.iTask == null)) {
122 this.iTask = this.iPool.getNextTask(this);
123 }
124 return null;
125 }
126
127 @Override
128 public void run() {
129 // System.out.println("new worker "+getId());
130 this.iRunThread = Thread.currentThread();
131 while (this.iAlive) {
132 try {
133 Runnable tsk = waitForTask();
134 if (tsk != null) {
135 this.iPool.taskStarted();
136 try {
137 tsk.run();
138 } catch (Throwable t) {
139 LogAndTraceBroker.getBroker().trace(Level.FINE, "Exception while executing task from thread pool", t);
140 }
141 this.iPool.taskCompleted();
142 }
143 } catch (InterruptedException e) {
144 /**/
145 }
146 }
147 this.iPool.removeWorker(this);
148 // System.out.println("dead worker "+getId());
149 }
150 }
151
152 private ThreadGroup iGroup;
153
154 private AtomicInteger iIdleThreads = new AtomicInteger(0);
155
156 private List<Worker> iThreadPool = new LinkedList<Worker>();
157
158 private BlockingQueue<Runnable> iQueue = new LinkedBlockingQueue<Runnable>();
159
160 private long iIdleTimeout;
161
162 private int iMaxPoolSize;
163
164 private int iMinPoolSize;
165
166 private int iToleratedBacklog;
167
168 private int iCntr = 0;
169
170 private boolean iShutdown = false;
171
172 private String iWorkerName;
173
174 /**
175 * Ctor
176 *
177 * @param pMinPoolSize
178 * The minimal pool size. The pool will always keep at least this
179 * number of worker threads alive even in no load situations.
180 * @param pMaxPoolSize
181 * The maximal pool size. The pool will create up to that number
182 * of worker threads on heavy load.
183 * @param pToleratedBacklog
184 * The task backlog that is tolerated before an additional worker
185 * is created
186 * @param pToleratedIdle
187 * The idle time of a worker that is tolerated before the worker
188 * is destroyed
189 * @param pGroup
190 * Then thread group to put the worker threads in
191 * @param pWorkerName
192 * The name to use for worker threads
193 */
194 public ThreadPool(
195 int pMinPoolSize,
196 int pMaxPoolSize,
197 int pToleratedBacklog,
198 long pToleratedIdle,
199 ThreadGroup pGroup,
200 String pWorkerName
201 ) {
202 this.iGroup = pGroup != null ? pGroup : new ThreadGroup("TreadPool Group");
203 this.iMinPoolSize = pMinPoolSize;
204 this.iMaxPoolSize = pMaxPoolSize;
205 this.iToleratedBacklog = pToleratedBacklog;
206 this.iIdleTimeout = pToleratedIdle;
207 this.iWorkerName = pWorkerName != null ? pWorkerName : "Worker ";
208 }
209
210 /**
211 * Submits a task for execution
212 *
213 * @param task
214 * The task
215 * @return <code>true</code> if the task was executed or enqueued,
216 * <code>false</code> otherwise.
217 */
218 public synchronized boolean execute(Runnable task) {
219 if (this.iShutdown) return false;
220
221 for (int i = this.iThreadPool.size(); i < this.iMinPoolSize; ++i) {
222 createWorker();
223 }
224
225 int totalIdle = this.iIdleThreads.get();
226
227 boolean added = this.iQueue.offer(task);
228
229 if (totalIdle > 0) {
230 return added;
231 }
232
233 // is maximal pool size reached ?
234 boolean mayCreateWorker = (this.iMaxPoolSize == -1) || (this.iThreadPool.size() < this.iMaxPoolSize);
235 // create a new worker when backlog exceeds toleration level or we
236 // have no workers at all
237 if (mayCreateWorker && ((this.iQueue.size() > this.iToleratedBacklog) || this.iThreadPool.size() == 0)) {
238 createWorker();
239 }
240 return added;
241 }
242
243 /**
244 * Creates a new worker thread
245 *
246 */
247 private synchronized void createWorker() {
248 Worker worker = new Worker(this, this.iWorkerName + getID());
249 this.iThreadPool.add(worker);
250 this.iIdleThreads.incrementAndGet();
251 worker.start();
252 }
253
254 /**
255 * Removes a worker from the pool.
256 *
257 * @param worker
258 * The worker
259 */
260 protected synchronized void removeWorker(Worker worker) {
261 if (worker != null && this.iThreadPool != null) {
262 this.iIdleThreads.decrementAndGet();
263 this.iThreadPool.remove(worker);
264
265 // handle race condition where last worker calls removeWorker()
266 // parallel to call into execute()
267 if (this.iThreadPool.isEmpty() && !this.iQueue.isEmpty()) {
268 // System.out.print("bingo");
269 createWorker();
270 }
271 }
272 }
273
274 /**
275 * Gets the associated thread group
276 *
277 * @return The thread group
278 */
279 protected ThreadGroup getGroup() {
280 return this.iGroup;
281 }
282
283 /**
284 * Get a new task. If no task was available during the timeout period the
285 * calling worker might be killed if more than the minimum number of workers
286 * exist
287 *
288 * @param worker
289 * The worker asking for a new task
290 * @return The next available task from the queue. If no task is available
291 * waits for idle timeout and returns null afterwards.
292 * @throws InterruptedException
293 * on interrupt
294 */
295 public Runnable getNextTask(Worker worker) throws InterruptedException {
296 Runnable task = this.iQueue.poll(this.iIdleTimeout, TimeUnit.MILLISECONDS);
297 if (task == null && this.iThreadPool.size() > this.iMinPoolSize) {
298 worker.kill();
299 }
300 return task;
301 }
302
303 /**
304 * Notifies the pool that at task was started. Effectively decrements the
305 * idle worker count.
306 */
307 public void taskStarted() {
308 /* int idleCount = */this.iIdleThreads.decrementAndGet();
309 // if (idleCount < 0) System.err.println("idlecount negative");
310 }
311
312 /**
313 * Notifies the pool that at task was completed. Effectively increments the
314 * idle worker count.
315 */
316 public void taskCompleted() {
317 /* int idleCount = */this.iIdleThreads.incrementAndGet();
318 // if (idleCount > this.iThreadPool.size())
319 // System.err.println("idlecount exceeds total workers");
320 }
321
322 /**
323 * Shuts down the thread pool and all workers
324 */
325 public synchronized void shutdown() {
326 if (!this.iShutdown) {
327 this.iShutdown = true;
328 if (this.iThreadPool != null) {
329 List<Worker> workers = this.iThreadPool;
330 this.iThreadPool = null;
331 for (Worker worker : workers) {
332 worker.kill();
333 }
334 }
335 }
336 }
337
338 /**
339 * Generates an "unique" id for a worker thread
340 *
341 * @return The id
342 */
343 private String getID() {
344 if (++this.iCntr >= 10000) this.iCntr = 1;
345 return String.valueOf(this.iCntr);
346 }
347 }