ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-SNMP/SNMP.pm
(Generate patch)

Comparing AnyEvent-SNMP/SNMP.pm (file contents):
Revision 1.7 by root, Sun Aug 2 14:24:23 2009 UTC vs.
Revision 1.9 by root, Wed Jan 6 10:43:20 2010 UTC

75 75
76AnyEvent::SNMP tries to dynamically adjust this number dynamically upwards 76AnyEvent::SNMP tries to dynamically adjust this number dynamically upwards
77and downwards. 77and downwards.
78 78
79Increasing C<$MAX_OUTSTANDING> will not automatically use the 79Increasing C<$MAX_OUTSTANDING> will not automatically use the
80C<extra request slots. To increase $MAX_OUTSTANDING> and make 80extra request slots. To increase C<$MAX_OUTSTANDING> and make
81C<C<AnyEvent::SNMP> make use of the extra paralellity, call 81C<AnyEvent::SNMP> make use of the extra paralellity, call
82C<AnyEvent::SNMP::set_max_outstanding> with the new value, e.g.: 82C<AnyEvent::SNMP::set_max_outstanding> with the new value, e.g.:
83 83
84 AnyEvent::SNMP::set_max_outstanding 500; 84 AnyEvent::SNMP::set_max_outstanding 500;
85 85
86Although due to the dynamic adjustment, this might have little lasting 86Although due to the dynamic adjustment, this might have little lasting
145} 145}
146 146
147use Net::SNMP (); 147use Net::SNMP ();
148use AnyEvent (); 148use AnyEvent ();
149 149
150our $VERSION = '0.2'; 150our $VERSION = '1.0';
151 151
152$Net::SNMP::DISPATCHER = instance Net::SNMP::Dispatcher; 152$Net::SNMP::DISPATCHER = instance Net::SNMP::Dispatcher;
153 153
154our $MESSAGE_PROCESSING = $Net::SNMP::Dispatcher::MESSAGE_PROCESSING; 154our $MESSAGE_PROCESSING = $Net::SNMP::Dispatcher::MESSAGE_PROCESSING;
155 155
156# avoid the method call
157my $timer = sub { shift->timer (@_) };
158AnyEvent::post_detect { $timer = AnyEvent->can ("timer") };
159
160our $BUSY; 156our $BUSY;
157our $DONE; # finished all jobs
161our @TRANSPORT; # fileno => [count, watcher] 158our @TRANSPORT; # fileno => [count, watcher]
162our @QUEUE; 159our @QUEUE;
163our $MAX_OUTSTANDING = 50; 160our $MAX_OUTSTANDING = 50;
164our $MIN_RECVQUEUE = 8; 161our $MIN_RECVQUEUE = 8;
165our $MAX_RECVQUEUE = 64; 162our $MAX_RECVQUEUE = 64;
188 $MESSAGE_PROCESSING->msg_handle_delete ($pdu->msg_id) 185 $MESSAGE_PROCESSING->msg_handle_delete ($pdu->msg_id)
189 if $pdu->expect_response; 186 if $pdu->expect_response;
190 187
191 # A crude attempt to recover from temporary failures. 188 # A crude attempt to recover from temporary failures.
192 if ($retries-- > 0 && ($!{EAGAIN} || $!{EWOULDBLOCK} || $!{ENOSPC})) { 189 if ($retries-- > 0 && ($!{EAGAIN} || $!{EWOULDBLOCK} || $!{ENOSPC})) {
193 my $retry_w; $retry_w = AnyEvent->$timer (after => $pdu->timeout, cb => sub { 190 my $retry_w; $retry_w = AE::timer $pdu->timeout, 0, sub {
194 undef $retry_w; 191 undef $retry_w;
195 _send_pdu ($pdu, $retries); 192 _send_pdu ($pdu, $retries);
196 }); 193 };
197 } else { 194 } else {
198 --$BUSY; 195 --$BUSY;
199 kick_job; 196 kick_job;
200 } 197 }
201 198
209 my $transport = $msg->transport; 206 my $transport = $msg->transport;
210 my $fileno = $transport->fileno; 207 my $fileno = $transport->fileno;
211 208
212 # register the transport 209 # register the transport
213 unless ($TRANSPORT[$fileno][0]++) { 210 unless ($TRANSPORT[$fileno][0]++) {
214 $TRANSPORT[$fileno][1] = AnyEvent->io (fh => $transport->socket, poll => 'r', cb => sub { 211 $TRANSPORT[$fileno][1] = AE::io $transport->socket, 0, sub {
215 for my $count (1..$MAX_RECVQUEUE) { # handle up to this many requests in one go 212 for my $count (1..$MAX_RECVQUEUE) { # handle up to this many requests in one go
216 # Create a new Message object to receive the response 213 # Create a new Message object to receive the response
217 my ($msg, $error) = Net::SNMP::Message->new (-transport => $transport); 214 my ($msg, $error) = Net::SNMP::Message->new (-transport => $transport);
218 215
219 if (!defined $msg) { 216 if (!defined $msg) {
274 271
275 # when we end up here, we successfully handled $MAX_RECVQUEUE 272 # when we end up here, we successfully handled $MAX_RECVQUEUE
276 # replies in one iteration, so assume we are overloaded 273 # replies in one iteration, so assume we are overloaded
277 # and reduce the amount of parallelity. 274 # and reduce the amount of parallelity.
278 $MAX_OUTSTANDING = (int $MAX_OUTSTANDING * 0.95) || 1; 275 $MAX_OUTSTANDING = (int $MAX_OUTSTANDING * 0.95) || 1;
279 }); 276 };
280 } 277 }
281 278
282 $msg->timeout_id (\(my $rtimeout_w = 279 $msg->timeout_id (\(my $rtimeout_w =
283 AnyEvent->$timer (after => $pdu->timeout, cb => sub { 280 AE::timer $pdu->timeout, 0, sub {
284 my $rtimeout_w = $msg->timeout_id; 281 my $rtimeout_w = $msg->timeout_id;
285 if ($$rtimeout_w) { 282 if ($$rtimeout_w) {
286 undef $$rtimeout_w; 283 undef $$rtimeout_w;
287 delete $TRANSPORT[$fileno] 284 delete $TRANSPORT[$fileno]
288 unless --$TRANSPORT[$fileno][0]; 285 unless --$TRANSPORT[$fileno][0];
296 293
297 --$BUSY; 294 --$BUSY;
298 kick_job; 295 kick_job;
299 } 296 }
300 }) 297 })
301 )); 298 );
302 } else { 299 } else {
303 --$BUSY; 300 --$BUSY;
304 kick_job; 301 kick_job;
305 } 302 }
306} 303}
309 while ($BUSY < $MAX_OUTSTANDING) { 306 while ($BUSY < $MAX_OUTSTANDING) {
310 my $pdu = shift @QUEUE 307 my $pdu = shift @QUEUE
311 or last; 308 or last;
312 309
313 ++$BUSY; 310 ++$BUSY;
314
315 _send_pdu $pdu, $pdu->retries; 311 _send_pdu $pdu, $pdu->retries;
316 } 312 }
313
314 $DONE and $DONE->() unless $BUSY;
317} 315}
318 316
319sub send_pdu($$$) { 317sub send_pdu($$$) {
320 my (undef, $pdu, $delay) = @_; 318 my (undef, $pdu, $delay) = @_;
321 319
322 # $delay is not very sensibly implemented by AnyEvent::SNMP, 320 # $delay is not very sensibly implemented by AnyEvent::SNMP,
323 # but apparently it is not a very sensible feature. 321 # but apparently it is not a very sensible feature.
324 if ($delay > 0) { 322 if ($delay > 0) {
325 ++$BUSY; 323 ++$BUSY;
326 my $delay_w; $delay_w = AnyEvent->$timer (after => $delay, cb => sub { 324 my $delay_w; $delay_w = AE::timer $delay, 0, sub {
327 undef $delay_w; 325 undef $delay_w;
326 push @QUEUE, $pdu;
328 --$BUSY; 327 --$BUSY;
329 push @QUEUE, $pdu;
330 kick_job; 328 kick_job;
331 }); 329 };
332 return 1; 330 return 1;
333 } 331 }
334 332
335 push @QUEUE, $pdu; 333 push @QUEUE, $pdu;
336 kick_job; 334 kick_job;
337 335
338 1 336 1
339} 337}
340 338
341sub activate($) { 339sub activate($) {
342 AnyEvent->one_event while $BUSY; 340 while ($BUSY) {
341 $DONE = AE::cv;
342 $DONE->recv;
343 undef $DONE;
344 }
343} 345}
344 346
345sub one_event($) { 347sub one_event($) {
348 # should not ever be used
346 AnyEvent->one_event; 349 AnyEvent->one_event; #d# todo
347} 350}
348 351
349sub set_max_outstanding($) { 352sub set_max_outstanding($) {
350 $MAX_OUTSTANDING = $_[0]; 353 $MAX_OUTSTANDING = $_[0];
351 kick_job; 354 kick_job;

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines