--- AnyEvent-SNMP/SNMP.pm 2009/04/18 10:17:53 1.3 +++ AnyEvent-SNMP/SNMP.pm 2010/10/31 18:26:27 1.11 @@ -1,6 +1,6 @@ =head1 NAME -AnyEvent::SNMP - adaptor to integrate Net::SNMP into Anyevent. +AnyEvent::SNMP - adaptor to integrate Net::SNMP into AnyEvent. =head1 SYNOPSIS @@ -25,20 +25,24 @@ =head1 DESCRIPTION This module implements an alternative "event dispatcher" for Net::SNMP, -using AnyEvent as a backend. - -This integrates Net::SNMP into AnyEvent: You can make non-blocking -Net::SNMP calls and as long as other parts of your program also use -AnyEvent (or some event loop supported by AnyEvent), they will run in -parallel. +using AnyEvent as a backend. This integrates Net::SNMP into AnyEvent. That +means you can make non-blocking Net::SNMP calls and as long as other +parts of your program also use AnyEvent (or some event loop supported by +AnyEvent), they will run in parallel. Also, the Net::SNMP scheduler is very inefficient with respect to both CPU and memory usage. Most AnyEvent backends (including the pure-perl backend) fare much better than the Net::SNMP dispatcher. -A potential disadvantage is that replacing the dispatcher is not at all -a documented thing to do, so future changes in Net::SNP might break this -module (or the many similar ones). +Another major added fetaure of this module over Net::SNMP is automatic +rate-adjustments: Net::SNMP is so slow that firing a few thousand +requests can cause many timeouts simply because Net::SNMP cannot process +the replies in time. This module automatically adapts the send rate to +avoid false timeouts caused by slow reply processing. + +A potential disadvantage of this module is that replacing the dispatcher +is not at all a documented thing to do, so future changes in Net::SNP +might break this module (or the many similar ones). This module does not export anything and does not require you to do anything special apart from loading it I, dynamic) +=item AnyEvent::SNMP::set_max_outstanding $new_value + Use this package variable to restrict the number of outstanding SNMP requests at any point in time. @@ -63,20 +69,31 @@ case, this can lead to packet loss, when the receive queue overflows and the kernel can no longer accept new packets. -To avoid this, you can (and should) limit the number of outstanding requests -to a number low enough so that parsing time doesn't introduce noticable delays. +To avoid this, you can (and should) limit the number of outstanding +requests to a number low enough so that parsing time doesn't introduce +noticable delays. Unfortunately, this number depends not only on processing speed and load of the machine running Net::SNMP, but also on the network latency and the speed of your SNMP agents. -AnyEvent::SNMP tries to dynamically adjust this number dynamically upwards -and downwards. +AnyEvent::SNMP tries to dynamically adjust this number upwards and +downwards. + +Increasing C<$MAX_OUTSTANDING> will not automatically use the +extra request slots. To increase C<$MAX_OUTSTANDING> and make +C make use of the extra paralellity, call +C with the new value, e.g.: + + AnyEvent::SNMP::set_max_outstanding 500; + +Although due to the dynamic adjustment, this might have little lasting +effect. Note that you can use L to speed up parsing of responses considerably. -=item $AnyEvent::SNMP::MIN_RECVQUEUE (default: C<4>) +=item $AnyEvent::SNMP::MIN_RECVQUEUE (default: C<8>) =item $AnyEvent::SNMP::MAX_RECVQUEUE (default: C<64>) @@ -120,8 +137,7 @@ package AnyEvent::SNMP; -no warnings; -use strict qw(subs vars); +use common::sense; # it is possible to do this without loading # Net::SNMP::Dispatcher, but much more awkward. @@ -134,21 +150,18 @@ use Net::SNMP (); use AnyEvent (); -our $VERSION = '0.2'; +our $VERSION = '1.0'; $Net::SNMP::DISPATCHER = instance Net::SNMP::Dispatcher; our $MESSAGE_PROCESSING = $Net::SNMP::Dispatcher::MESSAGE_PROCESSING; -# avoid the method call -my $timer = sub { shift->timer (@_) }; -AnyEvent::post_detect { $timer = AnyEvent->can ("timer") }; - our $BUSY; -our %TRANSPORT; # address => [count, watcher] +our $DONE; # finished all jobs +our @TRANSPORT; # fileno => [count, watcher] our @QUEUE; our $MAX_OUTSTANDING = 50; -our $MIN_RECVQUEUE = 4; +our $MIN_RECVQUEUE = 8; our $MAX_RECVQUEUE = 64; sub kick_job; @@ -177,10 +190,10 @@ # A crude attempt to recover from temporary failures. if ($retries-- > 0 && ($!{EAGAIN} || $!{EWOULDBLOCK} || $!{ENOSPC})) { - my $retry_w; $retry_w = AnyEvent->$timer (after => $pdu->timeout, cb => sub { + my $retry_w; $retry_w = AE::timer $pdu->timeout, 0, sub { undef $retry_w; _send_pdu ($pdu, $retries); - }); + }; } else { --$BUSY; kick_job; @@ -194,10 +207,11 @@ # Schedule the timeout handler if the message expects a response. if ($pdu->expect_response) { my $transport = $msg->transport; + my $fileno = $transport->fileno; # register the transport - unless ($TRANSPORT{$transport+0}[0]++) { - $TRANSPORT{$transport+0}[1] = AnyEvent->io (fh => $transport->socket, poll => 'r', cb => sub { + unless ($TRANSPORT[$fileno][0]++) { + $TRANSPORT[$fileno][1] = AE::io $transport->socket, 0, sub { for my $count (1..$MAX_RECVQUEUE) { # handle up to this many requests in one go # Create a new Message object to receive the response my ($msg, $error) = Net::SNMP::Message->new (-transport => $transport); @@ -209,14 +223,16 @@ # Read the message from the Transport Layer if (!defined $msg->recv) { if ($transport->connectionless) { + # if we handled very few replies and we have queued work, try + # to increase the parallelity as we probably can handle more. if ($count < $MIN_RECVQUEUE && @QUEUE) { ++$MAX_OUTSTANDING; kick_job; } } else { # for some reason, connected-oriented transports seem to need this - delete $TRANSPORT{$transport+0} - unless --$TRANSPORT{$transport+0}[0]; + delete $TRANSPORT[$fileno] + unless --$TRANSPORT[$fileno][0]; } $msg->error; @@ -249,24 +265,27 @@ --$BUSY; kick_job; - unless (--$TRANSPORT{$transport+0}[0]) { - delete $TRANSPORT{$transport+0}; + unless (--$TRANSPORT[$fileno][0]) { + delete $TRANSPORT[$fileno]; return; } } } - $MAX_OUTSTANDING = (int $MAX_OUTSTANDING * 0.9) || 1; - }); + # when we end up here, we successfully handled $MAX_RECVQUEUE + # replies in one iteration, so assume we are overloaded + # and reduce the amount of parallelity. + $MAX_OUTSTANDING = (int $MAX_OUTSTANDING * 0.95) || 1; + }; } $msg->timeout_id (\(my $rtimeout_w = - AnyEvent->$timer (after => $pdu->timeout, cb => sub { + AE::timer $pdu->timeout, 0, sub { my $rtimeout_w = $msg->timeout_id; if ($$rtimeout_w) { undef $$rtimeout_w; - delete $TRANSPORT{$transport+0} - unless --$TRANSPORT{$transport+0}[0]; + delete $TRANSPORT[$fileno] + unless --$TRANSPORT[$fileno][0]; } if ($retries--) { @@ -279,7 +298,7 @@ kick_job; } }) - )); + ); } else { --$BUSY; kick_job; @@ -292,10 +311,12 @@ or last; ++$BUSY; - _send_pdu $pdu, $pdu->retries; } + + $DONE and $DONE->() unless $BUSY; } + sub send_pdu($$$) { my (undef, $pdu, $delay) = @_; @@ -303,12 +324,12 @@ # but apparently it is not a very sensible feature. if ($delay > 0) { ++$BUSY; - my $delay_w; $delay_w = AnyEvent->$timer (after => $delay, cb => sub { + my $delay_w; $delay_w = AE::timer $delay, 0, sub { undef $delay_w; - --$BUSY; push @QUEUE, $pdu; + --$BUSY; kick_job; - }); + }; return 1; } @@ -319,11 +340,21 @@ } sub activate($) { - AnyEvent->one_event while $BUSY; + while ($BUSY) { + $DONE = AE::cv; + $DONE->recv; + undef $DONE; + } } sub one_event($) { - AnyEvent->one_event; + # should not ever be used + AnyEvent->one_event; #d# todo +} + +sub set_max_outstanding($) { + $MAX_OUTSTANDING = $_[0]; + kick_job; } =head1 SEE ALSO