ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/cvsroot/AnyEvent-SNMP/SNMP.pm
(Generate patch)

Comparing cvsroot/AnyEvent-SNMP/SNMP.pm (file contents):
Revision 1.6 by root, Sat Apr 25 12:20:50 2009 UTC vs.
Revision 1.8 by root, Wed Jan 6 10:25:54 2010 UTC

1=head1 NAME 1=head1 NAME
2 2
3AnyEvent::SNMP - adaptor to integrate Net::SNMP into Anyevent. 3AnyEvent::SNMP - adaptor to integrate Net::SNMP into AnyEvent.
4 4
5=head1 SYNOPSIS 5=head1 SYNOPSIS
6 6
7 use AnyEvent::SNMP; 7 use AnyEvent::SNMP;
8 use Net::SNMP; 8 use Net::SNMP;
75 75
76AnyEvent::SNMP tries to dynamically adjust this number dynamically upwards 76AnyEvent::SNMP tries to dynamically adjust this number dynamically upwards
77and downwards. 77and downwards.
78 78
79Increasing C<$MAX_OUTSTANDING> will not automatically use the 79Increasing C<$MAX_OUTSTANDING> will not automatically use the
80C<extra request slots. To increase $MAX_OUTSTANDING> and make 80extra request slots. To increase C<$MAX_OUTSTANDING> and make
81C<C<AnyEvent::SNMP> make use of the extra paralellity, call 81C<AnyEvent::SNMP> make use of the extra paralellity, call
82C<AnyEvent::SNMP::set_max_outstanding> with the new value, e.g.: 82C<AnyEvent::SNMP::set_max_outstanding> with the new value, e.g.:
83 83
84 AnyEvent::SNMP::set_max_outstanding 500; 84 AnyEvent::SNMP::set_max_outstanding 500;
85 85
86Although due to the dynamic adjustment, this might have little lasting 86Although due to the dynamic adjustment, this might have little lasting
145} 145}
146 146
147use Net::SNMP (); 147use Net::SNMP ();
148use AnyEvent (); 148use AnyEvent ();
149 149
150our $VERSION = '0.2'; 150our $VERSION = '1.0';
151 151
152$Net::SNMP::DISPATCHER = instance Net::SNMP::Dispatcher; 152$Net::SNMP::DISPATCHER = instance Net::SNMP::Dispatcher;
153 153
154our $MESSAGE_PROCESSING = $Net::SNMP::Dispatcher::MESSAGE_PROCESSING; 154our $MESSAGE_PROCESSING = $Net::SNMP::Dispatcher::MESSAGE_PROCESSING;
155 155
156# avoid the method call
157my $timer = sub { shift->timer (@_) };
158AnyEvent::post_detect { $timer = AnyEvent->can ("timer") };
159
160our $BUSY; 156our $BUSY;
157our $DONE; # finished all jobs
161our @TRANSPORT; # fileno => [count, watcher] 158our @TRANSPORT; # fileno => [count, watcher]
162our @QUEUE; 159our @QUEUE;
163our $MAX_OUTSTANDING = 50; 160our $MAX_OUTSTANDING = 50;
164our $MIN_RECVQUEUE = 8; 161our $MIN_RECVQUEUE = 8;
165our $MAX_RECVQUEUE = 64; 162our $MAX_RECVQUEUE = 64;
166 163
167sub kick_job; 164sub kick_job; # also --$BUSY
168 165
169sub _send_pdu { 166sub _send_pdu {
170 my ($pdu, $retries) = @_; 167 my ($pdu, $retries) = @_;
171 168
172 # mostly copied from Net::SNMP::Dispatch 169 # mostly copied from Net::SNMP::Dispatch
174 # Pass the PDU to Message Processing so that it can 171 # Pass the PDU to Message Processing so that it can
175 # create the new outgoing message. 172 # create the new outgoing message.
176 my $msg = $MESSAGE_PROCESSING->prepare_outgoing_msg ($pdu); 173 my $msg = $MESSAGE_PROCESSING->prepare_outgoing_msg ($pdu);
177 174
178 if (!defined $msg) { 175 if (!defined $msg) {
179 --$BUSY;
180 kick_job; 176 kick_job;
181 # Inform the command generator about the Message Processing error. 177 # Inform the command generator about the Message Processing error.
182 $pdu->status_information ($MESSAGE_PROCESSING->error); 178 $pdu->status_information ($MESSAGE_PROCESSING->error);
183 return; 179 return;
184 } 180 }
188 $MESSAGE_PROCESSING->msg_handle_delete ($pdu->msg_id) 184 $MESSAGE_PROCESSING->msg_handle_delete ($pdu->msg_id)
189 if $pdu->expect_response; 185 if $pdu->expect_response;
190 186
191 # A crude attempt to recover from temporary failures. 187 # A crude attempt to recover from temporary failures.
192 if ($retries-- > 0 && ($!{EAGAIN} || $!{EWOULDBLOCK} || $!{ENOSPC})) { 188 if ($retries-- > 0 && ($!{EAGAIN} || $!{EWOULDBLOCK} || $!{ENOSPC})) {
193 my $retry_w; $retry_w = AnyEvent->$timer (after => $pdu->timeout, cb => sub { 189 my $retry_w; $retry_w = AE::timer $pdu->timeout, 0, sub {
194 undef $retry_w; 190 undef $retry_w;
195 _send_pdu ($pdu, $retries); 191 _send_pdu ($pdu, $retries);
196 }); 192 };
197 } else { 193 } else {
198 --$BUSY;
199 kick_job; 194 kick_job;
200 } 195 }
201 196
202 # Inform the command generator about the send() error. 197 # Inform the command generator about the send() error.
203 $pdu->status_information ($msg->error); 198 $pdu->status_information ($msg->error);
209 my $transport = $msg->transport; 204 my $transport = $msg->transport;
210 my $fileno = $transport->fileno; 205 my $fileno = $transport->fileno;
211 206
212 # register the transport 207 # register the transport
213 unless ($TRANSPORT[$fileno][0]++) { 208 unless ($TRANSPORT[$fileno][0]++) {
214 $TRANSPORT[$fileno][1] = AnyEvent->io (fh => $transport->socket, poll => 'r', cb => sub { 209 $TRANSPORT[$fileno][1] = AE::io $transport->socket, 0, sub {
215 for my $count (1..$MAX_RECVQUEUE) { # handle up to this many requests in one go 210 for my $count (1..$MAX_RECVQUEUE) { # handle up to this many requests in one go
216 # Create a new Message object to receive the response 211 # Create a new Message object to receive the response
217 my ($msg, $error) = Net::SNMP::Message->new (-transport => $transport); 212 my ($msg, $error) = Net::SNMP::Message->new (-transport => $transport);
218 213
219 if (!defined $msg) { 214 if (!defined $msg) {
260 # Cancel the timeout. 255 # Cancel the timeout.
261 my $rtimeout_w = $msg->timeout_id; 256 my $rtimeout_w = $msg->timeout_id;
262 if ($$rtimeout_w) { 257 if ($$rtimeout_w) {
263 undef $$rtimeout_w; 258 undef $$rtimeout_w;
264 259
265 --$BUSY;
266 kick_job; 260 kick_job;
267 261
268 unless (--$TRANSPORT[$fileno][0]) { 262 unless (--$TRANSPORT[$fileno][0]) {
269 delete $TRANSPORT[$fileno]; 263 delete $TRANSPORT[$fileno];
270 return; 264 return;
274 268
275 # when we end up here, we successfully handled $MAX_RECVQUEUE 269 # when we end up here, we successfully handled $MAX_RECVQUEUE
276 # replies in one iteration, so assume we are overloaded 270 # replies in one iteration, so assume we are overloaded
277 # and reduce the amount of parallelity. 271 # and reduce the amount of parallelity.
278 $MAX_OUTSTANDING = (int $MAX_OUTSTANDING * 0.95) || 1; 272 $MAX_OUTSTANDING = (int $MAX_OUTSTANDING * 0.95) || 1;
279 }); 273 };
280 } 274 }
281 275
282 $msg->timeout_id (\(my $rtimeout_w = 276 $msg->timeout_id (\(my $rtimeout_w =
283 AnyEvent->$timer (after => $pdu->timeout, cb => sub { 277 AE::timer $pdu->timeout, 0, sub {
284 my $rtimeout_w = $msg->timeout_id; 278 my $rtimeout_w = $msg->timeout_id;
285 if ($$rtimeout_w) { 279 if ($$rtimeout_w) {
286 undef $$rtimeout_w; 280 undef $$rtimeout_w;
287 delete $TRANSPORT[$fileno] 281 delete $TRANSPORT[$fileno]
288 unless --$TRANSPORT[$fileno][0]; 282 unless --$TRANSPORT[$fileno][0];
292 _send_pdu ($pdu, $retries); 286 _send_pdu ($pdu, $retries);
293 } else { 287 } else {
294 $MESSAGE_PROCESSING->msg_handle_delete ($pdu->msg_id); 288 $MESSAGE_PROCESSING->msg_handle_delete ($pdu->msg_id);
295 $pdu->status_information ("No response from remote host '%s'", $pdu->hostname); 289 $pdu->status_information ("No response from remote host '%s'", $pdu->hostname);
296 290
297 --$BUSY;
298 kick_job; 291 kick_job;
299 } 292 }
300 }) 293 })
301 )); 294 );
302 } else { 295 } else {
303 --$BUSY;
304 kick_job; 296 kick_job;
305 } 297 }
306} 298}
307 299
308sub kick_job { 300sub kick_job {
301 --$BUSY;
302
309 while ($BUSY < $MAX_OUTSTANDING) { 303 while ($BUSY < $MAX_OUTSTANDING) {
310 my $pdu = shift @QUEUE 304 my $pdu = shift @QUEUE
311 or last; 305 or last;
312 306
313 ++$BUSY; 307 ++$BUSY;
314 308
315 _send_pdu $pdu, $pdu->retries; 309 _send_pdu $pdu, $pdu->retries;
316 } 310 }
311
312 $DONE and $DONE->() unless $BUSY;
317} 313}
318 314
319sub send_pdu($$$) { 315sub send_pdu($$$) {
320 my (undef, $pdu, $delay) = @_; 316 my (undef, $pdu, $delay) = @_;
321 317
322 # $delay is not very sensibly implemented by AnyEvent::SNMP, 318 # $delay is not very sensibly implemented by AnyEvent::SNMP,
323 # but apparently it is not a very sensible feature. 319 # but apparently it is not a very sensible feature.
324 if ($delay > 0) { 320 if ($delay > 0) {
325 ++$BUSY; 321 ++$BUSY;
326 my $delay_w; $delay_w = AnyEvent->$timer (after => $delay, cb => sub { 322 my $delay_w; $delay_w = AE::timer $delay, 0, sub {
327 undef $delay_w; 323 undef $delay_w;
328 --$BUSY;
329 push @QUEUE, $pdu; 324 push @QUEUE, $pdu;
330 kick_job; 325 kick_job;
331 }); 326 };
332 return 1; 327 return 1;
333 } 328 }
334 329
335 push @QUEUE, $pdu; 330 push @QUEUE, $pdu;
336 kick_job; 331 kick_job;
337 332
338 1 333 1
339} 334}
340 335
341sub activate($) { 336sub activate($) {
342 AnyEvent->one_event while $BUSY; 337 while ($BUSY) {
338 $DONE = AE::cv;
339 $DONE->recv;
340 undef $DONE;
341 }
343} 342}
344 343
345sub one_event($) { 344sub one_event($) {
346 AnyEvent->one_event; 345 AnyEvent->one_event; #d# todo
347} 346}
348 347
349sub set_max_outstanding($) { 348sub set_max_outstanding($) {
350 $MAX_OUTSTANDING = $_[0]; 349 $MAX_OUTSTANDING = $_[0];
350
351 ++$BUSY; # kick_job decrements $BUSY
351 kick_job; 352 kick_job;
352} 353}
353 354
354=head1 SEE ALSO 355=head1 SEE ALSO
355 356

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines