1 | =head1 NAME |
1 | =head1 NAME |
2 | |
2 | |
3 | AnyEvent::SNMP - adaptor to integrate Net::SNMP into Anyevent. |
3 | AnyEvent::SNMP - adaptor to integrate Net::SNMP into AnyEvent. |
4 | |
4 | |
5 | =head1 SYNOPSIS |
5 | =head1 SYNOPSIS |
6 | |
6 | |
7 | use AnyEvent::SNMP; |
7 | use AnyEvent::SNMP; |
8 | use Net::SNMP; |
8 | use Net::SNMP; |
… | |
… | |
23 | my @result = $cv->wait; |
23 | my @result = $cv->wait; |
24 | |
24 | |
25 | =head1 DESCRIPTION |
25 | =head1 DESCRIPTION |
26 | |
26 | |
27 | This module implements an alternative "event dispatcher" for Net::SNMP, |
27 | This module implements an alternative "event dispatcher" for Net::SNMP, |
28 | using AnyEvent as a backend. |
28 | using AnyEvent as a backend. This integrates Net::SNMP into AnyEvent. That |
29 | |
29 | means you can make non-blocking Net::SNMP calls and as long as other |
30 | This integrates Net::SNMP into AnyEvent: You can make non-blocking |
30 | parts of your program also use AnyEvent (or some event loop supported by |
31 | Net::SNMP calls and as long as other parts of your program also use |
31 | AnyEvent), they will run in parallel. |
32 | AnyEvent (or some event loop supported by AnyEvent), they will run in |
|
|
33 | parallel. |
|
|
34 | |
32 | |
35 | Also, the Net::SNMP scheduler is very inefficient with respect to both CPU |
33 | Also, the Net::SNMP scheduler is very inefficient with respect to both CPU |
36 | and memory usage. Most AnyEvent backends (including the pure-perl backend) |
34 | and memory usage. Most AnyEvent backends (including the pure-perl backend) |
37 | fare much better than the Net::SNMP dispatcher. |
35 | fare much better than the Net::SNMP dispatcher. |
38 | |
36 | |
|
|
37 | Another major added feature of this module over Net::SNMP is automatic |
|
|
38 | rate-adjustments: Net::SNMP is so slow that firing a few thousand |
|
|
39 | requests can cause many timeouts simply because Net::SNMP cannot process |
|
|
40 | the replies in time. This module automatically adapts the send rate to |
|
|
41 | avoid false timeouts caused by slow reply processing. |
|
|
42 | |
39 | A potential disadvantage is that replacing the dispatcher is not at all |
43 | A potential disadvantage of this module is that replacing the dispatcher |
40 | a documented thing to do, so future changes in Net::SNP might break this |
44 | is not at all a documented thing to do, so future changes in Net::SNMP |
41 | module (or the many similar ones). |
45 | might break this module (or the many similar ones). |
42 | |
46 | |
43 | This module does not export anything and does not require you to do |
47 | This module does not export anything and does not require you to do |
44 | anything special apart from loading it I<before doing any non-blocking |
48 | anything special apart from loading it I<before doing any non-blocking |
45 | requests with Net::SNMP>. It is recommended but not required to load this |
49 | requests with Net::SNMP>. It is recommended but not required to load this |
46 | module before C<Net::SNMP>. |
50 | module before C<Net::SNMP>. |
… | |
… | |
49 | |
53 | |
50 | =over 4 |
54 | =over 4 |
51 | |
55 | |
52 | =item $AnyEvent::SNMP::MAX_OUTSTANDING (default: C<50>, dynamic) |
56 | =item $AnyEvent::SNMP::MAX_OUTSTANDING (default: C<50>, dynamic) |
53 | |
57 | |
|
|
58 | =item AnyEvent::SNMP::set_max_outstanding $new_value |
|
|
59 | |
54 | Use this package variable to restrict the number of outstanding SNMP |
60 | Use this package variable to restrict the number of outstanding SNMP |
55 | requests at any point in time. |
61 | requests at any point in time. |
56 | |
62 | |
57 | Net::SNMP is very fast at creating and sending SNMP requests, but much |
63 | Net::SNMP is very fast at creating and sending SNMP requests, but much |
58 | slower at parsing (big, bulk) responses. This makes it easy to request a |
64 | slower at parsing (big, bulk) responses. This makes it easy to request a |
… | |
… | |
61 | In the best case, this can lead to unnecessary delays (and even time-outs, |
67 | In the best case, this can lead to unnecessary delays (and even time-outs, |
62 | as the data has been received but not yet processed) and in the worst |
68 | as the data has been received but not yet processed) and in the worst |
63 | case, this can lead to packet loss, when the receive queue overflows and |
69 | case, this can lead to packet loss, when the receive queue overflows and |
64 | the kernel can no longer accept new packets. |
70 | the kernel can no longer accept new packets. |
65 | |
71 | |
66 | To avoid this, you can (and should) limit the number of outstanding requests |
72 | To avoid this, you can (and should) limit the number of outstanding |
67 | to a number low enough so that parsing time doesn't introduce noticable delays. |
73 | requests to a number low enough so that parsing time doesn't introduce |
|
|
74 | noticeable delays. |
68 | |
75 | |
69 | Unfortunately, this number depends not only on processing speed and load |
76 | Unfortunately, this number depends not only on processing speed and load |
70 | of the machine running Net::SNMP, but also on the network latency and the |
77 | of the machine running Net::SNMP, but also on the network latency and the |
71 | speed of your SNMP agents. |
78 | speed of your SNMP agents. |
72 | |
79 | |
73 | AnyEvent::SNMP tries to dynamically adjust this number dynamically upwards |
80 | AnyEvent::SNMP tries to dynamically adjust this number upwards and |
74 | and downwards. |
81 | downwards. |
|
|
82 | |
|
|
83 | Increasing C<$MAX_OUTSTANDING> will not automatically use the |
|
|
84 | extra request slots. To increase C<$MAX_OUTSTANDING> and make |
|
|
85 | C<AnyEvent::SNMP> make use of the extra parallelity, call |
|
|
86 | C<AnyEvent::SNMP::set_max_outstanding> with the new value, e.g.: |
|
|
87 | |
|
|
88 | AnyEvent::SNMP::set_max_outstanding 500; |
|
|
89 | |
|
|
90 | Although due to the dynamic adjustment, this might have little lasting |
|
|
91 | effect. |
75 | |
92 | |
76 | Note that you can use L<Net::SNMP::XS> to speed up parsing of responses |
93 | Note that you can use L<Net::SNMP::XS> to speed up parsing of responses |
77 | considerably. |
94 | considerably. |
78 | |
95 | |
79 | =item $AnyEvent::SNMP::MIN_RECVQUEUE (default: C<4>) |
96 | =item $AnyEvent::SNMP::MIN_RECVQUEUE (default: C<8>) |
80 | |
97 | |
81 | =item $AnyEvent::SNMP::MAX_RECVQUEUE (default: C<64>) |
98 | =item $AnyEvent::SNMP::MAX_RECVQUEUE (default: C<64>) |
82 | |
99 | |
83 | These values specify the minimum and maximum receive queue length (in |
100 | These values specify the minimum and maximum receive queue length (in |
84 | units of one response packet). |
101 | units of one response packet). |
… | |
… | |
86 | When AnyEvent::SNMP handles $MAX_RECVQUEUE or more packets per iteration |
103 | When AnyEvent::SNMP handles $MAX_RECVQUEUE or more packets per iteration |
87 | it will reduce $MAX_OUTSTANDING. If it handles less than $MIN_RECVQUEUE, |
104 | it will reduce $MAX_OUTSTANDING. If it handles less than $MIN_RECVQUEUE, |
88 | it increases $MAX_OUTSTANDING. |
105 | it increases $MAX_OUTSTANDING. |
89 | |
106 | |
90 | This has the result of adjusting the number of outstanding requests so that |
107 | This has the result of adjusting the number of outstanding requests so that |
91 | the recv queue is between the minimum and maximu, usually. |
108 | the recv queue is between the minimum and maximum, usually. |
92 | |
109 | |
93 | This algorithm works reasonably well as long as the responses, response |
110 | This algorithm works reasonably well as long as the responses, response |
94 | latencies and processing times are the same size per packet on average. |
111 | latencies and processing times are the same per packet on average. |
95 | |
112 | |
96 | =back |
113 | =back |
97 | |
114 | |
98 | =head1 COMPATIBILITY |
115 | =head1 COMPATIBILITY |
99 | |
116 | |
… | |
… | |
118 | |
135 | |
119 | =cut |
136 | =cut |
120 | |
137 | |
121 | package AnyEvent::SNMP; |
138 | package AnyEvent::SNMP; |
122 | |
139 | |
123 | no warnings; |
140 | use common::sense; |
124 | use strict qw(subs vars); |
|
|
125 | |
141 | |
126 | # it is possible to do this without loading |
142 | # it is possible to do this without loading |
127 | # Net::SNMP::Dispatcher, but much more awkward. |
143 | # Net::SNMP::Dispatcher, but much more awkward. |
128 | use Net::SNMP::Dispatcher; |
144 | use Net::SNMP::Dispatcher; |
129 | |
145 | |
|
|
146 | # we could inherit fro Net:SNMP::Dispatcher, but since this is undocumented, |
|
|
147 | # I'd rather see it die (and reported) than silenty and subtly fail. |
|
|
148 | *msg_handle_alloc = \&Net::SNMP::Dispatcher::msg_handle_alloc; |
|
|
149 | |
130 | sub Net::SNMP::Dispatcher::instance { |
150 | sub Net::SNMP::Dispatcher::instance { |
131 | AnyEvent::SNMP:: |
151 | AnyEvent::SNMP:: |
132 | } |
152 | } |
133 | |
153 | |
134 | use Net::SNMP (); |
154 | use Net::SNMP (); |
135 | use AnyEvent (); |
155 | use AnyEvent (); |
136 | |
156 | |
137 | our $VERSION = '0.2'; |
157 | our $VERSION = '6.02'; |
138 | |
158 | |
139 | $Net::SNMP::DISPATCHER = instance Net::SNMP::Dispatcher; |
159 | $Net::SNMP::DISPATCHER = instance Net::SNMP::Dispatcher; |
140 | |
160 | |
141 | our $MESSAGE_PROCESSING = $Net::SNMP::Dispatcher::MESSAGE_PROCESSING; |
161 | our $MESSAGE_PROCESSING = $Net::SNMP::Dispatcher::MESSAGE_PROCESSING; |
142 | |
162 | |
143 | # avoid the method call |
|
|
144 | my $timer = sub { shift->timer (@_) }; |
|
|
145 | AnyEvent::post_detect { $timer = AnyEvent->can ("timer") }; |
|
|
146 | |
|
|
147 | our $BUSY; |
163 | our $BUSY; |
|
|
164 | our $DONE; # finished all jobs |
148 | our %TRANSPORT; # address => [count, watcher] |
165 | our @TRANSPORT; # fileno => [count, watcher] |
149 | our @QUEUE; |
166 | our @QUEUE; |
150 | our $MAX_OUTSTANDING = 50; |
167 | our $MAX_OUTSTANDING = 50; |
151 | our $MIN_RECVQUEUE = 4; |
168 | our $MIN_RECVQUEUE = 8; |
152 | our $MAX_RECVQUEUE = 64; |
169 | our $MAX_RECVQUEUE = 64; |
153 | |
170 | |
154 | sub kick_job; |
171 | sub kick_job; |
155 | |
172 | |
156 | sub _send_pdu { |
173 | sub _send_pdu { |
… | |
… | |
175 | $MESSAGE_PROCESSING->msg_handle_delete ($pdu->msg_id) |
192 | $MESSAGE_PROCESSING->msg_handle_delete ($pdu->msg_id) |
176 | if $pdu->expect_response; |
193 | if $pdu->expect_response; |
177 | |
194 | |
178 | # A crude attempt to recover from temporary failures. |
195 | # A crude attempt to recover from temporary failures. |
179 | if ($retries-- > 0 && ($!{EAGAIN} || $!{EWOULDBLOCK} || $!{ENOSPC})) { |
196 | if ($retries-- > 0 && ($!{EAGAIN} || $!{EWOULDBLOCK} || $!{ENOSPC})) { |
180 | my $retry_w; $retry_w = AnyEvent->$timer (after => $pdu->timeout, cb => sub { |
197 | my $retry_w; $retry_w = AE::timer $pdu->timeout, 0, sub { |
181 | undef $retry_w; |
198 | undef $retry_w; |
182 | _send_pdu ($pdu, $retries); |
199 | _send_pdu ($pdu, $retries); |
183 | }); |
200 | }; |
184 | } else { |
201 | } else { |
185 | --$BUSY; |
202 | --$BUSY; |
186 | kick_job; |
203 | kick_job; |
187 | } |
204 | } |
188 | |
205 | |
… | |
… | |
192 | } |
209 | } |
193 | |
210 | |
194 | # Schedule the timeout handler if the message expects a response. |
211 | # Schedule the timeout handler if the message expects a response. |
195 | if ($pdu->expect_response) { |
212 | if ($pdu->expect_response) { |
196 | my $transport = $msg->transport; |
213 | my $transport = $msg->transport; |
|
|
214 | my $fileno = $transport->fileno; |
197 | |
215 | |
198 | # register the transport |
216 | # register the transport |
199 | unless ($TRANSPORT{$transport+0}[0]++) { |
217 | unless ($TRANSPORT[$fileno][0]++) { |
200 | $TRANSPORT{$transport+0}[1] = AnyEvent->io (fh => $transport->socket, poll => 'r', cb => sub { |
218 | $TRANSPORT[$fileno][1] = AE::io $transport->socket, 0, sub { |
201 | for my $count (1..$MAX_RECVQUEUE) { # handle up to this many requests in one go |
219 | for my $count (1..$MAX_RECVQUEUE) { # handle up to this many requests in one go |
202 | # Create a new Message object to receive the response |
220 | # Create a new Message object to receive the response |
203 | my ($msg, $error) = Net::SNMP::Message->new (-transport => $transport); |
221 | my ($msg, $error) = Net::SNMP::Message->new (-transport => $transport); |
204 | |
222 | |
205 | if (!defined $msg) { |
223 | if (!defined $msg) { |
… | |
… | |
207 | } |
225 | } |
208 | |
226 | |
209 | # Read the message from the Transport Layer |
227 | # Read the message from the Transport Layer |
210 | if (!defined $msg->recv) { |
228 | if (!defined $msg->recv) { |
211 | if ($transport->connectionless) { |
229 | if ($transport->connectionless) { |
|
|
230 | # if we handled very few replies and we have queued work, try |
|
|
231 | # to increase the parallelity as we probably can handle more. |
212 | if ($count < $MIN_RECVQUEUE && @QUEUE) { |
232 | if ($count < $MIN_RECVQUEUE && @QUEUE) { |
213 | ++$MAX_OUTSTANDING; |
233 | ++$MAX_OUTSTANDING; |
214 | kick_job; |
234 | kick_job; |
215 | } |
235 | } |
216 | } else { |
236 | } else { |
217 | # for some reason, connected-oriented transports seem to need this |
237 | # for some reason, connected-oriented transports seem to need this |
218 | delete $TRANSPORT{$transport+0} |
238 | delete $TRANSPORT[$fileno] |
219 | unless --$TRANSPORT{$transport+0}[0]; |
239 | unless --$TRANSPORT[$fileno][0]; |
220 | } |
240 | } |
221 | |
241 | |
222 | $msg->error; |
242 | $msg->error; |
223 | return; |
243 | return; |
224 | } |
244 | } |
… | |
… | |
237 | |
257 | |
238 | # Set the error if applicable. |
258 | # Set the error if applicable. |
239 | $msg->error ($MESSAGE_PROCESSING->error) if $MESSAGE_PROCESSING->error; |
259 | $msg->error ($MESSAGE_PROCESSING->error) if $MESSAGE_PROCESSING->error; |
240 | |
260 | |
241 | # Notify the command generator to process the response. |
261 | # Notify the command generator to process the response. |
242 | $msg->process_response_pdu; |
262 | # Net::SNMP calls process_response_pdu, which simply calls callback_execute, |
|
|
263 | # but some errors cause $msg to be of type Net::SNMP::Message, not Net::SMMP::PDU, |
|
|
264 | # so we call the underlying callback_execute method which exists on both and |
|
|
265 | # seems to do the right thing. |
|
|
266 | $msg->callback_execute; |
243 | |
267 | |
244 | # Cancel the timeout. |
268 | # Cancel the timeout. |
245 | my $rtimeout_w = $msg->timeout_id; |
269 | my $rtimeout_w = $msg->timeout_id; |
246 | if ($$rtimeout_w) { |
270 | if ($$rtimeout_w) { |
247 | undef $$rtimeout_w; |
271 | undef $$rtimeout_w; |
248 | |
272 | |
249 | --$BUSY; |
273 | --$BUSY; |
250 | kick_job; |
274 | kick_job; |
251 | |
275 | |
252 | unless (--$TRANSPORT{$transport+0}[0]) { |
276 | unless (--$TRANSPORT[$fileno][0]) { |
253 | delete $TRANSPORT{$transport+0}; |
277 | delete $TRANSPORT[$fileno]; |
254 | return; |
278 | return; |
255 | } |
279 | } |
256 | } |
280 | } |
257 | } |
281 | } |
258 | |
282 | |
|
|
283 | # when we end up here, we successfully handled $MAX_RECVQUEUE |
|
|
284 | # replies in one iteration, so assume we are overloaded |
|
|
285 | # and reduce the amount of parallelity. |
259 | $MAX_OUTSTANDING = (int $MAX_OUTSTANDING * 0.9) || 1; |
286 | $MAX_OUTSTANDING = (int $MAX_OUTSTANDING * 0.95) || 1; |
260 | }); |
287 | }; |
261 | } |
288 | } |
262 | |
289 | |
263 | $msg->timeout_id (\(my $rtimeout_w = |
290 | $msg->timeout_id (\(my $rtimeout_w = |
264 | AnyEvent->$timer (after => $pdu->timeout, cb => sub { |
291 | AE::timer $pdu->timeout, 0, sub { |
265 | my $rtimeout_w = $msg->timeout_id; |
292 | my $rtimeout_w = $msg->timeout_id; |
266 | if ($$rtimeout_w) { |
293 | if ($$rtimeout_w) { |
267 | undef $$rtimeout_w; |
294 | undef $$rtimeout_w; |
268 | delete $TRANSPORT{$transport+0} |
295 | delete $TRANSPORT[$fileno] |
269 | unless --$TRANSPORT{$transport+0}[0]; |
296 | unless --$TRANSPORT[$fileno][0]; |
270 | } |
297 | } |
271 | |
298 | |
272 | if ($retries--) { |
299 | if ($retries--) { |
273 | _send_pdu ($pdu, $retries); |
300 | _send_pdu ($pdu, $retries); |
274 | } else { |
301 | } else { |
… | |
… | |
277 | |
304 | |
278 | --$BUSY; |
305 | --$BUSY; |
279 | kick_job; |
306 | kick_job; |
280 | } |
307 | } |
281 | }) |
308 | }) |
282 | )); |
309 | ); |
283 | } else { |
310 | } else { |
284 | --$BUSY; |
311 | --$BUSY; |
285 | kick_job; |
312 | kick_job; |
286 | } |
313 | } |
287 | } |
314 | } |
… | |
… | |
290 | while ($BUSY < $MAX_OUTSTANDING) { |
317 | while ($BUSY < $MAX_OUTSTANDING) { |
291 | my $pdu = shift @QUEUE |
318 | my $pdu = shift @QUEUE |
292 | or last; |
319 | or last; |
293 | |
320 | |
294 | ++$BUSY; |
321 | ++$BUSY; |
295 | |
|
|
296 | _send_pdu $pdu, $pdu->retries; |
322 | _send_pdu $pdu, $pdu->retries; |
297 | } |
323 | } |
|
|
324 | |
|
|
325 | $DONE and $DONE->() unless $BUSY; |
298 | } |
326 | } |
|
|
327 | |
299 | sub send_pdu($$$) { |
328 | sub send_pdu($$$) { |
300 | my (undef, $pdu, $delay) = @_; |
329 | my (undef, $pdu, $delay) = @_; |
301 | |
330 | |
302 | # $delay is not very sensibly implemented by AnyEvent::SNMP, |
331 | # $delay is not very sensibly implemented by AnyEvent::SNMP, |
303 | # but apparently it is not a very sensible feature. |
332 | # but apparently it is not a very sensible feature. |
304 | if ($delay > 0) { |
333 | if ($delay > 0) { |
305 | ++$BUSY; |
334 | ++$BUSY; |
306 | my $delay_w; $delay_w = AnyEvent->$timer (after => $delay, cb => sub { |
335 | my $delay_w; $delay_w = AE::timer $delay, 0, sub { |
307 | undef $delay_w; |
336 | undef $delay_w; |
|
|
337 | push @QUEUE, $pdu; |
308 | --$BUSY; |
338 | --$BUSY; |
309 | push @QUEUE, $pdu; |
|
|
310 | kick_job; |
339 | kick_job; |
311 | }); |
340 | }; |
312 | return 1; |
341 | return 1; |
313 | } |
342 | } |
314 | |
343 | |
315 | push @QUEUE, $pdu; |
344 | push @QUEUE, $pdu; |
316 | kick_job; |
345 | kick_job; |
317 | |
346 | |
318 | 1 |
347 | 1 |
319 | } |
348 | } |
320 | |
349 | |
321 | sub activate($) { |
350 | sub loop($) { |
322 | AnyEvent->one_event while $BUSY; |
351 | while ($BUSY) { |
|
|
352 | $DONE = AE::cv; |
|
|
353 | $DONE->recv; |
|
|
354 | undef $DONE; |
|
|
355 | } |
323 | } |
356 | } |
|
|
357 | |
|
|
358 | *activate = \&loop; # 5.x compatibility? |
|
|
359 | *listen = \&loop; # 5.x compatibility? |
324 | |
360 | |
325 | sub one_event($) { |
361 | sub one_event($) { |
|
|
362 | # should not ever be used |
326 | AnyEvent->one_event; |
363 | AnyEvent->one_event; #d# todo |
327 | } |
364 | } |
|
|
365 | |
|
|
366 | sub set_max_outstanding($) { |
|
|
367 | $MAX_OUTSTANDING = $_[0]; |
|
|
368 | kick_job; |
|
|
369 | } |
|
|
370 | |
|
|
371 | # not provided yet: |
|
|
372 | # schedule # apparently only used by Net::SNMP::Dispatcher itself |
|
|
373 | # register # apparently only used by Net::SNMP::Dispatcher itself |
|
|
374 | # deregister # apparently only used by Net::SNMP::Dispatcher itself |
|
|
375 | # cancel # apparently only used by Net::SNMP::Dispatcher itself |
|
|
376 | # return_response_pdu # apparently not used at all? |
|
|
377 | # error # only used by Net::SNMP::Dispatcher itself? |
|
|
378 | # debug # only used by Net::SNMP::Dispatcher itself? |
328 | |
379 | |
329 | =head1 SEE ALSO |
380 | =head1 SEE ALSO |
330 | |
381 | |
331 | L<AnyEvent>, L<Net::SNMP>, L<Net::SNMP::XS>, L<Net::SNMP::EV>. |
382 | L<AnyEvent>, L<Net::SNMP>, L<Net::SNMP::XS>, L<Net::SNMP::EV>. |
332 | |
383 | |