1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28 package uk.co.westhawk.snmp.stack;
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52 import java.io.*;
53 import java.util.*;
54
55 import uk.co.westhawk.snmp.event.*;
56 import uk.co.westhawk.snmp.net.*;
57 import uk.co.westhawk.snmp.util.*;
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103 public class ListeningContext implements ListeningContextFace, Runnable {
104 private static final String version_id = "@(#)$Id: ListeningContext.java,v 3.12 2009/03/05 13:24:00 birgita Exp $ Copyright Westhawk Ltd";
105
106 private Object soc_lock = new Object();
107
108
109 private static int counter;
110 private ContextSocketFace soc;
111 private Thread me;
112 private String basename;
113 private volatile boolean stopRequested;
114
115
116
117 protected int maxRecvSize;
118 protected String typeSocket;
119 protected int hostPort;
120 protected String bindAddr;
121
122 transient private RawPduReceivedSupport pduSupport, unhandledSupport;
123
124
125
126
127
128
129
130
131 public ListeningContext(int port) {
132 this(port, null, SnmpContextBasisFace.STANDARD_SOCKET);
133 }
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154 public ListeningContext(int port, String bindAddress) {
155 this(port, bindAddress, SnmpContextBasisFace.STANDARD_SOCKET);
156 }
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191 public ListeningContext(int port, String bindAddress, String typeSocketA) {
192 hostPort = port;
193 bindAddr = bindAddress;
194 typeSocket = typeSocketA;
195
196 basename = "" + hostPort + "_" + bindAddr;
197
198 pduSupport = new RawPduReceivedSupport(this);
199 unhandledSupport = new RawPduReceivedSupport(this);
200 maxRecvSize = SnmpContextBasisFace.MSS;
201 }
202
203 public int getPort() {
204 return hostPort;
205 }
206
207 public String getBindAddress() {
208 return bindAddr;
209 }
210
211 public String getTypeSocket() {
212 return typeSocket;
213 }
214
215 public int getMaxRecvSize() {
216 return maxRecvSize;
217 }
218
219 public void setMaxRecvSize(int no) {
220 maxRecvSize = no;
221 }
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249 public void destroy() {
250 synchronized (soc_lock) {
251 stopRequested = true;
252 if (soc != null) {
253 if (AsnObject.debug > 12) {
254 System.out.println(getClass().getName() + ".destroy(): Closing socket ");
255 }
256 soc.close();
257 }
258 }
259 }
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282 public void run() {
283
284 while (!stopRequested) {
285
286 me.yield();
287 try {
288 if (stopRequested) {
289 break;
290 }
291
292 StreamPortItem item = soc.receive(maxRecvSize);
293 ByteArrayInputStream in = item.getStream();
294
295 String hostAddress = item.getHostAddress();
296 int port = item.getHostPort();
297
298
299 int nb = in.available();
300 byte[] bu = new byte[nb];
301 in.read(bu);
302 in.close();
303
304 if (AsnObject.debug > 10) {
305 SnmpUtilities.dumpBytes(getClass().getName()
306 + ".run(): Received from "
307 + hostAddress
308 + ", from port " + port
309 + ": ", bu);
310 }
311 KickProcessIncomingMessage thread = new KickProcessIncomingMessage(hostAddress, port, bu);
312 thread.start();
313 } catch (IOException exc) {
314 if (exc instanceof InterruptedIOException) {
315 if (AsnObject.debug > 15) {
316 System.out.println(getClass().getName() + ".run(): Idle recv " + exc.getMessage());
317 }
318 } else {
319 if (AsnObject.debug > 0) {
320 System.out.println(getClass().getName() + ".run(): IOException: " + exc.getMessage());
321 }
322 }
323 } catch (Exception exc) {
324 if (AsnObject.debug > 0) {
325 System.out.println(getClass().getName() + ".run(): Exception: " + exc.getMessage());
326 exc.printStackTrace();
327 }
328 } catch (Error err) {
329 if (AsnObject.debug > 0) {
330 System.out.println(getClass().getName() + ".run(): Error: " + err.getMessage());
331 err.printStackTrace();
332 }
333 }
334 }
335
336 me = null;
337 soc = null;
338 pduSupport.empty();
339 unhandledSupport.empty();
340 }
341
342 public void addRawPduListener(RawPduListener listener)
343 throws IOException {
344 synchronized (soc_lock) {
345 pduSupport.addRawPduListener(listener);
346 startListening();
347 }
348 }
349
350 public void removeRawPduListener(RawPduListener listener) {
351 synchronized (soc_lock) {
352 pduSupport.removeRawPduListener(listener);
353 destroyIfNoListeners();
354 }
355 }
356
357 public void addUnhandledRawPduListener(RawPduListener listener)
358 throws IOException {
359 synchronized (soc_lock) {
360 unhandledSupport.addRawPduListener(listener);
361 startListening();
362 }
363 }
364
365 public void removeUnhandledRawPduListener(RawPduListener listener) {
366 synchronized (soc_lock) {
367 unhandledSupport.removeRawPduListener(listener);
368 destroyIfNoListeners();
369 }
370 }
371
372
373
374
375
376
377
378
379
380
381
382 private void startListening()
383 throws IOException {
384 if (soc == null) {
385
386
387 ContextSocketFace tempSoc = AbstractSnmpContext.getSocket(typeSocket);
388 if (tempSoc != null) {
389 tempSoc.create(hostPort, bindAddr);
390 soc = tempSoc;
391
392 if (AsnObject.debug > 12) {
393 System.out.println(getClass().getName() + ".startListening()"
394 + ": soc.getLocalSocketAddress() = " + soc.getLocalSocketAddress());
395 System.out.println(getClass().getName() + ".startListening()"
396 + ": soc.getRemoteSocketAddress() = " + soc.getRemoteSocketAddress());
397 }
398 }
399 }
400 if (me == null) {
401 stopRequested = false;
402 me = new Thread(this, basename + "_Listen");
403 me.setPriority(me.NORM_PRIORITY);
404 me.start();
405 }
406 }
407
408
409
410
411
412
413
414 public String getHashKey() {
415 String str = hostPort
416 + "_" + bindAddr
417 + "_" + typeSocket;
418 return str;
419 }
420
421
422
423
424
425
426 public String toString() {
427 StringBuffer buffer = new StringBuffer("ListeningContext[");
428 buffer.append("port=").append(hostPort);
429 buffer.append(", bindAddress=").append(bindAddr);
430 buffer.append(", socketType=").append(typeSocket);
431 buffer.append(", #rawPduListeners=").append(pduSupport.getListenerCount());
432 buffer.append(", #rawPduUnhandledListeners=").append(unhandledSupport.getListenerCount());
433 buffer.append("]");
434 return buffer.toString();
435 }
436
437
438
439
440
441
442 protected void processIncomingMessage(String hostAddress,
443 int port, byte[] bu) throws DecodingException, IOException {
444 AsnDecoderBase rpdu = new AsnDecoderBase();
445 ByteArrayInputStream in = new ByteArrayInputStream(bu);
446 AsnSequence asnTopSeq = rpdu.getAsnSequence(in);
447 int version = rpdu.getSNMPVersion(asnTopSeq);
448
449 boolean isConsumed = pduSupport.fireRawPduReceived(version, hostAddress, port, bu);
450 if (isConsumed == false) {
451 unhandledSupport.fireRawPduReceived(version, hostAddress, port, bu);
452 }
453 }
454
455
456
457
458
459
460
461
462 private void destroyIfNoListeners() {
463 if (pduSupport.getListenerCount() == 0
464 && unhandledSupport.getListenerCount() == 0) {
465 destroy();
466 }
467 }
468
469 class KickProcessIncomingMessage extends Thread {
470
471
472
473
474
475
476
477 private String hostAddress;
478 private int port;
479 private byte[] bu;
480
481 KickProcessIncomingMessage(String newHostAddress, int newPort,
482 byte[] newBu) {
483 hostAddress = newHostAddress;
484 port = newPort;
485 bu = newBu;
486 this.setPriority(Thread.MIN_PRIORITY);
487 this.setName(newHostAddress + "_" + newPort
488 + "_KickProcessIncomingMessage_" + counter);
489 counter++;
490 }
491
492 public void run() {
493 try {
494 processIncomingMessage(hostAddress, port, bu);
495 } catch (IOException exc) {
496 if (AsnObject.debug > 0) {
497 System.out.println(getClass().getName() + ".run(): IOException: " + exc.getMessage());
498 }
499 } catch (DecodingException exc) {
500 if (AsnObject.debug > 1) {
501 System.out.println(getClass().getName() + ".run(): DecodingException: " + exc.getMessage());
502 }
503 }
504 }
505 }
506
507 }