ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-SNMP/SNMP.pm
(Generate patch)

Comparing AnyEvent-SNMP/SNMP.pm (file contents):
Revision 1.3 by root, Sat Apr 18 10:17:53 2009 UTC vs.
Revision 1.5 by root, Wed Apr 22 12:31:20 2009 UTC

74and downwards. 74and downwards.
75 75
76Note that you can use L<Net::SNMP::XS> to speed up parsing of responses 76Note that you can use L<Net::SNMP::XS> to speed up parsing of responses
77considerably. 77considerably.
78 78
79=item $AnyEvent::SNMP::MIN_RECVQUEUE (default: C<4>) 79=item $AnyEvent::SNMP::MIN_RECVQUEUE (default: C<8>)
80 80
81=item $AnyEvent::SNMP::MAX_RECVQUEUE (default: C<64>) 81=item $AnyEvent::SNMP::MAX_RECVQUEUE (default: C<64>)
82 82
83These values specify the minimum and maximum receive queue length (in 83These values specify the minimum and maximum receive queue length (in
84units of one response packet). 84units of one response packet).
143# avoid the method call 143# avoid the method call
144my $timer = sub { shift->timer (@_) }; 144my $timer = sub { shift->timer (@_) };
145AnyEvent::post_detect { $timer = AnyEvent->can ("timer") }; 145AnyEvent::post_detect { $timer = AnyEvent->can ("timer") };
146 146
147our $BUSY; 147our $BUSY;
148our %TRANSPORT; # address => [count, watcher] 148our @TRANSPORT; # fileno => [count, watcher]
149our @QUEUE; 149our @QUEUE;
150our $MAX_OUTSTANDING = 50; 150our $MAX_OUTSTANDING = 50;
151our $MIN_RECVQUEUE = 4; 151our $MIN_RECVQUEUE = 8;
152our $MAX_RECVQUEUE = 64; 152our $MAX_RECVQUEUE = 64;
153 153
154sub kick_job; 154sub kick_job;
155 155
156sub _send_pdu { 156sub _send_pdu {
192 } 192 }
193 193
194 # Schedule the timeout handler if the message expects a response. 194 # Schedule the timeout handler if the message expects a response.
195 if ($pdu->expect_response) { 195 if ($pdu->expect_response) {
196 my $transport = $msg->transport; 196 my $transport = $msg->transport;
197 my $fileno = $transport->fileno;
197 198
198 # register the transport 199 # register the transport
199 unless ($TRANSPORT{$transport+0}[0]++) { 200 unless ($TRANSPORT[$fileno][0]++) {
200 $TRANSPORT{$transport+0}[1] = AnyEvent->io (fh => $transport->socket, poll => 'r', cb => sub { 201 $TRANSPORT[$fileno][1] = AnyEvent->io (fh => $transport->socket, poll => 'r', cb => sub {
201 for my $count (1..$MAX_RECVQUEUE) { # handle up to this many requests in one go 202 for my $count (1..$MAX_RECVQUEUE) { # handle up to this many requests in one go
202 # Create a new Message object to receive the response 203 # Create a new Message object to receive the response
203 my ($msg, $error) = Net::SNMP::Message->new (-transport => $transport); 204 my ($msg, $error) = Net::SNMP::Message->new (-transport => $transport);
204 205
205 if (!defined $msg) { 206 if (!defined $msg) {
207 } 208 }
208 209
209 # Read the message from the Transport Layer 210 # Read the message from the Transport Layer
210 if (!defined $msg->recv) { 211 if (!defined $msg->recv) {
211 if ($transport->connectionless) { 212 if ($transport->connectionless) {
213 # if we handled very few replies and we have queued work, try
214 # to increase the parallelity as we probably can handle more.
212 if ($count < $MIN_RECVQUEUE && @QUEUE) { 215 if ($count < $MIN_RECVQUEUE && @QUEUE) {
213 ++$MAX_OUTSTANDING; 216 ++$MAX_OUTSTANDING;
214 kick_job; 217 kick_job;
215 } 218 }
216 } else { 219 } else {
217 # for some reason, connected-oriented transports seem to need this 220 # for some reason, connected-oriented transports seem to need this
218 delete $TRANSPORT{$transport+0} 221 delete $TRANSPORT[$fileno]
219 unless --$TRANSPORT{$transport+0}[0]; 222 unless --$TRANSPORT[$fileno][0];
220 } 223 }
221 224
222 $msg->error; 225 $msg->error;
223 return; 226 return;
224 } 227 }
247 undef $$rtimeout_w; 250 undef $$rtimeout_w;
248 251
249 --$BUSY; 252 --$BUSY;
250 kick_job; 253 kick_job;
251 254
252 unless (--$TRANSPORT{$transport+0}[0]) { 255 unless (--$TRANSPORT[$fileno][0]) {
253 delete $TRANSPORT{$transport+0}; 256 delete $TRANSPORT[$fileno];
254 return; 257 return;
255 } 258 }
256 } 259 }
257 } 260 }
258 261
262 # when we end up here, we successfully handled $MAX_RECVQUEUE
263 # replies in one iteration, so assume we are overloaded
264 # and reduce the amount of parallelity.
259 $MAX_OUTSTANDING = (int $MAX_OUTSTANDING * 0.9) || 1; 265 $MAX_OUTSTANDING = (int $MAX_OUTSTANDING * 0.95) || 1;
260 }); 266 });
261 } 267 }
262 268
263 $msg->timeout_id (\(my $rtimeout_w = 269 $msg->timeout_id (\(my $rtimeout_w =
264 AnyEvent->$timer (after => $pdu->timeout, cb => sub { 270 AnyEvent->$timer (after => $pdu->timeout, cb => sub {
265 my $rtimeout_w = $msg->timeout_id; 271 my $rtimeout_w = $msg->timeout_id;
266 if ($$rtimeout_w) { 272 if ($$rtimeout_w) {
267 undef $$rtimeout_w; 273 undef $$rtimeout_w;
268 delete $TRANSPORT{$transport+0} 274 delete $TRANSPORT[$fileno]
269 unless --$TRANSPORT{$transport+0}[0]; 275 unless --$TRANSPORT[$fileno][0];
270 } 276 }
271 277
272 if ($retries--) { 278 if ($retries--) {
273 _send_pdu ($pdu, $retries); 279 _send_pdu ($pdu, $retries);
274 } else { 280 } else {

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines