1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28 package org.metricshub.wbem.sblim.cimclient.internal.util;
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50 import java.util.LinkedList;
51 import java.util.List;
52 import java.util.concurrent.BlockingQueue;
53 import java.util.concurrent.LinkedBlockingQueue;
54 import java.util.concurrent.TimeUnit;
55 import java.util.concurrent.atomic.AtomicInteger;
56 import java.util.logging.Level;
57 import org.metricshub.wbem.sblim.cimclient.internal.logging.LogAndTraceBroker;
58
59
60
61
62
63
64 public class ThreadPool {
65
66
67
68
69
70 private static class Worker extends Thread {
71 private volatile boolean iAlive;
72
73 private final ThreadPool iPool;
74
75 private Runnable iTask;
76
77 private Thread iRunThread;
78
79
80
81
82
83
84
85
86
87 public Worker(ThreadPool pool, String name) {
88 super(pool.getGroup(), name);
89 this.iPool = pool;
90 setDaemon(true);
91 }
92
93 @Override
94 public void start() {
95 this.iAlive = true;
96 super.start();
97 }
98
99
100
101
102 public void kill() {
103 this.iAlive = false;
104 if (this.iRunThread != null) {
105 this.iRunThread.interrupt();
106 }
107 }
108
109
110
111
112
113
114
115 private Runnable waitForTask() throws InterruptedException {
116 if (this.iTask != null) {
117 Runnable tsk = this.iTask;
118 this.iTask = null;
119 return tsk;
120 }
121 if (this.iAlive && (this.iTask == null)) {
122 this.iTask = this.iPool.getNextTask(this);
123 }
124 return null;
125 }
126
127 @Override
128 public void run() {
129
130 this.iRunThread = Thread.currentThread();
131 while (this.iAlive) {
132 try {
133 Runnable tsk = waitForTask();
134 if (tsk != null) {
135 this.iPool.taskStarted();
136 try {
137 tsk.run();
138 } catch (Throwable t) {
139 LogAndTraceBroker.getBroker().trace(Level.FINE, "Exception while executing task from thread pool", t);
140 }
141 this.iPool.taskCompleted();
142 }
143 } catch (InterruptedException e) {
144
145 }
146 }
147 this.iPool.removeWorker(this);
148
149 }
150 }
151
152 private ThreadGroup iGroup;
153
154 private AtomicInteger iIdleThreads = new AtomicInteger(0);
155
156 private List<Worker> iThreadPool = new LinkedList<Worker>();
157
158 private BlockingQueue<Runnable> iQueue = new LinkedBlockingQueue<Runnable>();
159
160 private long iIdleTimeout;
161
162 private int iMaxPoolSize;
163
164 private int iMinPoolSize;
165
166 private int iToleratedBacklog;
167
168 private int iCntr = 0;
169
170 private boolean iShutdown = false;
171
172 private String iWorkerName;
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194 public ThreadPool(
195 int pMinPoolSize,
196 int pMaxPoolSize,
197 int pToleratedBacklog,
198 long pToleratedIdle,
199 ThreadGroup pGroup,
200 String pWorkerName
201 ) {
202 this.iGroup = pGroup != null ? pGroup : new ThreadGroup("TreadPool Group");
203 this.iMinPoolSize = pMinPoolSize;
204 this.iMaxPoolSize = pMaxPoolSize;
205 this.iToleratedBacklog = pToleratedBacklog;
206 this.iIdleTimeout = pToleratedIdle;
207 this.iWorkerName = pWorkerName != null ? pWorkerName : "Worker ";
208 }
209
210
211
212
213
214
215
216
217
218 public synchronized boolean execute(Runnable task) {
219 if (this.iShutdown) return false;
220
221 for (int i = this.iThreadPool.size(); i < this.iMinPoolSize; ++i) {
222 createWorker();
223 }
224
225 int totalIdle = this.iIdleThreads.get();
226
227 boolean added = this.iQueue.offer(task);
228
229 if (totalIdle > 0) {
230 return added;
231 }
232
233
234 boolean mayCreateWorker = (this.iMaxPoolSize == -1) || (this.iThreadPool.size() < this.iMaxPoolSize);
235
236
237 if (mayCreateWorker && ((this.iQueue.size() > this.iToleratedBacklog) || this.iThreadPool.size() == 0)) {
238 createWorker();
239 }
240 return added;
241 }
242
243
244
245
246
247 private synchronized void createWorker() {
248 Worker worker = new Worker(this, this.iWorkerName + getID());
249 this.iThreadPool.add(worker);
250 this.iIdleThreads.incrementAndGet();
251 worker.start();
252 }
253
254
255
256
257
258
259
260 protected synchronized void removeWorker(Worker worker) {
261 if (worker != null && this.iThreadPool != null) {
262 this.iIdleThreads.decrementAndGet();
263 this.iThreadPool.remove(worker);
264
265
266
267 if (this.iThreadPool.isEmpty() && !this.iQueue.isEmpty()) {
268
269 createWorker();
270 }
271 }
272 }
273
274
275
276
277
278
279 protected ThreadGroup getGroup() {
280 return this.iGroup;
281 }
282
283
284
285
286
287
288
289
290
291
292
293
294
295 public Runnable getNextTask(Worker worker) throws InterruptedException {
296 Runnable task = this.iQueue.poll(this.iIdleTimeout, TimeUnit.MILLISECONDS);
297 if (task == null && this.iThreadPool.size() > this.iMinPoolSize) {
298 worker.kill();
299 }
300 return task;
301 }
302
303
304
305
306
307 public void taskStarted() {
308 this.iIdleThreads.decrementAndGet();
309
310 }
311
312
313
314
315
316 public void taskCompleted() {
317 this.iIdleThreads.incrementAndGet();
318
319
320 }
321
322
323
324
325 public synchronized void shutdown() {
326 if (!this.iShutdown) {
327 this.iShutdown = true;
328 if (this.iThreadPool != null) {
329 List<Worker> workers = this.iThreadPool;
330 this.iThreadPool = null;
331 for (Worker worker : workers) {
332 worker.kill();
333 }
334 }
335 }
336 }
337
338
339
340
341
342
343 private String getID() {
344 if (++this.iCntr >= 10000) this.iCntr = 1;
345 return String.valueOf(this.iCntr);
346 }
347 }