ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-SNMP/SNMP.pm
(Generate patch)

Comparing AnyEvent-SNMP/SNMP.pm (file contents):
Revision 1.3 by root, Sat Apr 18 10:17:53 2009 UTC vs.
Revision 1.11 by root, Sun Oct 31 18:26:27 2010 UTC

1=head1 NAME 1=head1 NAME
2 2
3AnyEvent::SNMP - adaptor to integrate Net::SNMP into Anyevent. 3AnyEvent::SNMP - adaptor to integrate Net::SNMP into AnyEvent.
4 4
5=head1 SYNOPSIS 5=head1 SYNOPSIS
6 6
7 use AnyEvent::SNMP; 7 use AnyEvent::SNMP;
8 use Net::SNMP; 8 use Net::SNMP;
23 my @result = $cv->wait; 23 my @result = $cv->wait;
24 24
25=head1 DESCRIPTION 25=head1 DESCRIPTION
26 26
27This module implements an alternative "event dispatcher" for Net::SNMP, 27This module implements an alternative "event dispatcher" for Net::SNMP,
28using AnyEvent as a backend. 28using AnyEvent as a backend. This integrates Net::SNMP into AnyEvent. That
29 29means you can make non-blocking Net::SNMP calls and as long as other
30This integrates Net::SNMP into AnyEvent: You can make non-blocking 30parts of your program also use AnyEvent (or some event loop supported by
31Net::SNMP calls and as long as other parts of your program also use 31AnyEvent), they will run in parallel.
32AnyEvent (or some event loop supported by AnyEvent), they will run in
33parallel.
34 32
35Also, the Net::SNMP scheduler is very inefficient with respect to both CPU 33Also, the Net::SNMP scheduler is very inefficient with respect to both CPU
36and memory usage. Most AnyEvent backends (including the pure-perl backend) 34and memory usage. Most AnyEvent backends (including the pure-perl backend)
37fare much better than the Net::SNMP dispatcher. 35fare much better than the Net::SNMP dispatcher.
38 36
37Another major added fetaure of this module over Net::SNMP is automatic
38rate-adjustments: Net::SNMP is so slow that firing a few thousand
39requests can cause many timeouts simply because Net::SNMP cannot process
40the replies in time. This module automatically adapts the send rate to
41avoid false timeouts caused by slow reply processing.
42
39A potential disadvantage is that replacing the dispatcher is not at all 43A potential disadvantage of this module is that replacing the dispatcher
40a documented thing to do, so future changes in Net::SNP might break this 44is not at all a documented thing to do, so future changes in Net::SNP
41module (or the many similar ones). 45might break this module (or the many similar ones).
42 46
43This module does not export anything and does not require you to do 47This module does not export anything and does not require you to do
44anything special apart from loading it I<before doing any non-blocking 48anything special apart from loading it I<before doing any non-blocking
45requests with Net::SNMP>. It is recommended but not required to load this 49requests with Net::SNMP>. It is recommended but not required to load this
46module before C<Net::SNMP>. 50module before C<Net::SNMP>.
49 53
50=over 4 54=over 4
51 55
52=item $AnyEvent::SNMP::MAX_OUTSTANDING (default: C<50>, dynamic) 56=item $AnyEvent::SNMP::MAX_OUTSTANDING (default: C<50>, dynamic)
53 57
58=item AnyEvent::SNMP::set_max_outstanding $new_value
59
54Use this package variable to restrict the number of outstanding SNMP 60Use this package variable to restrict the number of outstanding SNMP
55requests at any point in time. 61requests at any point in time.
56 62
57Net::SNMP is very fast at creating and sending SNMP requests, but much 63Net::SNMP is very fast at creating and sending SNMP requests, but much
58slower at parsing (big, bulk) responses. This makes it easy to request a 64slower at parsing (big, bulk) responses. This makes it easy to request a
61In the best case, this can lead to unnecessary delays (and even time-outs, 67In the best case, this can lead to unnecessary delays (and even time-outs,
62as the data has been received but not yet processed) and in the worst 68as the data has been received but not yet processed) and in the worst
63case, this can lead to packet loss, when the receive queue overflows and 69case, this can lead to packet loss, when the receive queue overflows and
64the kernel can no longer accept new packets. 70the kernel can no longer accept new packets.
65 71
66To avoid this, you can (and should) limit the number of outstanding requests 72To avoid this, you can (and should) limit the number of outstanding
67to a number low enough so that parsing time doesn't introduce noticable delays. 73requests to a number low enough so that parsing time doesn't introduce
74noticable delays.
68 75
69Unfortunately, this number depends not only on processing speed and load 76Unfortunately, this number depends not only on processing speed and load
70of the machine running Net::SNMP, but also on the network latency and the 77of the machine running Net::SNMP, but also on the network latency and the
71speed of your SNMP agents. 78speed of your SNMP agents.
72 79
73AnyEvent::SNMP tries to dynamically adjust this number dynamically upwards 80AnyEvent::SNMP tries to dynamically adjust this number upwards and
74and downwards. 81downwards.
82
83Increasing C<$MAX_OUTSTANDING> will not automatically use the
84extra request slots. To increase C<$MAX_OUTSTANDING> and make
85C<AnyEvent::SNMP> make use of the extra paralellity, call
86C<AnyEvent::SNMP::set_max_outstanding> with the new value, e.g.:
87
88 AnyEvent::SNMP::set_max_outstanding 500;
89
90Although due to the dynamic adjustment, this might have little lasting
91effect.
75 92
76Note that you can use L<Net::SNMP::XS> to speed up parsing of responses 93Note that you can use L<Net::SNMP::XS> to speed up parsing of responses
77considerably. 94considerably.
78 95
79=item $AnyEvent::SNMP::MIN_RECVQUEUE (default: C<4>) 96=item $AnyEvent::SNMP::MIN_RECVQUEUE (default: C<8>)
80 97
81=item $AnyEvent::SNMP::MAX_RECVQUEUE (default: C<64>) 98=item $AnyEvent::SNMP::MAX_RECVQUEUE (default: C<64>)
82 99
83These values specify the minimum and maximum receive queue length (in 100These values specify the minimum and maximum receive queue length (in
84units of one response packet). 101units of one response packet).
118 135
119=cut 136=cut
120 137
121package AnyEvent::SNMP; 138package AnyEvent::SNMP;
122 139
123no warnings; 140use common::sense;
124use strict qw(subs vars);
125 141
126# it is possible to do this without loading 142# it is possible to do this without loading
127# Net::SNMP::Dispatcher, but much more awkward. 143# Net::SNMP::Dispatcher, but much more awkward.
128use Net::SNMP::Dispatcher; 144use Net::SNMP::Dispatcher;
129 145
132} 148}
133 149
134use Net::SNMP (); 150use Net::SNMP ();
135use AnyEvent (); 151use AnyEvent ();
136 152
137our $VERSION = '0.2'; 153our $VERSION = '1.0';
138 154
139$Net::SNMP::DISPATCHER = instance Net::SNMP::Dispatcher; 155$Net::SNMP::DISPATCHER = instance Net::SNMP::Dispatcher;
140 156
141our $MESSAGE_PROCESSING = $Net::SNMP::Dispatcher::MESSAGE_PROCESSING; 157our $MESSAGE_PROCESSING = $Net::SNMP::Dispatcher::MESSAGE_PROCESSING;
142 158
143# avoid the method call
144my $timer = sub { shift->timer (@_) };
145AnyEvent::post_detect { $timer = AnyEvent->can ("timer") };
146
147our $BUSY; 159our $BUSY;
160our $DONE; # finished all jobs
148our %TRANSPORT; # address => [count, watcher] 161our @TRANSPORT; # fileno => [count, watcher]
149our @QUEUE; 162our @QUEUE;
150our $MAX_OUTSTANDING = 50; 163our $MAX_OUTSTANDING = 50;
151our $MIN_RECVQUEUE = 4; 164our $MIN_RECVQUEUE = 8;
152our $MAX_RECVQUEUE = 64; 165our $MAX_RECVQUEUE = 64;
153 166
154sub kick_job; 167sub kick_job;
155 168
156sub _send_pdu { 169sub _send_pdu {
175 $MESSAGE_PROCESSING->msg_handle_delete ($pdu->msg_id) 188 $MESSAGE_PROCESSING->msg_handle_delete ($pdu->msg_id)
176 if $pdu->expect_response; 189 if $pdu->expect_response;
177 190
178 # A crude attempt to recover from temporary failures. 191 # A crude attempt to recover from temporary failures.
179 if ($retries-- > 0 && ($!{EAGAIN} || $!{EWOULDBLOCK} || $!{ENOSPC})) { 192 if ($retries-- > 0 && ($!{EAGAIN} || $!{EWOULDBLOCK} || $!{ENOSPC})) {
180 my $retry_w; $retry_w = AnyEvent->$timer (after => $pdu->timeout, cb => sub { 193 my $retry_w; $retry_w = AE::timer $pdu->timeout, 0, sub {
181 undef $retry_w; 194 undef $retry_w;
182 _send_pdu ($pdu, $retries); 195 _send_pdu ($pdu, $retries);
183 }); 196 };
184 } else { 197 } else {
185 --$BUSY; 198 --$BUSY;
186 kick_job; 199 kick_job;
187 } 200 }
188 201
192 } 205 }
193 206
194 # Schedule the timeout handler if the message expects a response. 207 # Schedule the timeout handler if the message expects a response.
195 if ($pdu->expect_response) { 208 if ($pdu->expect_response) {
196 my $transport = $msg->transport; 209 my $transport = $msg->transport;
210 my $fileno = $transport->fileno;
197 211
198 # register the transport 212 # register the transport
199 unless ($TRANSPORT{$transport+0}[0]++) { 213 unless ($TRANSPORT[$fileno][0]++) {
200 $TRANSPORT{$transport+0}[1] = AnyEvent->io (fh => $transport->socket, poll => 'r', cb => sub { 214 $TRANSPORT[$fileno][1] = AE::io $transport->socket, 0, sub {
201 for my $count (1..$MAX_RECVQUEUE) { # handle up to this many requests in one go 215 for my $count (1..$MAX_RECVQUEUE) { # handle up to this many requests in one go
202 # Create a new Message object to receive the response 216 # Create a new Message object to receive the response
203 my ($msg, $error) = Net::SNMP::Message->new (-transport => $transport); 217 my ($msg, $error) = Net::SNMP::Message->new (-transport => $transport);
204 218
205 if (!defined $msg) { 219 if (!defined $msg) {
207 } 221 }
208 222
209 # Read the message from the Transport Layer 223 # Read the message from the Transport Layer
210 if (!defined $msg->recv) { 224 if (!defined $msg->recv) {
211 if ($transport->connectionless) { 225 if ($transport->connectionless) {
226 # if we handled very few replies and we have queued work, try
227 # to increase the parallelity as we probably can handle more.
212 if ($count < $MIN_RECVQUEUE && @QUEUE) { 228 if ($count < $MIN_RECVQUEUE && @QUEUE) {
213 ++$MAX_OUTSTANDING; 229 ++$MAX_OUTSTANDING;
214 kick_job; 230 kick_job;
215 } 231 }
216 } else { 232 } else {
217 # for some reason, connected-oriented transports seem to need this 233 # for some reason, connected-oriented transports seem to need this
218 delete $TRANSPORT{$transport+0} 234 delete $TRANSPORT[$fileno]
219 unless --$TRANSPORT{$transport+0}[0]; 235 unless --$TRANSPORT[$fileno][0];
220 } 236 }
221 237
222 $msg->error; 238 $msg->error;
223 return; 239 return;
224 } 240 }
247 undef $$rtimeout_w; 263 undef $$rtimeout_w;
248 264
249 --$BUSY; 265 --$BUSY;
250 kick_job; 266 kick_job;
251 267
252 unless (--$TRANSPORT{$transport+0}[0]) { 268 unless (--$TRANSPORT[$fileno][0]) {
253 delete $TRANSPORT{$transport+0}; 269 delete $TRANSPORT[$fileno];
254 return; 270 return;
255 } 271 }
256 } 272 }
257 } 273 }
258 274
275 # when we end up here, we successfully handled $MAX_RECVQUEUE
276 # replies in one iteration, so assume we are overloaded
277 # and reduce the amount of parallelity.
259 $MAX_OUTSTANDING = (int $MAX_OUTSTANDING * 0.9) || 1; 278 $MAX_OUTSTANDING = (int $MAX_OUTSTANDING * 0.95) || 1;
260 }); 279 };
261 } 280 }
262 281
263 $msg->timeout_id (\(my $rtimeout_w = 282 $msg->timeout_id (\(my $rtimeout_w =
264 AnyEvent->$timer (after => $pdu->timeout, cb => sub { 283 AE::timer $pdu->timeout, 0, sub {
265 my $rtimeout_w = $msg->timeout_id; 284 my $rtimeout_w = $msg->timeout_id;
266 if ($$rtimeout_w) { 285 if ($$rtimeout_w) {
267 undef $$rtimeout_w; 286 undef $$rtimeout_w;
268 delete $TRANSPORT{$transport+0} 287 delete $TRANSPORT[$fileno]
269 unless --$TRANSPORT{$transport+0}[0]; 288 unless --$TRANSPORT[$fileno][0];
270 } 289 }
271 290
272 if ($retries--) { 291 if ($retries--) {
273 _send_pdu ($pdu, $retries); 292 _send_pdu ($pdu, $retries);
274 } else { 293 } else {
277 296
278 --$BUSY; 297 --$BUSY;
279 kick_job; 298 kick_job;
280 } 299 }
281 }) 300 })
282 )); 301 );
283 } else { 302 } else {
284 --$BUSY; 303 --$BUSY;
285 kick_job; 304 kick_job;
286 } 305 }
287} 306}
290 while ($BUSY < $MAX_OUTSTANDING) { 309 while ($BUSY < $MAX_OUTSTANDING) {
291 my $pdu = shift @QUEUE 310 my $pdu = shift @QUEUE
292 or last; 311 or last;
293 312
294 ++$BUSY; 313 ++$BUSY;
295
296 _send_pdu $pdu, $pdu->retries; 314 _send_pdu $pdu, $pdu->retries;
297 } 315 }
316
317 $DONE and $DONE->() unless $BUSY;
298} 318}
319
299sub send_pdu($$$) { 320sub send_pdu($$$) {
300 my (undef, $pdu, $delay) = @_; 321 my (undef, $pdu, $delay) = @_;
301 322
302 # $delay is not very sensibly implemented by AnyEvent::SNMP, 323 # $delay is not very sensibly implemented by AnyEvent::SNMP,
303 # but apparently it is not a very sensible feature. 324 # but apparently it is not a very sensible feature.
304 if ($delay > 0) { 325 if ($delay > 0) {
305 ++$BUSY; 326 ++$BUSY;
306 my $delay_w; $delay_w = AnyEvent->$timer (after => $delay, cb => sub { 327 my $delay_w; $delay_w = AE::timer $delay, 0, sub {
307 undef $delay_w; 328 undef $delay_w;
329 push @QUEUE, $pdu;
308 --$BUSY; 330 --$BUSY;
309 push @QUEUE, $pdu;
310 kick_job; 331 kick_job;
311 }); 332 };
312 return 1; 333 return 1;
313 } 334 }
314 335
315 push @QUEUE, $pdu; 336 push @QUEUE, $pdu;
316 kick_job; 337 kick_job;
317 338
318 1 339 1
319} 340}
320 341
321sub activate($) { 342sub activate($) {
322 AnyEvent->one_event while $BUSY; 343 while ($BUSY) {
344 $DONE = AE::cv;
345 $DONE->recv;
346 undef $DONE;
347 }
323} 348}
324 349
325sub one_event($) { 350sub one_event($) {
351 # should not ever be used
326 AnyEvent->one_event; 352 AnyEvent->one_event; #d# todo
353}
354
355sub set_max_outstanding($) {
356 $MAX_OUTSTANDING = $_[0];
357 kick_job;
327} 358}
328 359
329=head1 SEE ALSO 360=head1 SEE ALSO
330 361
331L<AnyEvent>, L<Net::SNMP>, L<Net::SNMP::XS>, L<Net::SNMP::EV>. 362L<AnyEvent>, L<Net::SNMP>, L<Net::SNMP::XS>, L<Net::SNMP::EV>.

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines