ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-SNMP/SNMP.pm
(Generate patch)

Comparing AnyEvent-SNMP/SNMP.pm (file contents):
Revision 1.3 by root, Sat Apr 18 10:17:53 2009 UTC vs.
Revision 1.8 by root, Wed Jan 6 10:25:54 2010 UTC

1=head1 NAME 1=head1 NAME
2 2
3AnyEvent::SNMP - adaptor to integrate Net::SNMP into Anyevent. 3AnyEvent::SNMP - adaptor to integrate Net::SNMP into AnyEvent.
4 4
5=head1 SYNOPSIS 5=head1 SYNOPSIS
6 6
7 use AnyEvent::SNMP; 7 use AnyEvent::SNMP;
8 use Net::SNMP; 8 use Net::SNMP;
49 49
50=over 4 50=over 4
51 51
52=item $AnyEvent::SNMP::MAX_OUTSTANDING (default: C<50>, dynamic) 52=item $AnyEvent::SNMP::MAX_OUTSTANDING (default: C<50>, dynamic)
53 53
54=item AnyEvent::SNMP::set_max_outstanding $new_value
55
54Use this package variable to restrict the number of outstanding SNMP 56Use this package variable to restrict the number of outstanding SNMP
55requests at any point in time. 57requests at any point in time.
56 58
57Net::SNMP is very fast at creating and sending SNMP requests, but much 59Net::SNMP is very fast at creating and sending SNMP requests, but much
58slower at parsing (big, bulk) responses. This makes it easy to request a 60slower at parsing (big, bulk) responses. This makes it easy to request a
61In the best case, this can lead to unnecessary delays (and even time-outs, 63In the best case, this can lead to unnecessary delays (and even time-outs,
62as the data has been received but not yet processed) and in the worst 64as the data has been received but not yet processed) and in the worst
63case, this can lead to packet loss, when the receive queue overflows and 65case, this can lead to packet loss, when the receive queue overflows and
64the kernel can no longer accept new packets. 66the kernel can no longer accept new packets.
65 67
66To avoid this, you can (and should) limit the number of outstanding requests 68To avoid this, you can (and should) limit the number of outstanding
67to a number low enough so that parsing time doesn't introduce noticable delays. 69requests to a number low enough so that parsing time doesn't introduce
70noticable delays.
68 71
69Unfortunately, this number depends not only on processing speed and load 72Unfortunately, this number depends not only on processing speed and load
70of the machine running Net::SNMP, but also on the network latency and the 73of the machine running Net::SNMP, but also on the network latency and the
71speed of your SNMP agents. 74speed of your SNMP agents.
72 75
73AnyEvent::SNMP tries to dynamically adjust this number dynamically upwards 76AnyEvent::SNMP tries to dynamically adjust this number dynamically upwards
74and downwards. 77and downwards.
75 78
79Increasing C<$MAX_OUTSTANDING> will not automatically use the
80extra request slots. To increase C<$MAX_OUTSTANDING> and make
81C<AnyEvent::SNMP> make use of the extra paralellity, call
82C<AnyEvent::SNMP::set_max_outstanding> with the new value, e.g.:
83
84 AnyEvent::SNMP::set_max_outstanding 500;
85
86Although due to the dynamic adjustment, this might have little lasting
87effect.
88
76Note that you can use L<Net::SNMP::XS> to speed up parsing of responses 89Note that you can use L<Net::SNMP::XS> to speed up parsing of responses
77considerably. 90considerably.
78 91
79=item $AnyEvent::SNMP::MIN_RECVQUEUE (default: C<4>) 92=item $AnyEvent::SNMP::MIN_RECVQUEUE (default: C<8>)
80 93
81=item $AnyEvent::SNMP::MAX_RECVQUEUE (default: C<64>) 94=item $AnyEvent::SNMP::MAX_RECVQUEUE (default: C<64>)
82 95
83These values specify the minimum and maximum receive queue length (in 96These values specify the minimum and maximum receive queue length (in
84units of one response packet). 97units of one response packet).
132} 145}
133 146
134use Net::SNMP (); 147use Net::SNMP ();
135use AnyEvent (); 148use AnyEvent ();
136 149
137our $VERSION = '0.2'; 150our $VERSION = '1.0';
138 151
139$Net::SNMP::DISPATCHER = instance Net::SNMP::Dispatcher; 152$Net::SNMP::DISPATCHER = instance Net::SNMP::Dispatcher;
140 153
141our $MESSAGE_PROCESSING = $Net::SNMP::Dispatcher::MESSAGE_PROCESSING; 154our $MESSAGE_PROCESSING = $Net::SNMP::Dispatcher::MESSAGE_PROCESSING;
142 155
143# avoid the method call
144my $timer = sub { shift->timer (@_) };
145AnyEvent::post_detect { $timer = AnyEvent->can ("timer") };
146
147our $BUSY; 156our $BUSY;
157our $DONE; # finished all jobs
148our %TRANSPORT; # address => [count, watcher] 158our @TRANSPORT; # fileno => [count, watcher]
149our @QUEUE; 159our @QUEUE;
150our $MAX_OUTSTANDING = 50; 160our $MAX_OUTSTANDING = 50;
151our $MIN_RECVQUEUE = 4; 161our $MIN_RECVQUEUE = 8;
152our $MAX_RECVQUEUE = 64; 162our $MAX_RECVQUEUE = 64;
153 163
154sub kick_job; 164sub kick_job; # also --$BUSY
155 165
156sub _send_pdu { 166sub _send_pdu {
157 my ($pdu, $retries) = @_; 167 my ($pdu, $retries) = @_;
158 168
159 # mostly copied from Net::SNMP::Dispatch 169 # mostly copied from Net::SNMP::Dispatch
161 # Pass the PDU to Message Processing so that it can 171 # Pass the PDU to Message Processing so that it can
162 # create the new outgoing message. 172 # create the new outgoing message.
163 my $msg = $MESSAGE_PROCESSING->prepare_outgoing_msg ($pdu); 173 my $msg = $MESSAGE_PROCESSING->prepare_outgoing_msg ($pdu);
164 174
165 if (!defined $msg) { 175 if (!defined $msg) {
166 --$BUSY;
167 kick_job; 176 kick_job;
168 # Inform the command generator about the Message Processing error. 177 # Inform the command generator about the Message Processing error.
169 $pdu->status_information ($MESSAGE_PROCESSING->error); 178 $pdu->status_information ($MESSAGE_PROCESSING->error);
170 return; 179 return;
171 } 180 }
175 $MESSAGE_PROCESSING->msg_handle_delete ($pdu->msg_id) 184 $MESSAGE_PROCESSING->msg_handle_delete ($pdu->msg_id)
176 if $pdu->expect_response; 185 if $pdu->expect_response;
177 186
178 # A crude attempt to recover from temporary failures. 187 # A crude attempt to recover from temporary failures.
179 if ($retries-- > 0 && ($!{EAGAIN} || $!{EWOULDBLOCK} || $!{ENOSPC})) { 188 if ($retries-- > 0 && ($!{EAGAIN} || $!{EWOULDBLOCK} || $!{ENOSPC})) {
180 my $retry_w; $retry_w = AnyEvent->$timer (after => $pdu->timeout, cb => sub { 189 my $retry_w; $retry_w = AE::timer $pdu->timeout, 0, sub {
181 undef $retry_w; 190 undef $retry_w;
182 _send_pdu ($pdu, $retries); 191 _send_pdu ($pdu, $retries);
183 }); 192 };
184 } else { 193 } else {
185 --$BUSY;
186 kick_job; 194 kick_job;
187 } 195 }
188 196
189 # Inform the command generator about the send() error. 197 # Inform the command generator about the send() error.
190 $pdu->status_information ($msg->error); 198 $pdu->status_information ($msg->error);
192 } 200 }
193 201
194 # Schedule the timeout handler if the message expects a response. 202 # Schedule the timeout handler if the message expects a response.
195 if ($pdu->expect_response) { 203 if ($pdu->expect_response) {
196 my $transport = $msg->transport; 204 my $transport = $msg->transport;
205 my $fileno = $transport->fileno;
197 206
198 # register the transport 207 # register the transport
199 unless ($TRANSPORT{$transport+0}[0]++) { 208 unless ($TRANSPORT[$fileno][0]++) {
200 $TRANSPORT{$transport+0}[1] = AnyEvent->io (fh => $transport->socket, poll => 'r', cb => sub { 209 $TRANSPORT[$fileno][1] = AE::io $transport->socket, 0, sub {
201 for my $count (1..$MAX_RECVQUEUE) { # handle up to this many requests in one go 210 for my $count (1..$MAX_RECVQUEUE) { # handle up to this many requests in one go
202 # Create a new Message object to receive the response 211 # Create a new Message object to receive the response
203 my ($msg, $error) = Net::SNMP::Message->new (-transport => $transport); 212 my ($msg, $error) = Net::SNMP::Message->new (-transport => $transport);
204 213
205 if (!defined $msg) { 214 if (!defined $msg) {
207 } 216 }
208 217
209 # Read the message from the Transport Layer 218 # Read the message from the Transport Layer
210 if (!defined $msg->recv) { 219 if (!defined $msg->recv) {
211 if ($transport->connectionless) { 220 if ($transport->connectionless) {
221 # if we handled very few replies and we have queued work, try
222 # to increase the parallelity as we probably can handle more.
212 if ($count < $MIN_RECVQUEUE && @QUEUE) { 223 if ($count < $MIN_RECVQUEUE && @QUEUE) {
213 ++$MAX_OUTSTANDING; 224 ++$MAX_OUTSTANDING;
214 kick_job; 225 kick_job;
215 } 226 }
216 } else { 227 } else {
217 # for some reason, connected-oriented transports seem to need this 228 # for some reason, connected-oriented transports seem to need this
218 delete $TRANSPORT{$transport+0} 229 delete $TRANSPORT[$fileno]
219 unless --$TRANSPORT{$transport+0}[0]; 230 unless --$TRANSPORT[$fileno][0];
220 } 231 }
221 232
222 $msg->error; 233 $msg->error;
223 return; 234 return;
224 } 235 }
244 # Cancel the timeout. 255 # Cancel the timeout.
245 my $rtimeout_w = $msg->timeout_id; 256 my $rtimeout_w = $msg->timeout_id;
246 if ($$rtimeout_w) { 257 if ($$rtimeout_w) {
247 undef $$rtimeout_w; 258 undef $$rtimeout_w;
248 259
249 --$BUSY;
250 kick_job; 260 kick_job;
251 261
252 unless (--$TRANSPORT{$transport+0}[0]) { 262 unless (--$TRANSPORT[$fileno][0]) {
253 delete $TRANSPORT{$transport+0}; 263 delete $TRANSPORT[$fileno];
254 return; 264 return;
255 } 265 }
256 } 266 }
257 } 267 }
258 268
269 # when we end up here, we successfully handled $MAX_RECVQUEUE
270 # replies in one iteration, so assume we are overloaded
271 # and reduce the amount of parallelity.
259 $MAX_OUTSTANDING = (int $MAX_OUTSTANDING * 0.9) || 1; 272 $MAX_OUTSTANDING = (int $MAX_OUTSTANDING * 0.95) || 1;
260 }); 273 };
261 } 274 }
262 275
263 $msg->timeout_id (\(my $rtimeout_w = 276 $msg->timeout_id (\(my $rtimeout_w =
264 AnyEvent->$timer (after => $pdu->timeout, cb => sub { 277 AE::timer $pdu->timeout, 0, sub {
265 my $rtimeout_w = $msg->timeout_id; 278 my $rtimeout_w = $msg->timeout_id;
266 if ($$rtimeout_w) { 279 if ($$rtimeout_w) {
267 undef $$rtimeout_w; 280 undef $$rtimeout_w;
268 delete $TRANSPORT{$transport+0} 281 delete $TRANSPORT[$fileno]
269 unless --$TRANSPORT{$transport+0}[0]; 282 unless --$TRANSPORT[$fileno][0];
270 } 283 }
271 284
272 if ($retries--) { 285 if ($retries--) {
273 _send_pdu ($pdu, $retries); 286 _send_pdu ($pdu, $retries);
274 } else { 287 } else {
275 $MESSAGE_PROCESSING->msg_handle_delete ($pdu->msg_id); 288 $MESSAGE_PROCESSING->msg_handle_delete ($pdu->msg_id);
276 $pdu->status_information ("No response from remote host '%s'", $pdu->hostname); 289 $pdu->status_information ("No response from remote host '%s'", $pdu->hostname);
277 290
278 --$BUSY;
279 kick_job; 291 kick_job;
280 } 292 }
281 }) 293 })
282 )); 294 );
283 } else { 295 } else {
284 --$BUSY;
285 kick_job; 296 kick_job;
286 } 297 }
287} 298}
288 299
289sub kick_job { 300sub kick_job {
301 --$BUSY;
302
290 while ($BUSY < $MAX_OUTSTANDING) { 303 while ($BUSY < $MAX_OUTSTANDING) {
291 my $pdu = shift @QUEUE 304 my $pdu = shift @QUEUE
292 or last; 305 or last;
293 306
294 ++$BUSY; 307 ++$BUSY;
295 308
296 _send_pdu $pdu, $pdu->retries; 309 _send_pdu $pdu, $pdu->retries;
297 } 310 }
311
312 $DONE and $DONE->() unless $BUSY;
298} 313}
314
299sub send_pdu($$$) { 315sub send_pdu($$$) {
300 my (undef, $pdu, $delay) = @_; 316 my (undef, $pdu, $delay) = @_;
301 317
302 # $delay is not very sensibly implemented by AnyEvent::SNMP, 318 # $delay is not very sensibly implemented by AnyEvent::SNMP,
303 # but apparently it is not a very sensible feature. 319 # but apparently it is not a very sensible feature.
304 if ($delay > 0) { 320 if ($delay > 0) {
305 ++$BUSY; 321 ++$BUSY;
306 my $delay_w; $delay_w = AnyEvent->$timer (after => $delay, cb => sub { 322 my $delay_w; $delay_w = AE::timer $delay, 0, sub {
307 undef $delay_w; 323 undef $delay_w;
308 --$BUSY;
309 push @QUEUE, $pdu; 324 push @QUEUE, $pdu;
310 kick_job; 325 kick_job;
311 }); 326 };
312 return 1; 327 return 1;
313 } 328 }
314 329
315 push @QUEUE, $pdu; 330 push @QUEUE, $pdu;
316 kick_job; 331 kick_job;
317 332
318 1 333 1
319} 334}
320 335
321sub activate($) { 336sub activate($) {
322 AnyEvent->one_event while $BUSY; 337 while ($BUSY) {
338 $DONE = AE::cv;
339 $DONE->recv;
340 undef $DONE;
341 }
323} 342}
324 343
325sub one_event($) { 344sub one_event($) {
326 AnyEvent->one_event; 345 AnyEvent->one_event; #d# todo
346}
347
348sub set_max_outstanding($) {
349 $MAX_OUTSTANDING = $_[0];
350
351 ++$BUSY; # kick_job decrements $BUSY
352 kick_job;
327} 353}
328 354
329=head1 SEE ALSO 355=head1 SEE ALSO
330 356
331L<AnyEvent>, L<Net::SNMP>, L<Net::SNMP::XS>, L<Net::SNMP::EV>. 357L<AnyEvent>, L<Net::SNMP>, L<Net::SNMP::XS>, L<Net::SNMP::EV>.

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines