ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/cvsroot/AnyEvent-SNMP/SNMP.pm
(Generate patch)

Comparing cvsroot/AnyEvent-SNMP/SNMP.pm (file contents):
Revision 1.7 by root, Sun Aug 2 14:24:23 2009 UTC vs.
Revision 1.10 by root, Sun Oct 31 18:15:22 2010 UTC

23 my @result = $cv->wait; 23 my @result = $cv->wait;
24 24
25=head1 DESCRIPTION 25=head1 DESCRIPTION
26 26
27This module implements an alternative "event dispatcher" for Net::SNMP, 27This module implements an alternative "event dispatcher" for Net::SNMP,
28using AnyEvent as a backend. 28using AnyEvent as a backend. This integrates Net::SNMP into AnyEvent. That
29 29means you can make non-blocking Net::SNMP calls and as long as other
30This integrates Net::SNMP into AnyEvent: You can make non-blocking 30parts of your program also use AnyEvent (or some event loop supported by
31Net::SNMP calls and as long as other parts of your program also use 31AnyEvent), they will run in parallel.
32AnyEvent (or some event loop supported by AnyEvent), they will run in
33parallel.
34 32
35Also, the Net::SNMP scheduler is very inefficient with respect to both CPU 33Also, the Net::SNMP scheduler is very inefficient with respect to both CPU
36and memory usage. Most AnyEvent backends (including the pure-perl backend) 34and memory usage. Most AnyEvent backends (including the pure-perl backend)
37fare much better than the Net::SNMP dispatcher. 35fare much better than the Net::SNMP dispatcher.
38 36
37Another major added fetaure of this module over Net::SNMP is automatic
38rate-adjustments: Net::SNMP is so slow that firing a few thousand
39requests can cause many timeouts simply because Net::SNMP cannot process
40the replies in time. This module automatically adapts the send rate to
41avoid false timeouts caused by slow reply processing.
42
39A potential disadvantage is that replacing the dispatcher is not at all 43A potential disadvantage of this module is that replacing the dispatcher
40a documented thing to do, so future changes in Net::SNP might break this 44is not at all a documented thing to do, so future changes in Net::SNP
41module (or the many similar ones). 45might break this module (or the many similar ones).
42 46
43This module does not export anything and does not require you to do 47This module does not export anything and does not require you to do
44anything special apart from loading it I<before doing any non-blocking 48anything special apart from loading it I<before doing any non-blocking
45requests with Net::SNMP>. It is recommended but not required to load this 49requests with Net::SNMP>. It is recommended but not required to load this
46module before C<Net::SNMP>. 50module before C<Net::SNMP>.
75 79
76AnyEvent::SNMP tries to dynamically adjust this number dynamically upwards 80AnyEvent::SNMP tries to dynamically adjust this number dynamically upwards
77and downwards. 81and downwards.
78 82
79Increasing C<$MAX_OUTSTANDING> will not automatically use the 83Increasing C<$MAX_OUTSTANDING> will not automatically use the
80C<extra request slots. To increase $MAX_OUTSTANDING> and make 84extra request slots. To increase C<$MAX_OUTSTANDING> and make
81C<C<AnyEvent::SNMP> make use of the extra paralellity, call 85C<AnyEvent::SNMP> make use of the extra paralellity, call
82C<AnyEvent::SNMP::set_max_outstanding> with the new value, e.g.: 86C<AnyEvent::SNMP::set_max_outstanding> with the new value, e.g.:
83 87
84 AnyEvent::SNMP::set_max_outstanding 500; 88 AnyEvent::SNMP::set_max_outstanding 500;
85 89
86Although due to the dynamic adjustment, this might have little lasting 90Although due to the dynamic adjustment, this might have little lasting
145} 149}
146 150
147use Net::SNMP (); 151use Net::SNMP ();
148use AnyEvent (); 152use AnyEvent ();
149 153
150our $VERSION = '0.2'; 154our $VERSION = '1.0';
151 155
152$Net::SNMP::DISPATCHER = instance Net::SNMP::Dispatcher; 156$Net::SNMP::DISPATCHER = instance Net::SNMP::Dispatcher;
153 157
154our $MESSAGE_PROCESSING = $Net::SNMP::Dispatcher::MESSAGE_PROCESSING; 158our $MESSAGE_PROCESSING = $Net::SNMP::Dispatcher::MESSAGE_PROCESSING;
155 159
156# avoid the method call
157my $timer = sub { shift->timer (@_) };
158AnyEvent::post_detect { $timer = AnyEvent->can ("timer") };
159
160our $BUSY; 160our $BUSY;
161our $DONE; # finished all jobs
161our @TRANSPORT; # fileno => [count, watcher] 162our @TRANSPORT; # fileno => [count, watcher]
162our @QUEUE; 163our @QUEUE;
163our $MAX_OUTSTANDING = 50; 164our $MAX_OUTSTANDING = 50;
164our $MIN_RECVQUEUE = 8; 165our $MIN_RECVQUEUE = 8;
165our $MAX_RECVQUEUE = 64; 166our $MAX_RECVQUEUE = 64;
188 $MESSAGE_PROCESSING->msg_handle_delete ($pdu->msg_id) 189 $MESSAGE_PROCESSING->msg_handle_delete ($pdu->msg_id)
189 if $pdu->expect_response; 190 if $pdu->expect_response;
190 191
191 # A crude attempt to recover from temporary failures. 192 # A crude attempt to recover from temporary failures.
192 if ($retries-- > 0 && ($!{EAGAIN} || $!{EWOULDBLOCK} || $!{ENOSPC})) { 193 if ($retries-- > 0 && ($!{EAGAIN} || $!{EWOULDBLOCK} || $!{ENOSPC})) {
193 my $retry_w; $retry_w = AnyEvent->$timer (after => $pdu->timeout, cb => sub { 194 my $retry_w; $retry_w = AE::timer $pdu->timeout, 0, sub {
194 undef $retry_w; 195 undef $retry_w;
195 _send_pdu ($pdu, $retries); 196 _send_pdu ($pdu, $retries);
196 }); 197 };
197 } else { 198 } else {
198 --$BUSY; 199 --$BUSY;
199 kick_job; 200 kick_job;
200 } 201 }
201 202
209 my $transport = $msg->transport; 210 my $transport = $msg->transport;
210 my $fileno = $transport->fileno; 211 my $fileno = $transport->fileno;
211 212
212 # register the transport 213 # register the transport
213 unless ($TRANSPORT[$fileno][0]++) { 214 unless ($TRANSPORT[$fileno][0]++) {
214 $TRANSPORT[$fileno][1] = AnyEvent->io (fh => $transport->socket, poll => 'r', cb => sub { 215 $TRANSPORT[$fileno][1] = AE::io $transport->socket, 0, sub {
215 for my $count (1..$MAX_RECVQUEUE) { # handle up to this many requests in one go 216 for my $count (1..$MAX_RECVQUEUE) { # handle up to this many requests in one go
216 # Create a new Message object to receive the response 217 # Create a new Message object to receive the response
217 my ($msg, $error) = Net::SNMP::Message->new (-transport => $transport); 218 my ($msg, $error) = Net::SNMP::Message->new (-transport => $transport);
218 219
219 if (!defined $msg) { 220 if (!defined $msg) {
274 275
275 # when we end up here, we successfully handled $MAX_RECVQUEUE 276 # when we end up here, we successfully handled $MAX_RECVQUEUE
276 # replies in one iteration, so assume we are overloaded 277 # replies in one iteration, so assume we are overloaded
277 # and reduce the amount of parallelity. 278 # and reduce the amount of parallelity.
278 $MAX_OUTSTANDING = (int $MAX_OUTSTANDING * 0.95) || 1; 279 $MAX_OUTSTANDING = (int $MAX_OUTSTANDING * 0.95) || 1;
279 }); 280 };
280 } 281 }
281 282
282 $msg->timeout_id (\(my $rtimeout_w = 283 $msg->timeout_id (\(my $rtimeout_w =
283 AnyEvent->$timer (after => $pdu->timeout, cb => sub { 284 AE::timer $pdu->timeout, 0, sub {
284 my $rtimeout_w = $msg->timeout_id; 285 my $rtimeout_w = $msg->timeout_id;
285 if ($$rtimeout_w) { 286 if ($$rtimeout_w) {
286 undef $$rtimeout_w; 287 undef $$rtimeout_w;
287 delete $TRANSPORT[$fileno] 288 delete $TRANSPORT[$fileno]
288 unless --$TRANSPORT[$fileno][0]; 289 unless --$TRANSPORT[$fileno][0];
296 297
297 --$BUSY; 298 --$BUSY;
298 kick_job; 299 kick_job;
299 } 300 }
300 }) 301 })
301 )); 302 );
302 } else { 303 } else {
303 --$BUSY; 304 --$BUSY;
304 kick_job; 305 kick_job;
305 } 306 }
306} 307}
309 while ($BUSY < $MAX_OUTSTANDING) { 310 while ($BUSY < $MAX_OUTSTANDING) {
310 my $pdu = shift @QUEUE 311 my $pdu = shift @QUEUE
311 or last; 312 or last;
312 313
313 ++$BUSY; 314 ++$BUSY;
314
315 _send_pdu $pdu, $pdu->retries; 315 _send_pdu $pdu, $pdu->retries;
316 } 316 }
317
318 $DONE and $DONE->() unless $BUSY;
317} 319}
318 320
319sub send_pdu($$$) { 321sub send_pdu($$$) {
320 my (undef, $pdu, $delay) = @_; 322 my (undef, $pdu, $delay) = @_;
321 323
322 # $delay is not very sensibly implemented by AnyEvent::SNMP, 324 # $delay is not very sensibly implemented by AnyEvent::SNMP,
323 # but apparently it is not a very sensible feature. 325 # but apparently it is not a very sensible feature.
324 if ($delay > 0) { 326 if ($delay > 0) {
325 ++$BUSY; 327 ++$BUSY;
326 my $delay_w; $delay_w = AnyEvent->$timer (after => $delay, cb => sub { 328 my $delay_w; $delay_w = AE::timer $delay, 0, sub {
327 undef $delay_w; 329 undef $delay_w;
330 push @QUEUE, $pdu;
328 --$BUSY; 331 --$BUSY;
329 push @QUEUE, $pdu;
330 kick_job; 332 kick_job;
331 }); 333 };
332 return 1; 334 return 1;
333 } 335 }
334 336
335 push @QUEUE, $pdu; 337 push @QUEUE, $pdu;
336 kick_job; 338 kick_job;
337 339
338 1 340 1
339} 341}
340 342
341sub activate($) { 343sub activate($) {
342 AnyEvent->one_event while $BUSY; 344 while ($BUSY) {
345 $DONE = AE::cv;
346 $DONE->recv;
347 undef $DONE;
348 }
343} 349}
344 350
345sub one_event($) { 351sub one_event($) {
352 # should not ever be used
346 AnyEvent->one_event; 353 AnyEvent->one_event; #d# todo
347} 354}
348 355
349sub set_max_outstanding($) { 356sub set_max_outstanding($) {
350 $MAX_OUTSTANDING = $_[0]; 357 $MAX_OUTSTANDING = $_[0];
351 kick_job; 358 kick_job;

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines