1 | =head1 NAME |
1 | =head1 NAME |
2 | |
2 | |
3 | AnyEvent::SNMP - adaptor to integrate Net::SNMP into Anyevent. |
3 | AnyEvent::SNMP - adaptor to integrate Net::SNMP into AnyEvent. |
4 | |
4 | |
5 | =head1 SYNOPSIS |
5 | =head1 SYNOPSIS |
6 | |
6 | |
7 | use AnyEvent::SNMP; |
7 | use AnyEvent::SNMP; |
8 | use Net::SNMP; |
8 | use Net::SNMP; |
… | |
… | |
49 | |
49 | |
50 | =over 4 |
50 | =over 4 |
51 | |
51 | |
52 | =item $AnyEvent::SNMP::MAX_OUTSTANDING (default: C<50>, dynamic) |
52 | =item $AnyEvent::SNMP::MAX_OUTSTANDING (default: C<50>, dynamic) |
53 | |
53 | |
|
|
54 | =item AnyEvent::SNMP::set_max_outstanding $new_value |
|
|
55 | |
54 | Use this package variable to restrict the number of outstanding SNMP |
56 | Use this package variable to restrict the number of outstanding SNMP |
55 | requests at any point in time. |
57 | requests at any point in time. |
56 | |
58 | |
57 | Net::SNMP is very fast at creating and sending SNMP requests, but much |
59 | Net::SNMP is very fast at creating and sending SNMP requests, but much |
58 | slower at parsing (big, bulk) responses. This makes it easy to request a |
60 | slower at parsing (big, bulk) responses. This makes it easy to request a |
… | |
… | |
61 | In the best case, this can lead to unnecessary delays (and even time-outs, |
63 | In the best case, this can lead to unnecessary delays (and even time-outs, |
62 | as the data has been received but not yet processed) and in the worst |
64 | as the data has been received but not yet processed) and in the worst |
63 | case, this can lead to packet loss, when the receive queue overflows and |
65 | case, this can lead to packet loss, when the receive queue overflows and |
64 | the kernel can no longer accept new packets. |
66 | the kernel can no longer accept new packets. |
65 | |
67 | |
66 | To avoid this, you can (and should) limit the number of outstanding requests |
68 | To avoid this, you can (and should) limit the number of outstanding |
67 | to a number low enough so that parsing time doesn't introduce noticable delays. |
69 | requests to a number low enough so that parsing time doesn't introduce |
|
|
70 | noticable delays. |
68 | |
71 | |
69 | Unfortunately, this number depends not only on processing speed and load |
72 | Unfortunately, this number depends not only on processing speed and load |
70 | of the machine running Net::SNMP, but also on the network latency and the |
73 | of the machine running Net::SNMP, but also on the network latency and the |
71 | speed of your SNMP agents. |
74 | speed of your SNMP agents. |
72 | |
75 | |
73 | AnyEvent::SNMP tries to dynamically adjust this number dynamically upwards |
76 | AnyEvent::SNMP tries to dynamically adjust this number dynamically upwards |
74 | and downwards. |
77 | and downwards. |
75 | |
78 | |
|
|
79 | Increasing C<$MAX_OUTSTANDING> will not automatically use the |
|
|
80 | C<extra request slots. To increase $MAX_OUTSTANDING> and make |
|
|
81 | C<C<AnyEvent::SNMP> make use of the extra paralellity, call |
|
|
82 | C<AnyEvent::SNMP::set_max_outstanding> with the new value, e.g.: |
|
|
83 | |
|
|
84 | AnyEvent::SNMP::set_max_outstanding 500; |
|
|
85 | |
|
|
86 | Although due to the dynamic adjustment, this might have little lasting |
|
|
87 | effect. |
|
|
88 | |
76 | Note that you can use L<Net::SNMP::XS> to speed up parsing of responses |
89 | Note that you can use L<Net::SNMP::XS> to speed up parsing of responses |
77 | considerably. |
90 | considerably. |
78 | |
91 | |
79 | =item $AnyEvent::SNMP::MIN_RECVQUEUE (default: C<4>) |
92 | =item $AnyEvent::SNMP::MIN_RECVQUEUE (default: C<8>) |
80 | |
93 | |
81 | =item $AnyEvent::SNMP::MAX_RECVQUEUE (default: C<64>) |
94 | =item $AnyEvent::SNMP::MAX_RECVQUEUE (default: C<64>) |
82 | |
95 | |
83 | These values specify the minimum and maximum receive queue length (in |
96 | These values specify the minimum and maximum receive queue length (in |
84 | units of one response packet). |
97 | units of one response packet). |
… | |
… | |
143 | # avoid the method call |
156 | # avoid the method call |
144 | my $timer = sub { shift->timer (@_) }; |
157 | my $timer = sub { shift->timer (@_) }; |
145 | AnyEvent::post_detect { $timer = AnyEvent->can ("timer") }; |
158 | AnyEvent::post_detect { $timer = AnyEvent->can ("timer") }; |
146 | |
159 | |
147 | our $BUSY; |
160 | our $BUSY; |
148 | our %TRANSPORT; # address => [count, watcher] |
161 | our @TRANSPORT; # fileno => [count, watcher] |
149 | our @QUEUE; |
162 | our @QUEUE; |
150 | our $MAX_OUTSTANDING = 50; |
163 | our $MAX_OUTSTANDING = 50; |
151 | our $MIN_RECVQUEUE = 4; |
164 | our $MIN_RECVQUEUE = 8; |
152 | our $MAX_RECVQUEUE = 64; |
165 | our $MAX_RECVQUEUE = 64; |
153 | |
166 | |
154 | sub kick_job; |
167 | sub kick_job; |
155 | |
168 | |
156 | sub _send_pdu { |
169 | sub _send_pdu { |
… | |
… | |
192 | } |
205 | } |
193 | |
206 | |
194 | # Schedule the timeout handler if the message expects a response. |
207 | # Schedule the timeout handler if the message expects a response. |
195 | if ($pdu->expect_response) { |
208 | if ($pdu->expect_response) { |
196 | my $transport = $msg->transport; |
209 | my $transport = $msg->transport; |
|
|
210 | my $fileno = $transport->fileno; |
197 | |
211 | |
198 | # register the transport |
212 | # register the transport |
199 | unless ($TRANSPORT{$transport+0}[0]++) { |
213 | unless ($TRANSPORT[$fileno][0]++) { |
200 | $TRANSPORT{$transport+0}[1] = AnyEvent->io (fh => $transport->socket, poll => 'r', cb => sub { |
214 | $TRANSPORT[$fileno][1] = AnyEvent->io (fh => $transport->socket, poll => 'r', cb => sub { |
201 | for my $count (1..$MAX_RECVQUEUE) { # handle up to this many requests in one go |
215 | for my $count (1..$MAX_RECVQUEUE) { # handle up to this many requests in one go |
202 | # Create a new Message object to receive the response |
216 | # Create a new Message object to receive the response |
203 | my ($msg, $error) = Net::SNMP::Message->new (-transport => $transport); |
217 | my ($msg, $error) = Net::SNMP::Message->new (-transport => $transport); |
204 | |
218 | |
205 | if (!defined $msg) { |
219 | if (!defined $msg) { |
… | |
… | |
207 | } |
221 | } |
208 | |
222 | |
209 | # Read the message from the Transport Layer |
223 | # Read the message from the Transport Layer |
210 | if (!defined $msg->recv) { |
224 | if (!defined $msg->recv) { |
211 | if ($transport->connectionless) { |
225 | if ($transport->connectionless) { |
|
|
226 | # if we handled very few replies and we have queued work, try |
|
|
227 | # to increase the parallelity as we probably can handle more. |
212 | if ($count < $MIN_RECVQUEUE && @QUEUE) { |
228 | if ($count < $MIN_RECVQUEUE && @QUEUE) { |
213 | ++$MAX_OUTSTANDING; |
229 | ++$MAX_OUTSTANDING; |
214 | kick_job; |
230 | kick_job; |
215 | } |
231 | } |
216 | } else { |
232 | } else { |
217 | # for some reason, connected-oriented transports seem to need this |
233 | # for some reason, connected-oriented transports seem to need this |
218 | delete $TRANSPORT{$transport+0} |
234 | delete $TRANSPORT[$fileno] |
219 | unless --$TRANSPORT{$transport+0}[0]; |
235 | unless --$TRANSPORT[$fileno][0]; |
220 | } |
236 | } |
221 | |
237 | |
222 | $msg->error; |
238 | $msg->error; |
223 | return; |
239 | return; |
224 | } |
240 | } |
… | |
… | |
247 | undef $$rtimeout_w; |
263 | undef $$rtimeout_w; |
248 | |
264 | |
249 | --$BUSY; |
265 | --$BUSY; |
250 | kick_job; |
266 | kick_job; |
251 | |
267 | |
252 | unless (--$TRANSPORT{$transport+0}[0]) { |
268 | unless (--$TRANSPORT[$fileno][0]) { |
253 | delete $TRANSPORT{$transport+0}; |
269 | delete $TRANSPORT[$fileno]; |
254 | return; |
270 | return; |
255 | } |
271 | } |
256 | } |
272 | } |
257 | } |
273 | } |
258 | |
274 | |
|
|
275 | # when we end up here, we successfully handled $MAX_RECVQUEUE |
|
|
276 | # replies in one iteration, so assume we are overloaded |
|
|
277 | # and reduce the amount of parallelity. |
259 | $MAX_OUTSTANDING = (int $MAX_OUTSTANDING * 0.9) || 1; |
278 | $MAX_OUTSTANDING = (int $MAX_OUTSTANDING * 0.95) || 1; |
260 | }); |
279 | }); |
261 | } |
280 | } |
262 | |
281 | |
263 | $msg->timeout_id (\(my $rtimeout_w = |
282 | $msg->timeout_id (\(my $rtimeout_w = |
264 | AnyEvent->$timer (after => $pdu->timeout, cb => sub { |
283 | AnyEvent->$timer (after => $pdu->timeout, cb => sub { |
265 | my $rtimeout_w = $msg->timeout_id; |
284 | my $rtimeout_w = $msg->timeout_id; |
266 | if ($$rtimeout_w) { |
285 | if ($$rtimeout_w) { |
267 | undef $$rtimeout_w; |
286 | undef $$rtimeout_w; |
268 | delete $TRANSPORT{$transport+0} |
287 | delete $TRANSPORT[$fileno] |
269 | unless --$TRANSPORT{$transport+0}[0]; |
288 | unless --$TRANSPORT[$fileno][0]; |
270 | } |
289 | } |
271 | |
290 | |
272 | if ($retries--) { |
291 | if ($retries--) { |
273 | _send_pdu ($pdu, $retries); |
292 | _send_pdu ($pdu, $retries); |
274 | } else { |
293 | } else { |
… | |
… | |
294 | ++$BUSY; |
313 | ++$BUSY; |
295 | |
314 | |
296 | _send_pdu $pdu, $pdu->retries; |
315 | _send_pdu $pdu, $pdu->retries; |
297 | } |
316 | } |
298 | } |
317 | } |
|
|
318 | |
299 | sub send_pdu($$$) { |
319 | sub send_pdu($$$) { |
300 | my (undef, $pdu, $delay) = @_; |
320 | my (undef, $pdu, $delay) = @_; |
301 | |
321 | |
302 | # $delay is not very sensibly implemented by AnyEvent::SNMP, |
322 | # $delay is not very sensibly implemented by AnyEvent::SNMP, |
303 | # but apparently it is not a very sensible feature. |
323 | # but apparently it is not a very sensible feature. |
… | |
… | |
324 | |
344 | |
325 | sub one_event($) { |
345 | sub one_event($) { |
326 | AnyEvent->one_event; |
346 | AnyEvent->one_event; |
327 | } |
347 | } |
328 | |
348 | |
|
|
349 | sub set_max_outstanding($) { |
|
|
350 | $MAX_OUTSTANDING = $_[0]; |
|
|
351 | kick_job; |
|
|
352 | } |
|
|
353 | |
329 | =head1 SEE ALSO |
354 | =head1 SEE ALSO |
330 | |
355 | |
331 | L<AnyEvent>, L<Net::SNMP>, L<Net::SNMP::XS>, L<Net::SNMP::EV>. |
356 | L<AnyEvent>, L<Net::SNMP>, L<Net::SNMP::XS>, L<Net::SNMP::EV>. |
332 | |
357 | |
333 | =head1 AUTHOR |
358 | =head1 AUTHOR |