… | |
… | |
75 | |
75 | |
76 | AnyEvent::SNMP tries to dynamically adjust this number dynamically upwards |
76 | AnyEvent::SNMP tries to dynamically adjust this number dynamically upwards |
77 | and downwards. |
77 | and downwards. |
78 | |
78 | |
79 | Increasing C<$MAX_OUTSTANDING> will not automatically use the |
79 | Increasing C<$MAX_OUTSTANDING> will not automatically use the |
80 | C<extra request slots. To increase $MAX_OUTSTANDING> and make |
80 | extra request slots. To increase C<$MAX_OUTSTANDING> and make |
81 | C<C<AnyEvent::SNMP> make use of the extra paralellity, call |
81 | C<AnyEvent::SNMP> make use of the extra paralellity, call |
82 | C<AnyEvent::SNMP::set_max_outstanding> with the new value, e.g.: |
82 | C<AnyEvent::SNMP::set_max_outstanding> with the new value, e.g.: |
83 | |
83 | |
84 | AnyEvent::SNMP::set_max_outstanding 500; |
84 | AnyEvent::SNMP::set_max_outstanding 500; |
85 | |
85 | |
86 | Although due to the dynamic adjustment, this might have little lasting |
86 | Although due to the dynamic adjustment, this might have little lasting |
… | |
… | |
145 | } |
145 | } |
146 | |
146 | |
147 | use Net::SNMP (); |
147 | use Net::SNMP (); |
148 | use AnyEvent (); |
148 | use AnyEvent (); |
149 | |
149 | |
150 | our $VERSION = '0.2'; |
150 | our $VERSION = '1.0'; |
151 | |
151 | |
152 | $Net::SNMP::DISPATCHER = instance Net::SNMP::Dispatcher; |
152 | $Net::SNMP::DISPATCHER = instance Net::SNMP::Dispatcher; |
153 | |
153 | |
154 | our $MESSAGE_PROCESSING = $Net::SNMP::Dispatcher::MESSAGE_PROCESSING; |
154 | our $MESSAGE_PROCESSING = $Net::SNMP::Dispatcher::MESSAGE_PROCESSING; |
155 | |
155 | |
156 | # avoid the method call |
|
|
157 | my $timer = sub { shift->timer (@_) }; |
|
|
158 | AnyEvent::post_detect { $timer = AnyEvent->can ("timer") }; |
|
|
159 | |
|
|
160 | our $BUSY; |
156 | our $BUSY; |
|
|
157 | our $DONE; # finished all jobs |
161 | our @TRANSPORT; # fileno => [count, watcher] |
158 | our @TRANSPORT; # fileno => [count, watcher] |
162 | our @QUEUE; |
159 | our @QUEUE; |
163 | our $MAX_OUTSTANDING = 50; |
160 | our $MAX_OUTSTANDING = 50; |
164 | our $MIN_RECVQUEUE = 8; |
161 | our $MIN_RECVQUEUE = 8; |
165 | our $MAX_RECVQUEUE = 64; |
162 | our $MAX_RECVQUEUE = 64; |
166 | |
163 | |
167 | sub kick_job; |
164 | sub kick_job; # also --$BUSY |
168 | |
165 | |
169 | sub _send_pdu { |
166 | sub _send_pdu { |
170 | my ($pdu, $retries) = @_; |
167 | my ($pdu, $retries) = @_; |
171 | |
168 | |
172 | # mostly copied from Net::SNMP::Dispatch |
169 | # mostly copied from Net::SNMP::Dispatch |
… | |
… | |
174 | # Pass the PDU to Message Processing so that it can |
171 | # Pass the PDU to Message Processing so that it can |
175 | # create the new outgoing message. |
172 | # create the new outgoing message. |
176 | my $msg = $MESSAGE_PROCESSING->prepare_outgoing_msg ($pdu); |
173 | my $msg = $MESSAGE_PROCESSING->prepare_outgoing_msg ($pdu); |
177 | |
174 | |
178 | if (!defined $msg) { |
175 | if (!defined $msg) { |
179 | --$BUSY; |
|
|
180 | kick_job; |
176 | kick_job; |
181 | # Inform the command generator about the Message Processing error. |
177 | # Inform the command generator about the Message Processing error. |
182 | $pdu->status_information ($MESSAGE_PROCESSING->error); |
178 | $pdu->status_information ($MESSAGE_PROCESSING->error); |
183 | return; |
179 | return; |
184 | } |
180 | } |
… | |
… | |
188 | $MESSAGE_PROCESSING->msg_handle_delete ($pdu->msg_id) |
184 | $MESSAGE_PROCESSING->msg_handle_delete ($pdu->msg_id) |
189 | if $pdu->expect_response; |
185 | if $pdu->expect_response; |
190 | |
186 | |
191 | # A crude attempt to recover from temporary failures. |
187 | # A crude attempt to recover from temporary failures. |
192 | if ($retries-- > 0 && ($!{EAGAIN} || $!{EWOULDBLOCK} || $!{ENOSPC})) { |
188 | if ($retries-- > 0 && ($!{EAGAIN} || $!{EWOULDBLOCK} || $!{ENOSPC})) { |
193 | my $retry_w; $retry_w = AnyEvent->$timer (after => $pdu->timeout, cb => sub { |
189 | my $retry_w; $retry_w = AE::timer $pdu->timeout, 0, sub { |
194 | undef $retry_w; |
190 | undef $retry_w; |
195 | _send_pdu ($pdu, $retries); |
191 | _send_pdu ($pdu, $retries); |
196 | }); |
192 | }; |
197 | } else { |
193 | } else { |
198 | --$BUSY; |
|
|
199 | kick_job; |
194 | kick_job; |
200 | } |
195 | } |
201 | |
196 | |
202 | # Inform the command generator about the send() error. |
197 | # Inform the command generator about the send() error. |
203 | $pdu->status_information ($msg->error); |
198 | $pdu->status_information ($msg->error); |
… | |
… | |
209 | my $transport = $msg->transport; |
204 | my $transport = $msg->transport; |
210 | my $fileno = $transport->fileno; |
205 | my $fileno = $transport->fileno; |
211 | |
206 | |
212 | # register the transport |
207 | # register the transport |
213 | unless ($TRANSPORT[$fileno][0]++) { |
208 | unless ($TRANSPORT[$fileno][0]++) { |
214 | $TRANSPORT[$fileno][1] = AnyEvent->io (fh => $transport->socket, poll => 'r', cb => sub { |
209 | $TRANSPORT[$fileno][1] = AE::io $transport->socket, 0, sub { |
215 | for my $count (1..$MAX_RECVQUEUE) { # handle up to this many requests in one go |
210 | for my $count (1..$MAX_RECVQUEUE) { # handle up to this many requests in one go |
216 | # Create a new Message object to receive the response |
211 | # Create a new Message object to receive the response |
217 | my ($msg, $error) = Net::SNMP::Message->new (-transport => $transport); |
212 | my ($msg, $error) = Net::SNMP::Message->new (-transport => $transport); |
218 | |
213 | |
219 | if (!defined $msg) { |
214 | if (!defined $msg) { |
… | |
… | |
260 | # Cancel the timeout. |
255 | # Cancel the timeout. |
261 | my $rtimeout_w = $msg->timeout_id; |
256 | my $rtimeout_w = $msg->timeout_id; |
262 | if ($$rtimeout_w) { |
257 | if ($$rtimeout_w) { |
263 | undef $$rtimeout_w; |
258 | undef $$rtimeout_w; |
264 | |
259 | |
265 | --$BUSY; |
|
|
266 | kick_job; |
260 | kick_job; |
267 | |
261 | |
268 | unless (--$TRANSPORT[$fileno][0]) { |
262 | unless (--$TRANSPORT[$fileno][0]) { |
269 | delete $TRANSPORT[$fileno]; |
263 | delete $TRANSPORT[$fileno]; |
270 | return; |
264 | return; |
… | |
… | |
274 | |
268 | |
275 | # when we end up here, we successfully handled $MAX_RECVQUEUE |
269 | # when we end up here, we successfully handled $MAX_RECVQUEUE |
276 | # replies in one iteration, so assume we are overloaded |
270 | # replies in one iteration, so assume we are overloaded |
277 | # and reduce the amount of parallelity. |
271 | # and reduce the amount of parallelity. |
278 | $MAX_OUTSTANDING = (int $MAX_OUTSTANDING * 0.95) || 1; |
272 | $MAX_OUTSTANDING = (int $MAX_OUTSTANDING * 0.95) || 1; |
279 | }); |
273 | }; |
280 | } |
274 | } |
281 | |
275 | |
282 | $msg->timeout_id (\(my $rtimeout_w = |
276 | $msg->timeout_id (\(my $rtimeout_w = |
283 | AnyEvent->$timer (after => $pdu->timeout, cb => sub { |
277 | AE::timer $pdu->timeout, 0, sub { |
284 | my $rtimeout_w = $msg->timeout_id; |
278 | my $rtimeout_w = $msg->timeout_id; |
285 | if ($$rtimeout_w) { |
279 | if ($$rtimeout_w) { |
286 | undef $$rtimeout_w; |
280 | undef $$rtimeout_w; |
287 | delete $TRANSPORT[$fileno] |
281 | delete $TRANSPORT[$fileno] |
288 | unless --$TRANSPORT[$fileno][0]; |
282 | unless --$TRANSPORT[$fileno][0]; |
… | |
… | |
292 | _send_pdu ($pdu, $retries); |
286 | _send_pdu ($pdu, $retries); |
293 | } else { |
287 | } else { |
294 | $MESSAGE_PROCESSING->msg_handle_delete ($pdu->msg_id); |
288 | $MESSAGE_PROCESSING->msg_handle_delete ($pdu->msg_id); |
295 | $pdu->status_information ("No response from remote host '%s'", $pdu->hostname); |
289 | $pdu->status_information ("No response from remote host '%s'", $pdu->hostname); |
296 | |
290 | |
297 | --$BUSY; |
|
|
298 | kick_job; |
291 | kick_job; |
299 | } |
292 | } |
300 | }) |
293 | }) |
301 | )); |
294 | ); |
302 | } else { |
295 | } else { |
303 | --$BUSY; |
|
|
304 | kick_job; |
296 | kick_job; |
305 | } |
297 | } |
306 | } |
298 | } |
307 | |
299 | |
308 | sub kick_job { |
300 | sub kick_job { |
|
|
301 | --$BUSY; |
|
|
302 | |
309 | while ($BUSY < $MAX_OUTSTANDING) { |
303 | while ($BUSY < $MAX_OUTSTANDING) { |
310 | my $pdu = shift @QUEUE |
304 | my $pdu = shift @QUEUE |
311 | or last; |
305 | or last; |
312 | |
306 | |
313 | ++$BUSY; |
307 | ++$BUSY; |
314 | |
308 | |
315 | _send_pdu $pdu, $pdu->retries; |
309 | _send_pdu $pdu, $pdu->retries; |
316 | } |
310 | } |
|
|
311 | |
|
|
312 | $DONE and $DONE->() unless $BUSY; |
317 | } |
313 | } |
318 | |
314 | |
319 | sub send_pdu($$$) { |
315 | sub send_pdu($$$) { |
320 | my (undef, $pdu, $delay) = @_; |
316 | my (undef, $pdu, $delay) = @_; |
321 | |
317 | |
322 | # $delay is not very sensibly implemented by AnyEvent::SNMP, |
318 | # $delay is not very sensibly implemented by AnyEvent::SNMP, |
323 | # but apparently it is not a very sensible feature. |
319 | # but apparently it is not a very sensible feature. |
324 | if ($delay > 0) { |
320 | if ($delay > 0) { |
325 | ++$BUSY; |
321 | ++$BUSY; |
326 | my $delay_w; $delay_w = AnyEvent->$timer (after => $delay, cb => sub { |
322 | my $delay_w; $delay_w = AE::timer $delay, 0, sub { |
327 | undef $delay_w; |
323 | undef $delay_w; |
328 | --$BUSY; |
|
|
329 | push @QUEUE, $pdu; |
324 | push @QUEUE, $pdu; |
330 | kick_job; |
325 | kick_job; |
331 | }); |
326 | }; |
332 | return 1; |
327 | return 1; |
333 | } |
328 | } |
334 | |
329 | |
335 | push @QUEUE, $pdu; |
330 | push @QUEUE, $pdu; |
336 | kick_job; |
331 | kick_job; |
337 | |
332 | |
338 | 1 |
333 | 1 |
339 | } |
334 | } |
340 | |
335 | |
341 | sub activate($) { |
336 | sub activate($) { |
342 | AnyEvent->one_event while $BUSY; |
337 | while ($BUSY) { |
|
|
338 | $DONE = AE::cv; |
|
|
339 | $DONE->recv; |
|
|
340 | undef $DONE; |
|
|
341 | } |
343 | } |
342 | } |
344 | |
343 | |
345 | sub one_event($) { |
344 | sub one_event($) { |
346 | AnyEvent->one_event; |
345 | AnyEvent->one_event; #d# todo |
347 | } |
346 | } |
348 | |
347 | |
349 | sub set_max_outstanding($) { |
348 | sub set_max_outstanding($) { |
350 | $MAX_OUTSTANDING = $_[0]; |
349 | $MAX_OUTSTANDING = $_[0]; |
|
|
350 | |
|
|
351 | ++$BUSY; # kick_job decrements $BUSY |
351 | kick_job; |
352 | kick_job; |
352 | } |
353 | } |
353 | |
354 | |
354 | =head1 SEE ALSO |
355 | =head1 SEE ALSO |
355 | |
356 | |