ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-SNMP/SNMP.pm
(Generate patch)

Comparing AnyEvent-SNMP/SNMP.pm (file contents):
Revision 1.5 by root, Wed Apr 22 12:31:20 2009 UTC vs.
Revision 1.8 by root, Wed Jan 6 10:25:54 2010 UTC

1=head1 NAME 1=head1 NAME
2 2
3AnyEvent::SNMP - adaptor to integrate Net::SNMP into Anyevent. 3AnyEvent::SNMP - adaptor to integrate Net::SNMP into AnyEvent.
4 4
5=head1 SYNOPSIS 5=head1 SYNOPSIS
6 6
7 use AnyEvent::SNMP; 7 use AnyEvent::SNMP;
8 use Net::SNMP; 8 use Net::SNMP;
49 49
50=over 4 50=over 4
51 51
52=item $AnyEvent::SNMP::MAX_OUTSTANDING (default: C<50>, dynamic) 52=item $AnyEvent::SNMP::MAX_OUTSTANDING (default: C<50>, dynamic)
53 53
54=item AnyEvent::SNMP::set_max_outstanding $new_value
55
54Use this package variable to restrict the number of outstanding SNMP 56Use this package variable to restrict the number of outstanding SNMP
55requests at any point in time. 57requests at any point in time.
56 58
57Net::SNMP is very fast at creating and sending SNMP requests, but much 59Net::SNMP is very fast at creating and sending SNMP requests, but much
58slower at parsing (big, bulk) responses. This makes it easy to request a 60slower at parsing (big, bulk) responses. This makes it easy to request a
61In the best case, this can lead to unnecessary delays (and even time-outs, 63In the best case, this can lead to unnecessary delays (and even time-outs,
62as the data has been received but not yet processed) and in the worst 64as the data has been received but not yet processed) and in the worst
63case, this can lead to packet loss, when the receive queue overflows and 65case, this can lead to packet loss, when the receive queue overflows and
64the kernel can no longer accept new packets. 66the kernel can no longer accept new packets.
65 67
66To avoid this, you can (and should) limit the number of outstanding requests 68To avoid this, you can (and should) limit the number of outstanding
67to a number low enough so that parsing time doesn't introduce noticable delays. 69requests to a number low enough so that parsing time doesn't introduce
70noticable delays.
68 71
69Unfortunately, this number depends not only on processing speed and load 72Unfortunately, this number depends not only on processing speed and load
70of the machine running Net::SNMP, but also on the network latency and the 73of the machine running Net::SNMP, but also on the network latency and the
71speed of your SNMP agents. 74speed of your SNMP agents.
72 75
73AnyEvent::SNMP tries to dynamically adjust this number dynamically upwards 76AnyEvent::SNMP tries to dynamically adjust this number dynamically upwards
74and downwards. 77and downwards.
78
79Increasing C<$MAX_OUTSTANDING> will not automatically use the
80extra request slots. To increase C<$MAX_OUTSTANDING> and make
81C<AnyEvent::SNMP> make use of the extra paralellity, call
82C<AnyEvent::SNMP::set_max_outstanding> with the new value, e.g.:
83
84 AnyEvent::SNMP::set_max_outstanding 500;
85
86Although due to the dynamic adjustment, this might have little lasting
87effect.
75 88
76Note that you can use L<Net::SNMP::XS> to speed up parsing of responses 89Note that you can use L<Net::SNMP::XS> to speed up parsing of responses
77considerably. 90considerably.
78 91
79=item $AnyEvent::SNMP::MIN_RECVQUEUE (default: C<8>) 92=item $AnyEvent::SNMP::MIN_RECVQUEUE (default: C<8>)
132} 145}
133 146
134use Net::SNMP (); 147use Net::SNMP ();
135use AnyEvent (); 148use AnyEvent ();
136 149
137our $VERSION = '0.2'; 150our $VERSION = '1.0';
138 151
139$Net::SNMP::DISPATCHER = instance Net::SNMP::Dispatcher; 152$Net::SNMP::DISPATCHER = instance Net::SNMP::Dispatcher;
140 153
141our $MESSAGE_PROCESSING = $Net::SNMP::Dispatcher::MESSAGE_PROCESSING; 154our $MESSAGE_PROCESSING = $Net::SNMP::Dispatcher::MESSAGE_PROCESSING;
142 155
143# avoid the method call
144my $timer = sub { shift->timer (@_) };
145AnyEvent::post_detect { $timer = AnyEvent->can ("timer") };
146
147our $BUSY; 156our $BUSY;
157our $DONE; # finished all jobs
148our @TRANSPORT; # fileno => [count, watcher] 158our @TRANSPORT; # fileno => [count, watcher]
149our @QUEUE; 159our @QUEUE;
150our $MAX_OUTSTANDING = 50; 160our $MAX_OUTSTANDING = 50;
151our $MIN_RECVQUEUE = 8; 161our $MIN_RECVQUEUE = 8;
152our $MAX_RECVQUEUE = 64; 162our $MAX_RECVQUEUE = 64;
153 163
154sub kick_job; 164sub kick_job; # also --$BUSY
155 165
156sub _send_pdu { 166sub _send_pdu {
157 my ($pdu, $retries) = @_; 167 my ($pdu, $retries) = @_;
158 168
159 # mostly copied from Net::SNMP::Dispatch 169 # mostly copied from Net::SNMP::Dispatch
161 # Pass the PDU to Message Processing so that it can 171 # Pass the PDU to Message Processing so that it can
162 # create the new outgoing message. 172 # create the new outgoing message.
163 my $msg = $MESSAGE_PROCESSING->prepare_outgoing_msg ($pdu); 173 my $msg = $MESSAGE_PROCESSING->prepare_outgoing_msg ($pdu);
164 174
165 if (!defined $msg) { 175 if (!defined $msg) {
166 --$BUSY;
167 kick_job; 176 kick_job;
168 # Inform the command generator about the Message Processing error. 177 # Inform the command generator about the Message Processing error.
169 $pdu->status_information ($MESSAGE_PROCESSING->error); 178 $pdu->status_information ($MESSAGE_PROCESSING->error);
170 return; 179 return;
171 } 180 }
175 $MESSAGE_PROCESSING->msg_handle_delete ($pdu->msg_id) 184 $MESSAGE_PROCESSING->msg_handle_delete ($pdu->msg_id)
176 if $pdu->expect_response; 185 if $pdu->expect_response;
177 186
178 # A crude attempt to recover from temporary failures. 187 # A crude attempt to recover from temporary failures.
179 if ($retries-- > 0 && ($!{EAGAIN} || $!{EWOULDBLOCK} || $!{ENOSPC})) { 188 if ($retries-- > 0 && ($!{EAGAIN} || $!{EWOULDBLOCK} || $!{ENOSPC})) {
180 my $retry_w; $retry_w = AnyEvent->$timer (after => $pdu->timeout, cb => sub { 189 my $retry_w; $retry_w = AE::timer $pdu->timeout, 0, sub {
181 undef $retry_w; 190 undef $retry_w;
182 _send_pdu ($pdu, $retries); 191 _send_pdu ($pdu, $retries);
183 }); 192 };
184 } else { 193 } else {
185 --$BUSY;
186 kick_job; 194 kick_job;
187 } 195 }
188 196
189 # Inform the command generator about the send() error. 197 # Inform the command generator about the send() error.
190 $pdu->status_information ($msg->error); 198 $pdu->status_information ($msg->error);
196 my $transport = $msg->transport; 204 my $transport = $msg->transport;
197 my $fileno = $transport->fileno; 205 my $fileno = $transport->fileno;
198 206
199 # register the transport 207 # register the transport
200 unless ($TRANSPORT[$fileno][0]++) { 208 unless ($TRANSPORT[$fileno][0]++) {
201 $TRANSPORT[$fileno][1] = AnyEvent->io (fh => $transport->socket, poll => 'r', cb => sub { 209 $TRANSPORT[$fileno][1] = AE::io $transport->socket, 0, sub {
202 for my $count (1..$MAX_RECVQUEUE) { # handle up to this many requests in one go 210 for my $count (1..$MAX_RECVQUEUE) { # handle up to this many requests in one go
203 # Create a new Message object to receive the response 211 # Create a new Message object to receive the response
204 my ($msg, $error) = Net::SNMP::Message->new (-transport => $transport); 212 my ($msg, $error) = Net::SNMP::Message->new (-transport => $transport);
205 213
206 if (!defined $msg) { 214 if (!defined $msg) {
247 # Cancel the timeout. 255 # Cancel the timeout.
248 my $rtimeout_w = $msg->timeout_id; 256 my $rtimeout_w = $msg->timeout_id;
249 if ($$rtimeout_w) { 257 if ($$rtimeout_w) {
250 undef $$rtimeout_w; 258 undef $$rtimeout_w;
251 259
252 --$BUSY;
253 kick_job; 260 kick_job;
254 261
255 unless (--$TRANSPORT[$fileno][0]) { 262 unless (--$TRANSPORT[$fileno][0]) {
256 delete $TRANSPORT[$fileno]; 263 delete $TRANSPORT[$fileno];
257 return; 264 return;
261 268
262 # when we end up here, we successfully handled $MAX_RECVQUEUE 269 # when we end up here, we successfully handled $MAX_RECVQUEUE
263 # replies in one iteration, so assume we are overloaded 270 # replies in one iteration, so assume we are overloaded
264 # and reduce the amount of parallelity. 271 # and reduce the amount of parallelity.
265 $MAX_OUTSTANDING = (int $MAX_OUTSTANDING * 0.95) || 1; 272 $MAX_OUTSTANDING = (int $MAX_OUTSTANDING * 0.95) || 1;
266 }); 273 };
267 } 274 }
268 275
269 $msg->timeout_id (\(my $rtimeout_w = 276 $msg->timeout_id (\(my $rtimeout_w =
270 AnyEvent->$timer (after => $pdu->timeout, cb => sub { 277 AE::timer $pdu->timeout, 0, sub {
271 my $rtimeout_w = $msg->timeout_id; 278 my $rtimeout_w = $msg->timeout_id;
272 if ($$rtimeout_w) { 279 if ($$rtimeout_w) {
273 undef $$rtimeout_w; 280 undef $$rtimeout_w;
274 delete $TRANSPORT[$fileno] 281 delete $TRANSPORT[$fileno]
275 unless --$TRANSPORT[$fileno][0]; 282 unless --$TRANSPORT[$fileno][0];
279 _send_pdu ($pdu, $retries); 286 _send_pdu ($pdu, $retries);
280 } else { 287 } else {
281 $MESSAGE_PROCESSING->msg_handle_delete ($pdu->msg_id); 288 $MESSAGE_PROCESSING->msg_handle_delete ($pdu->msg_id);
282 $pdu->status_information ("No response from remote host '%s'", $pdu->hostname); 289 $pdu->status_information ("No response from remote host '%s'", $pdu->hostname);
283 290
284 --$BUSY;
285 kick_job; 291 kick_job;
286 } 292 }
287 }) 293 })
288 )); 294 );
289 } else { 295 } else {
290 --$BUSY;
291 kick_job; 296 kick_job;
292 } 297 }
293} 298}
294 299
295sub kick_job { 300sub kick_job {
301 --$BUSY;
302
296 while ($BUSY < $MAX_OUTSTANDING) { 303 while ($BUSY < $MAX_OUTSTANDING) {
297 my $pdu = shift @QUEUE 304 my $pdu = shift @QUEUE
298 or last; 305 or last;
299 306
300 ++$BUSY; 307 ++$BUSY;
301 308
302 _send_pdu $pdu, $pdu->retries; 309 _send_pdu $pdu, $pdu->retries;
303 } 310 }
311
312 $DONE and $DONE->() unless $BUSY;
304} 313}
314
305sub send_pdu($$$) { 315sub send_pdu($$$) {
306 my (undef, $pdu, $delay) = @_; 316 my (undef, $pdu, $delay) = @_;
307 317
308 # $delay is not very sensibly implemented by AnyEvent::SNMP, 318 # $delay is not very sensibly implemented by AnyEvent::SNMP,
309 # but apparently it is not a very sensible feature. 319 # but apparently it is not a very sensible feature.
310 if ($delay > 0) { 320 if ($delay > 0) {
311 ++$BUSY; 321 ++$BUSY;
312 my $delay_w; $delay_w = AnyEvent->$timer (after => $delay, cb => sub { 322 my $delay_w; $delay_w = AE::timer $delay, 0, sub {
313 undef $delay_w; 323 undef $delay_w;
314 --$BUSY;
315 push @QUEUE, $pdu; 324 push @QUEUE, $pdu;
316 kick_job; 325 kick_job;
317 }); 326 };
318 return 1; 327 return 1;
319 } 328 }
320 329
321 push @QUEUE, $pdu; 330 push @QUEUE, $pdu;
322 kick_job; 331 kick_job;
323 332
324 1 333 1
325} 334}
326 335
327sub activate($) { 336sub activate($) {
328 AnyEvent->one_event while $BUSY; 337 while ($BUSY) {
338 $DONE = AE::cv;
339 $DONE->recv;
340 undef $DONE;
341 }
329} 342}
330 343
331sub one_event($) { 344sub one_event($) {
332 AnyEvent->one_event; 345 AnyEvent->one_event; #d# todo
346}
347
348sub set_max_outstanding($) {
349 $MAX_OUTSTANDING = $_[0];
350
351 ++$BUSY; # kick_job decrements $BUSY
352 kick_job;
333} 353}
334 354
335=head1 SEE ALSO 355=head1 SEE ALSO
336 356
337L<AnyEvent>, L<Net::SNMP>, L<Net::SNMP::XS>, L<Net::SNMP::EV>. 357L<AnyEvent>, L<Net::SNMP>, L<Net::SNMP::XS>, L<Net::SNMP::EV>.

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines