View Javadoc
1   /*
2     ThreadPool.java
3   
4     (C) Copyright IBM Corp. 2005, 2011
5   
6     THIS FILE IS PROVIDED UNDER THE TERMS OF THE ECLIPSE PUBLIC LICENSE
7     ("AGREEMENT"). ANY USE, REPRODUCTION OR DISTRIBUTION OF THIS FILE
8     CONSTITUTES RECIPIENTS ACCEPTANCE OF THE AGREEMENT.
9   
10    You can obtain a current copy of the Eclipse Public License from
11    http://www.opensource.org/licenses/eclipse-1.0.php
12  
13    @author : Roberto Pineiro, IBM, roberto.pineiro@us.ibm.com
14   * @author : Chung-hao Tan, IBM, chungtan@us.ibm.com
15   * 
16   * 
17   * Change History
18   * Flag       Date        Prog         Description
19   *------------------------------------------------------------------------------- 
20   * 1535756    2006-08-07  lupusalex    Make code warning free
21   * 1565892    2006-11-28  lupusalex    Make SBLIM client JSR48 compliant
22   * 1649779    2007-02-01  lupusalex    Indication listener threads freeze
23   * 2003590    2008-06-30  blaschke-oss Change licensing from CPL to EPL
24   * 2524131    2009-01-21  raman_arora  Upgrade client to JDK 1.5 (Phase 1)
25   * 2531371    2009-02-10  raman_arora  Upgrade client to JDK 1.5 (Phase 2)
26   * 3206904    2011-03-11  lupusalex    Indication listener deadlock causes JVM to run out sockets
27   */
28  package org.metricshub.wbem.sblim.cimclient.internal.util;
29  
30  /*-
31   * ╱╲╱╲╱╲╱╲╱╲╱╲╱╲╱╲╱╲╱╲╱╲╱╲╱╲╱╲╱╲╱╲╱╲╱╲╱╲╱╲
32   * WBEM Java Client
33   * ჻჻჻჻჻჻
34   * Copyright 2023 - 2025 MetricsHub
35   * ჻჻჻჻჻჻
36   * Licensed under the Apache License, Version 2.0 (the "License");
37   * you may not use this file except in compliance with the License.
38   * You may obtain a copy of the License at
39   *
40   *      http://www.apache.org/licenses/LICENSE-2.0
41   *
42   * Unless required by applicable law or agreed to in writing, software
43   * distributed under the License is distributed on an "AS IS" BASIS,
44   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
45   * See the License for the specific language governing permissions and
46   * limitations under the License.
47   * ╲╱╲╱╲╱╲╱╲╱╲╱╲╱╲╱╲╱╲╱╲╱╲╱╲╱╲╱╲╱╲╱╲╱╲╱╲╱╲╱
48   */
49  
50  import java.util.LinkedList;
51  import java.util.List;
52  import java.util.concurrent.BlockingQueue;
53  import java.util.concurrent.LinkedBlockingQueue;
54  import java.util.concurrent.TimeUnit;
55  import java.util.concurrent.atomic.AtomicInteger;
56  import java.util.logging.Level;
57  import org.metricshub.wbem.sblim.cimclient.internal.logging.LogAndTraceBroker;
58  
59  /**
60   * Class ThreadPool implements a pool that manages threads and executes
61   * submitted tasks using this threads.
62   *
63   */
64  public class ThreadPool {
65  
66  	/**
67  	 * Class Worker implements a worker thread used by the thread pool
68  	 *
69  	 */
70  	private static class Worker extends Thread {
71  		private volatile boolean iAlive;
72  
73  		private final ThreadPool iPool;
74  
75  		private Runnable iTask;
76  
77  		private Thread iRunThread;
78  
79  		/**
80  		 * Ctor.
81  		 *
82  		 * @param pool
83  		 *            The owning pool
84  		 * @param name
85  		 *            The name of the thread
86  		 */
87  		public Worker(ThreadPool pool, String name) {
88  			super(pool.getGroup(), name);
89  			this.iPool = pool;
90  			setDaemon(true);
91  		}
92  
93  		@Override
94  		public void start() {
95  			this.iAlive = true;
96  			super.start();
97  		}
98  
99  		/**
100 		 * Kills the thread
101 		 */
102 		public void kill() {
103 			this.iAlive = false;
104 			if (this.iRunThread != null) {
105 				this.iRunThread.interrupt();
106 			}
107 		}
108 
109 		/**
110 		 * Waits for a new task execute
111 		 *
112 		 * @return The task or <code>null</code>
113 		 * @throws InterruptedException
114 		 */
115 		private Runnable waitForTask() throws InterruptedException {
116 			if (this.iTask != null) {
117 				Runnable tsk = this.iTask;
118 				this.iTask = null;
119 				return tsk;
120 			}
121 			if (this.iAlive && (this.iTask == null)) {
122 				this.iTask = this.iPool.getNextTask(this);
123 			}
124 			return null;
125 		}
126 
127 		@Override
128 		public void run() {
129 			// System.out.println("new worker "+getId());
130 			this.iRunThread = Thread.currentThread();
131 			while (this.iAlive) {
132 				try {
133 					Runnable tsk = waitForTask();
134 					if (tsk != null) {
135 						this.iPool.taskStarted();
136 						try {
137 							tsk.run();
138 						} catch (Throwable t) {
139 							LogAndTraceBroker.getBroker().trace(Level.FINE, "Exception while executing task from thread pool", t);
140 						}
141 						this.iPool.taskCompleted();
142 					}
143 				} catch (InterruptedException e) {
144 					/**/
145 				}
146 			}
147 			this.iPool.removeWorker(this);
148 			// System.out.println("dead worker "+getId());
149 		}
150 	}
151 
152 	private ThreadGroup iGroup;
153 
154 	private AtomicInteger iIdleThreads = new AtomicInteger(0);
155 
156 	private List<Worker> iThreadPool = new LinkedList<Worker>();
157 
158 	private BlockingQueue<Runnable> iQueue = new LinkedBlockingQueue<Runnable>();
159 
160 	private long iIdleTimeout;
161 
162 	private int iMaxPoolSize;
163 
164 	private int iMinPoolSize;
165 
166 	private int iToleratedBacklog;
167 
168 	private int iCntr = 0;
169 
170 	private boolean iShutdown = false;
171 
172 	private String iWorkerName;
173 
174 	/**
175 	 * Ctor
176 	 *
177 	 * @param pMinPoolSize
178 	 *            The minimal pool size. The pool will always keep at least this
179 	 *            number of worker threads alive even in no load situations.
180 	 * @param pMaxPoolSize
181 	 *            The maximal pool size. The pool will create up to that number
182 	 *            of worker threads on heavy load.
183 	 * @param pToleratedBacklog
184 	 *            The task backlog that is tolerated before an additional worker
185 	 *            is created
186 	 * @param pToleratedIdle
187 	 *            The idle time of a worker that is tolerated before the worker
188 	 *            is destroyed
189 	 * @param pGroup
190 	 *            Then thread group to put the worker threads in
191 	 * @param pWorkerName
192 	 *            The name to use for worker threads
193 	 */
194 	public ThreadPool(
195 		int pMinPoolSize,
196 		int pMaxPoolSize,
197 		int pToleratedBacklog,
198 		long pToleratedIdle,
199 		ThreadGroup pGroup,
200 		String pWorkerName
201 	) {
202 		this.iGroup = pGroup != null ? pGroup : new ThreadGroup("TreadPool Group");
203 		this.iMinPoolSize = pMinPoolSize;
204 		this.iMaxPoolSize = pMaxPoolSize;
205 		this.iToleratedBacklog = pToleratedBacklog;
206 		this.iIdleTimeout = pToleratedIdle;
207 		this.iWorkerName = pWorkerName != null ? pWorkerName : "Worker ";
208 	}
209 
210 	/**
211 	 * Submits a task for execution
212 	 *
213 	 * @param task
214 	 *            The task
215 	 * @return <code>true</code> if the task was executed or enqueued,
216 	 *         <code>false</code> otherwise.
217 	 */
218 	public synchronized boolean execute(Runnable task) {
219 		if (this.iShutdown) return false;
220 
221 		for (int i = this.iThreadPool.size(); i < this.iMinPoolSize; ++i) {
222 			createWorker();
223 		}
224 
225 		int totalIdle = this.iIdleThreads.get();
226 
227 		boolean added = this.iQueue.offer(task);
228 
229 		if (totalIdle > 0) {
230 			return added;
231 		}
232 
233 		// is maximal pool size reached ?
234 		boolean mayCreateWorker = (this.iMaxPoolSize == -1) || (this.iThreadPool.size() < this.iMaxPoolSize);
235 		// create a new worker when backlog exceeds toleration level or we
236 		// have no workers at all
237 		if (mayCreateWorker && ((this.iQueue.size() > this.iToleratedBacklog) || this.iThreadPool.size() == 0)) {
238 			createWorker();
239 		}
240 		return added;
241 	}
242 
243 	/**
244 	 * Creates a new worker thread
245 	 *
246 	 */
247 	private synchronized void createWorker() {
248 		Worker worker = new Worker(this, this.iWorkerName + getID());
249 		this.iThreadPool.add(worker);
250 		this.iIdleThreads.incrementAndGet();
251 		worker.start();
252 	}
253 
254 	/**
255 	 * Removes a worker from the pool.
256 	 *
257 	 * @param worker
258 	 *            The worker
259 	 */
260 	protected synchronized void removeWorker(Worker worker) {
261 		if (worker != null && this.iThreadPool != null) {
262 			this.iIdleThreads.decrementAndGet();
263 			this.iThreadPool.remove(worker);
264 
265 			// handle race condition where last worker calls removeWorker()
266 			// parallel to call into execute()
267 			if (this.iThreadPool.isEmpty() && !this.iQueue.isEmpty()) {
268 				// System.out.print("bingo");
269 				createWorker();
270 			}
271 		}
272 	}
273 
274 	/**
275 	 * Gets the associated thread group
276 	 *
277 	 * @return The thread group
278 	 */
279 	protected ThreadGroup getGroup() {
280 		return this.iGroup;
281 	}
282 
283 	/**
284 	 * Get a new task. If no task was available during the timeout period the
285 	 * calling worker might be killed if more than the minimum number of workers
286 	 * exist
287 	 *
288 	 * @param worker
289 	 *            The worker asking for a new task
290 	 * @return The next available task from the queue. If no task is available
291 	 *         waits for idle timeout and returns null afterwards.
292 	 * @throws InterruptedException
293 	 *             on interrupt
294 	 */
295 	public Runnable getNextTask(Worker worker) throws InterruptedException {
296 		Runnable task = this.iQueue.poll(this.iIdleTimeout, TimeUnit.MILLISECONDS);
297 		if (task == null && this.iThreadPool.size() > this.iMinPoolSize) {
298 			worker.kill();
299 		}
300 		return task;
301 	}
302 
303 	/**
304 	 * Notifies the pool that at task was started. Effectively decrements the
305 	 * idle worker count.
306 	 */
307 	public void taskStarted() {
308 		/* int idleCount = */this.iIdleThreads.decrementAndGet();
309 		// if (idleCount < 0) System.err.println("idlecount negative");
310 	}
311 
312 	/**
313 	 * Notifies the pool that at task was completed. Effectively increments the
314 	 * idle worker count.
315 	 */
316 	public void taskCompleted() {
317 		/* int idleCount = */this.iIdleThreads.incrementAndGet();
318 		// if (idleCount > this.iThreadPool.size())
319 		// System.err.println("idlecount exceeds total workers");
320 	}
321 
322 	/**
323 	 * Shuts down the thread pool and all workers
324 	 */
325 	public synchronized void shutdown() {
326 		if (!this.iShutdown) {
327 			this.iShutdown = true;
328 			if (this.iThreadPool != null) {
329 				List<Worker> workers = this.iThreadPool;
330 				this.iThreadPool = null;
331 				for (Worker worker : workers) {
332 					worker.kill();
333 				}
334 			}
335 		}
336 	}
337 
338 	/**
339 	 * Generates an &quot;unique&quot; id for a worker thread
340 	 *
341 	 * @return The id
342 	 */
343 	private String getID() {
344 		if (++this.iCntr >= 10000) this.iCntr = 1;
345 		return String.valueOf(this.iCntr);
346 	}
347 }