--- cvsroot/AnyEvent-SNMP/SNMP.pm 2009/04/25 12:20:50 1.6 +++ cvsroot/AnyEvent-SNMP/SNMP.pm 2010/01/06 10:25:54 1.8 @@ -1,6 +1,6 @@ =head1 NAME -AnyEvent::SNMP - adaptor to integrate Net::SNMP into Anyevent. +AnyEvent::SNMP - adaptor to integrate Net::SNMP into AnyEvent. =head1 SYNOPSIS @@ -77,8 +77,8 @@ and downwards. Increasing C<$MAX_OUTSTANDING> will not automatically use the -C and make -C make use of the extra paralellity, call +extra request slots. To increase C<$MAX_OUTSTANDING> and make +C make use of the extra paralellity, call C with the new value, e.g.: AnyEvent::SNMP::set_max_outstanding 500; @@ -147,24 +147,21 @@ use Net::SNMP (); use AnyEvent (); -our $VERSION = '0.2'; +our $VERSION = '1.0'; $Net::SNMP::DISPATCHER = instance Net::SNMP::Dispatcher; our $MESSAGE_PROCESSING = $Net::SNMP::Dispatcher::MESSAGE_PROCESSING; -# avoid the method call -my $timer = sub { shift->timer (@_) }; -AnyEvent::post_detect { $timer = AnyEvent->can ("timer") }; - our $BUSY; +our $DONE; # finished all jobs our @TRANSPORT; # fileno => [count, watcher] our @QUEUE; our $MAX_OUTSTANDING = 50; our $MIN_RECVQUEUE = 8; our $MAX_RECVQUEUE = 64; -sub kick_job; +sub kick_job; # also --$BUSY sub _send_pdu { my ($pdu, $retries) = @_; @@ -176,7 +173,6 @@ my $msg = $MESSAGE_PROCESSING->prepare_outgoing_msg ($pdu); if (!defined $msg) { - --$BUSY; kick_job; # Inform the command generator about the Message Processing error. $pdu->status_information ($MESSAGE_PROCESSING->error); @@ -190,12 +186,11 @@ # A crude attempt to recover from temporary failures. if ($retries-- > 0 && ($!{EAGAIN} || $!{EWOULDBLOCK} || $!{ENOSPC})) { - my $retry_w; $retry_w = AnyEvent->$timer (after => $pdu->timeout, cb => sub { + my $retry_w; $retry_w = AE::timer $pdu->timeout, 0, sub { undef $retry_w; _send_pdu ($pdu, $retries); - }); + }; } else { - --$BUSY; kick_job; } @@ -211,7 +206,7 @@ # register the transport unless ($TRANSPORT[$fileno][0]++) { - $TRANSPORT[$fileno][1] = AnyEvent->io (fh => $transport->socket, poll => 'r', cb => sub { + $TRANSPORT[$fileno][1] = AE::io $transport->socket, 0, sub { for my $count (1..$MAX_RECVQUEUE) { # handle up to this many requests in one go # Create a new Message object to receive the response my ($msg, $error) = Net::SNMP::Message->new (-transport => $transport); @@ -262,7 +257,6 @@ if ($$rtimeout_w) { undef $$rtimeout_w; - --$BUSY; kick_job; unless (--$TRANSPORT[$fileno][0]) { @@ -276,11 +270,11 @@ # replies in one iteration, so assume we are overloaded # and reduce the amount of parallelity. $MAX_OUTSTANDING = (int $MAX_OUTSTANDING * 0.95) || 1; - }); + }; } $msg->timeout_id (\(my $rtimeout_w = - AnyEvent->$timer (after => $pdu->timeout, cb => sub { + AE::timer $pdu->timeout, 0, sub { my $rtimeout_w = $msg->timeout_id; if ($$rtimeout_w) { undef $$rtimeout_w; @@ -294,18 +288,18 @@ $MESSAGE_PROCESSING->msg_handle_delete ($pdu->msg_id); $pdu->status_information ("No response from remote host '%s'", $pdu->hostname); - --$BUSY; kick_job; } }) - )); + ); } else { - --$BUSY; kick_job; } } sub kick_job { + --$BUSY; + while ($BUSY < $MAX_OUTSTANDING) { my $pdu = shift @QUEUE or last; @@ -314,6 +308,8 @@ _send_pdu $pdu, $pdu->retries; } + + $DONE and $DONE->() unless $BUSY; } sub send_pdu($$$) { @@ -323,12 +319,11 @@ # but apparently it is not a very sensible feature. if ($delay > 0) { ++$BUSY; - my $delay_w; $delay_w = AnyEvent->$timer (after => $delay, cb => sub { + my $delay_w; $delay_w = AE::timer $delay, 0, sub { undef $delay_w; - --$BUSY; push @QUEUE, $pdu; kick_job; - }); + }; return 1; } @@ -339,15 +334,21 @@ } sub activate($) { - AnyEvent->one_event while $BUSY; + while ($BUSY) { + $DONE = AE::cv; + $DONE->recv; + undef $DONE; + } } sub one_event($) { - AnyEvent->one_event; + AnyEvent->one_event; #d# todo } sub set_max_outstanding($) { $MAX_OUTSTANDING = $_[0]; + + ++$BUSY; # kick_job decrements $BUSY kick_job; }