ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-SNMP/SNMP.pm
(Generate patch)

Comparing AnyEvent-SNMP/SNMP.pm (file contents):
Revision 1.4 by root, Sun Apr 19 11:06:21 2009 UTC vs.
Revision 1.8 by root, Wed Jan 6 10:25:54 2010 UTC

1=head1 NAME 1=head1 NAME
2 2
3AnyEvent::SNMP - adaptor to integrate Net::SNMP into Anyevent. 3AnyEvent::SNMP - adaptor to integrate Net::SNMP into AnyEvent.
4 4
5=head1 SYNOPSIS 5=head1 SYNOPSIS
6 6
7 use AnyEvent::SNMP; 7 use AnyEvent::SNMP;
8 use Net::SNMP; 8 use Net::SNMP;
49 49
50=over 4 50=over 4
51 51
52=item $AnyEvent::SNMP::MAX_OUTSTANDING (default: C<50>, dynamic) 52=item $AnyEvent::SNMP::MAX_OUTSTANDING (default: C<50>, dynamic)
53 53
54=item AnyEvent::SNMP::set_max_outstanding $new_value
55
54Use this package variable to restrict the number of outstanding SNMP 56Use this package variable to restrict the number of outstanding SNMP
55requests at any point in time. 57requests at any point in time.
56 58
57Net::SNMP is very fast at creating and sending SNMP requests, but much 59Net::SNMP is very fast at creating and sending SNMP requests, but much
58slower at parsing (big, bulk) responses. This makes it easy to request a 60slower at parsing (big, bulk) responses. This makes it easy to request a
61In the best case, this can lead to unnecessary delays (and even time-outs, 63In the best case, this can lead to unnecessary delays (and even time-outs,
62as the data has been received but not yet processed) and in the worst 64as the data has been received but not yet processed) and in the worst
63case, this can lead to packet loss, when the receive queue overflows and 65case, this can lead to packet loss, when the receive queue overflows and
64the kernel can no longer accept new packets. 66the kernel can no longer accept new packets.
65 67
66To avoid this, you can (and should) limit the number of outstanding requests 68To avoid this, you can (and should) limit the number of outstanding
67to a number low enough so that parsing time doesn't introduce noticable delays. 69requests to a number low enough so that parsing time doesn't introduce
70noticable delays.
68 71
69Unfortunately, this number depends not only on processing speed and load 72Unfortunately, this number depends not only on processing speed and load
70of the machine running Net::SNMP, but also on the network latency and the 73of the machine running Net::SNMP, but also on the network latency and the
71speed of your SNMP agents. 74speed of your SNMP agents.
72 75
73AnyEvent::SNMP tries to dynamically adjust this number dynamically upwards 76AnyEvent::SNMP tries to dynamically adjust this number dynamically upwards
74and downwards. 77and downwards.
75 78
79Increasing C<$MAX_OUTSTANDING> will not automatically use the
80extra request slots. To increase C<$MAX_OUTSTANDING> and make
81C<AnyEvent::SNMP> make use of the extra paralellity, call
82C<AnyEvent::SNMP::set_max_outstanding> with the new value, e.g.:
83
84 AnyEvent::SNMP::set_max_outstanding 500;
85
86Although due to the dynamic adjustment, this might have little lasting
87effect.
88
76Note that you can use L<Net::SNMP::XS> to speed up parsing of responses 89Note that you can use L<Net::SNMP::XS> to speed up parsing of responses
77considerably. 90considerably.
78 91
79=item $AnyEvent::SNMP::MIN_RECVQUEUE (default: C<4>) 92=item $AnyEvent::SNMP::MIN_RECVQUEUE (default: C<8>)
80 93
81=item $AnyEvent::SNMP::MAX_RECVQUEUE (default: C<64>) 94=item $AnyEvent::SNMP::MAX_RECVQUEUE (default: C<64>)
82 95
83These values specify the minimum and maximum receive queue length (in 96These values specify the minimum and maximum receive queue length (in
84units of one response packet). 97units of one response packet).
132} 145}
133 146
134use Net::SNMP (); 147use Net::SNMP ();
135use AnyEvent (); 148use AnyEvent ();
136 149
137our $VERSION = '0.2'; 150our $VERSION = '1.0';
138 151
139$Net::SNMP::DISPATCHER = instance Net::SNMP::Dispatcher; 152$Net::SNMP::DISPATCHER = instance Net::SNMP::Dispatcher;
140 153
141our $MESSAGE_PROCESSING = $Net::SNMP::Dispatcher::MESSAGE_PROCESSING; 154our $MESSAGE_PROCESSING = $Net::SNMP::Dispatcher::MESSAGE_PROCESSING;
142 155
143# avoid the method call
144my $timer = sub { shift->timer (@_) };
145AnyEvent::post_detect { $timer = AnyEvent->can ("timer") };
146
147our $BUSY; 156our $BUSY;
157our $DONE; # finished all jobs
148our %TRANSPORT; # address => [count, watcher] 158our @TRANSPORT; # fileno => [count, watcher]
149our @QUEUE; 159our @QUEUE;
150our $MAX_OUTSTANDING = 50; 160our $MAX_OUTSTANDING = 50;
151our $MIN_RECVQUEUE = 4; 161our $MIN_RECVQUEUE = 8;
152our $MAX_RECVQUEUE = 64; 162our $MAX_RECVQUEUE = 64;
153 163
154sub kick_job; 164sub kick_job; # also --$BUSY
155 165
156sub _send_pdu { 166sub _send_pdu {
157 my ($pdu, $retries) = @_; 167 my ($pdu, $retries) = @_;
158 168
159 # mostly copied from Net::SNMP::Dispatch 169 # mostly copied from Net::SNMP::Dispatch
161 # Pass the PDU to Message Processing so that it can 171 # Pass the PDU to Message Processing so that it can
162 # create the new outgoing message. 172 # create the new outgoing message.
163 my $msg = $MESSAGE_PROCESSING->prepare_outgoing_msg ($pdu); 173 my $msg = $MESSAGE_PROCESSING->prepare_outgoing_msg ($pdu);
164 174
165 if (!defined $msg) { 175 if (!defined $msg) {
166 --$BUSY;
167 kick_job; 176 kick_job;
168 # Inform the command generator about the Message Processing error. 177 # Inform the command generator about the Message Processing error.
169 $pdu->status_information ($MESSAGE_PROCESSING->error); 178 $pdu->status_information ($MESSAGE_PROCESSING->error);
170 return; 179 return;
171 } 180 }
175 $MESSAGE_PROCESSING->msg_handle_delete ($pdu->msg_id) 184 $MESSAGE_PROCESSING->msg_handle_delete ($pdu->msg_id)
176 if $pdu->expect_response; 185 if $pdu->expect_response;
177 186
178 # A crude attempt to recover from temporary failures. 187 # A crude attempt to recover from temporary failures.
179 if ($retries-- > 0 && ($!{EAGAIN} || $!{EWOULDBLOCK} || $!{ENOSPC})) { 188 if ($retries-- > 0 && ($!{EAGAIN} || $!{EWOULDBLOCK} || $!{ENOSPC})) {
180 my $retry_w; $retry_w = AnyEvent->$timer (after => $pdu->timeout, cb => sub { 189 my $retry_w; $retry_w = AE::timer $pdu->timeout, 0, sub {
181 undef $retry_w; 190 undef $retry_w;
182 _send_pdu ($pdu, $retries); 191 _send_pdu ($pdu, $retries);
183 }); 192 };
184 } else { 193 } else {
185 --$BUSY;
186 kick_job; 194 kick_job;
187 } 195 }
188 196
189 # Inform the command generator about the send() error. 197 # Inform the command generator about the send() error.
190 $pdu->status_information ($msg->error); 198 $pdu->status_information ($msg->error);
192 } 200 }
193 201
194 # Schedule the timeout handler if the message expects a response. 202 # Schedule the timeout handler if the message expects a response.
195 if ($pdu->expect_response) { 203 if ($pdu->expect_response) {
196 my $transport = $msg->transport; 204 my $transport = $msg->transport;
205 my $fileno = $transport->fileno;
197 206
198 # register the transport 207 # register the transport
199 unless ($TRANSPORT{$transport+0}[0]++) { 208 unless ($TRANSPORT[$fileno][0]++) {
200 $TRANSPORT{$transport+0}[1] = AnyEvent->io (fh => $transport->socket, poll => 'r', cb => sub { 209 $TRANSPORT[$fileno][1] = AE::io $transport->socket, 0, sub {
201 for my $count (1..$MAX_RECVQUEUE) { # handle up to this many requests in one go 210 for my $count (1..$MAX_RECVQUEUE) { # handle up to this many requests in one go
202 # Create a new Message object to receive the response 211 # Create a new Message object to receive the response
203 my ($msg, $error) = Net::SNMP::Message->new (-transport => $transport); 212 my ($msg, $error) = Net::SNMP::Message->new (-transport => $transport);
204 213
205 if (!defined $msg) { 214 if (!defined $msg) {
215 ++$MAX_OUTSTANDING; 224 ++$MAX_OUTSTANDING;
216 kick_job; 225 kick_job;
217 } 226 }
218 } else { 227 } else {
219 # for some reason, connected-oriented transports seem to need this 228 # for some reason, connected-oriented transports seem to need this
220 delete $TRANSPORT{$transport+0} 229 delete $TRANSPORT[$fileno]
221 unless --$TRANSPORT{$transport+0}[0]; 230 unless --$TRANSPORT[$fileno][0];
222 } 231 }
223 232
224 $msg->error; 233 $msg->error;
225 return; 234 return;
226 } 235 }
246 # Cancel the timeout. 255 # Cancel the timeout.
247 my $rtimeout_w = $msg->timeout_id; 256 my $rtimeout_w = $msg->timeout_id;
248 if ($$rtimeout_w) { 257 if ($$rtimeout_w) {
249 undef $$rtimeout_w; 258 undef $$rtimeout_w;
250 259
251 --$BUSY;
252 kick_job; 260 kick_job;
253 261
254 unless (--$TRANSPORT{$transport+0}[0]) { 262 unless (--$TRANSPORT[$fileno][0]) {
255 delete $TRANSPORT{$transport+0}; 263 delete $TRANSPORT[$fileno];
256 return; 264 return;
257 } 265 }
258 } 266 }
259 } 267 }
260 268
261 # when we end up here, we successfully handled $MAX_RECVQUEUE 269 # when we end up here, we successfully handled $MAX_RECVQUEUE
262 # replies in one iteration, so assume we are overloaded 270 # replies in one iteration, so assume we are overloaded
263 # and reduce the amount of parallelity. 271 # and reduce the amount of parallelity.
264 $MAX_OUTSTANDING = (int $MAX_OUTSTANDING * 0.9) || 1; 272 $MAX_OUTSTANDING = (int $MAX_OUTSTANDING * 0.95) || 1;
265 }); 273 };
266 } 274 }
267 275
268 $msg->timeout_id (\(my $rtimeout_w = 276 $msg->timeout_id (\(my $rtimeout_w =
269 AnyEvent->$timer (after => $pdu->timeout, cb => sub { 277 AE::timer $pdu->timeout, 0, sub {
270 my $rtimeout_w = $msg->timeout_id; 278 my $rtimeout_w = $msg->timeout_id;
271 if ($$rtimeout_w) { 279 if ($$rtimeout_w) {
272 undef $$rtimeout_w; 280 undef $$rtimeout_w;
273 delete $TRANSPORT{$transport+0} 281 delete $TRANSPORT[$fileno]
274 unless --$TRANSPORT{$transport+0}[0]; 282 unless --$TRANSPORT[$fileno][0];
275 } 283 }
276 284
277 if ($retries--) { 285 if ($retries--) {
278 _send_pdu ($pdu, $retries); 286 _send_pdu ($pdu, $retries);
279 } else { 287 } else {
280 $MESSAGE_PROCESSING->msg_handle_delete ($pdu->msg_id); 288 $MESSAGE_PROCESSING->msg_handle_delete ($pdu->msg_id);
281 $pdu->status_information ("No response from remote host '%s'", $pdu->hostname); 289 $pdu->status_information ("No response from remote host '%s'", $pdu->hostname);
282 290
283 --$BUSY;
284 kick_job; 291 kick_job;
285 } 292 }
286 }) 293 })
287 )); 294 );
288 } else { 295 } else {
289 --$BUSY;
290 kick_job; 296 kick_job;
291 } 297 }
292} 298}
293 299
294sub kick_job { 300sub kick_job {
301 --$BUSY;
302
295 while ($BUSY < $MAX_OUTSTANDING) { 303 while ($BUSY < $MAX_OUTSTANDING) {
296 my $pdu = shift @QUEUE 304 my $pdu = shift @QUEUE
297 or last; 305 or last;
298 306
299 ++$BUSY; 307 ++$BUSY;
300 308
301 _send_pdu $pdu, $pdu->retries; 309 _send_pdu $pdu, $pdu->retries;
302 } 310 }
311
312 $DONE and $DONE->() unless $BUSY;
303} 313}
314
304sub send_pdu($$$) { 315sub send_pdu($$$) {
305 my (undef, $pdu, $delay) = @_; 316 my (undef, $pdu, $delay) = @_;
306 317
307 # $delay is not very sensibly implemented by AnyEvent::SNMP, 318 # $delay is not very sensibly implemented by AnyEvent::SNMP,
308 # but apparently it is not a very sensible feature. 319 # but apparently it is not a very sensible feature.
309 if ($delay > 0) { 320 if ($delay > 0) {
310 ++$BUSY; 321 ++$BUSY;
311 my $delay_w; $delay_w = AnyEvent->$timer (after => $delay, cb => sub { 322 my $delay_w; $delay_w = AE::timer $delay, 0, sub {
312 undef $delay_w; 323 undef $delay_w;
313 --$BUSY;
314 push @QUEUE, $pdu; 324 push @QUEUE, $pdu;
315 kick_job; 325 kick_job;
316 }); 326 };
317 return 1; 327 return 1;
318 } 328 }
319 329
320 push @QUEUE, $pdu; 330 push @QUEUE, $pdu;
321 kick_job; 331 kick_job;
322 332
323 1 333 1
324} 334}
325 335
326sub activate($) { 336sub activate($) {
327 AnyEvent->one_event while $BUSY; 337 while ($BUSY) {
338 $DONE = AE::cv;
339 $DONE->recv;
340 undef $DONE;
341 }
328} 342}
329 343
330sub one_event($) { 344sub one_event($) {
331 AnyEvent->one_event; 345 AnyEvent->one_event; #d# todo
346}
347
348sub set_max_outstanding($) {
349 $MAX_OUTSTANDING = $_[0];
350
351 ++$BUSY; # kick_job decrements $BUSY
352 kick_job;
332} 353}
333 354
334=head1 SEE ALSO 355=head1 SEE ALSO
335 356
336L<AnyEvent>, L<Net::SNMP>, L<Net::SNMP::XS>, L<Net::SNMP::EV>. 357L<AnyEvent>, L<Net::SNMP>, L<Net::SNMP::XS>, L<Net::SNMP::EV>.

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines