1 | =head1 NAME |
1 | =head1 NAME |
2 | |
2 | |
3 | AnyEvent::SNMP - adaptor to integrate Net::SNMP into Anyevent. |
3 | AnyEvent::SNMP - adaptor to integrate Net::SNMP into AnyEvent. |
4 | |
4 | |
5 | =head1 SYNOPSIS |
5 | =head1 SYNOPSIS |
6 | |
6 | |
7 | use AnyEvent::SNMP; |
7 | use AnyEvent::SNMP; |
8 | use Net::SNMP; |
8 | use Net::SNMP; |
… | |
… | |
43 | This module does not export anything and does not require you to do |
43 | This module does not export anything and does not require you to do |
44 | anything special apart from loading it I<before doing any non-blocking |
44 | anything special apart from loading it I<before doing any non-blocking |
45 | requests with Net::SNMP>. It is recommended but not required to load this |
45 | requests with Net::SNMP>. It is recommended but not required to load this |
46 | module before C<Net::SNMP>. |
46 | module before C<Net::SNMP>. |
47 | |
47 | |
|
|
48 | =head1 GLOBAL VARIABLES |
|
|
49 | |
|
|
50 | =over 4 |
|
|
51 | |
|
|
52 | =item $AnyEvent::SNMP::MAX_OUTSTANDING (default: C<50>, dynamic) |
|
|
53 | |
|
|
54 | =item AnyEvent::SNMP::set_max_outstanding $new_value |
|
|
55 | |
|
|
56 | Use this package variable to restrict the number of outstanding SNMP |
|
|
57 | requests at any point in time. |
|
|
58 | |
|
|
59 | Net::SNMP is very fast at creating and sending SNMP requests, but much |
|
|
60 | slower at parsing (big, bulk) responses. This makes it easy to request a |
|
|
61 | lot of data that can take many seconds to parse. |
|
|
62 | |
|
|
63 | In the best case, this can lead to unnecessary delays (and even time-outs, |
|
|
64 | as the data has been received but not yet processed) and in the worst |
|
|
65 | case, this can lead to packet loss, when the receive queue overflows and |
|
|
66 | the kernel can no longer accept new packets. |
|
|
67 | |
|
|
68 | To avoid this, you can (and should) limit the number of outstanding |
|
|
69 | requests to a number low enough so that parsing time doesn't introduce |
|
|
70 | noticable delays. |
|
|
71 | |
|
|
72 | Unfortunately, this number depends not only on processing speed and load |
|
|
73 | of the machine running Net::SNMP, but also on the network latency and the |
|
|
74 | speed of your SNMP agents. |
|
|
75 | |
|
|
76 | AnyEvent::SNMP tries to dynamically adjust this number dynamically upwards |
|
|
77 | and downwards. |
|
|
78 | |
|
|
79 | Increasing C<$MAX_OUTSTANDING> will not automatically use the |
|
|
80 | C<extra request slots. To increase $MAX_OUTSTANDING> and make |
|
|
81 | C<C<AnyEvent::SNMP> make use of the extra paralellity, call |
|
|
82 | C<AnyEvent::SNMP::set_max_outstanding> with the new value, e.g.: |
|
|
83 | |
|
|
84 | AnyEvent::SNMP::set_max_outstanding 500; |
|
|
85 | |
|
|
86 | Although due to the dynamic adjustment, this might have little lasting |
|
|
87 | effect. |
|
|
88 | |
|
|
89 | Note that you can use L<Net::SNMP::XS> to speed up parsing of responses |
|
|
90 | considerably. |
|
|
91 | |
|
|
92 | =item $AnyEvent::SNMP::MIN_RECVQUEUE (default: C<8>) |
|
|
93 | |
|
|
94 | =item $AnyEvent::SNMP::MAX_RECVQUEUE (default: C<64>) |
|
|
95 | |
|
|
96 | These values specify the minimum and maximum receive queue length (in |
|
|
97 | units of one response packet). |
|
|
98 | |
|
|
99 | When AnyEvent::SNMP handles $MAX_RECVQUEUE or more packets per iteration |
|
|
100 | it will reduce $MAX_OUTSTANDING. If it handles less than $MIN_RECVQUEUE, |
|
|
101 | it increases $MAX_OUTSTANDING. |
|
|
102 | |
|
|
103 | This has the result of adjusting the number of outstanding requests so that |
|
|
104 | the recv queue is between the minimum and maximu, usually. |
|
|
105 | |
|
|
106 | This algorithm works reasonably well as long as the responses, response |
|
|
107 | latencies and processing times are the same size per packet on average. |
|
|
108 | |
|
|
109 | =back |
|
|
110 | |
|
|
111 | =head1 COMPATIBILITY |
|
|
112 | |
|
|
113 | This module may be used as a drop in replacement for the |
|
|
114 | Net::SNMP::Dispatcher in existing programs. You can still call |
|
|
115 | C<snmp_dispatcher> to start the event-loop, but then you loose the benefit |
|
|
116 | of mixing Net::SNMP events with other events. |
|
|
117 | |
|
|
118 | use AnyEvent::SNMP; |
|
|
119 | use Net::SNMP; |
|
|
120 | |
|
|
121 | # just use Net::SNMP as before |
|
|
122 | |
|
|
123 | # ... start non-blocking snmp request(s)... |
|
|
124 | Net::SNMP->session ( |
|
|
125 | -hostname => "127.0.0.1", |
|
|
126 | -community => "public", |
|
|
127 | -nonblocking => 1, |
|
|
128 | )->get_request (-callback => sub { ... }); |
|
|
129 | |
|
|
130 | snmp_dispatcher; |
|
|
131 | |
48 | =cut |
132 | =cut |
49 | |
133 | |
50 | package AnyEvent::SNMP; |
134 | package AnyEvent::SNMP; |
51 | |
135 | |
52 | no warnings; |
136 | no warnings; |
… | |
… | |
61 | } |
145 | } |
62 | |
146 | |
63 | use Net::SNMP (); |
147 | use Net::SNMP (); |
64 | use AnyEvent (); |
148 | use AnyEvent (); |
65 | |
149 | |
66 | our $VERSION = '0.1'; |
150 | our $VERSION = '0.2'; |
67 | |
151 | |
68 | $Net::SNMP::DISPATCHER = instance Net::SNMP::Dispatcher; |
152 | $Net::SNMP::DISPATCHER = instance Net::SNMP::Dispatcher; |
69 | |
153 | |
70 | our $MESSAGE_PROCESSING = $Net::SNMP::Dispatcher::MESSAGE_PROCESSING; |
154 | our $MESSAGE_PROCESSING = $Net::SNMP::Dispatcher::MESSAGE_PROCESSING; |
71 | |
155 | |
72 | # avoid the method call |
156 | # avoid the method call |
73 | my $timer = sub { shift->timer (@_) }; |
157 | my $timer = sub { shift->timer (@_) }; |
74 | AnyEvent::post_detect { $timer = AnyEvent->can ("timer") }; |
158 | AnyEvent::post_detect { $timer = AnyEvent->can ("timer") }; |
75 | |
159 | |
76 | our $BUSY; |
160 | our $BUSY; |
77 | our %TRANSPORT; # address => [count, watcher] |
161 | our @TRANSPORT; # fileno => [count, watcher] |
|
|
162 | our @QUEUE; |
|
|
163 | our $MAX_OUTSTANDING = 50; |
|
|
164 | our $MIN_RECVQUEUE = 8; |
|
|
165 | our $MAX_RECVQUEUE = 64; |
|
|
166 | |
|
|
167 | sub kick_job; |
78 | |
168 | |
79 | sub _send_pdu { |
169 | sub _send_pdu { |
80 | my ($pdu, $retries) = @_; |
170 | my ($pdu, $retries) = @_; |
81 | |
171 | |
82 | # mostly copied from Net::SNMP::Dispatch |
172 | # mostly copied from Net::SNMP::Dispatch |
… | |
… | |
85 | # create the new outgoing message. |
175 | # create the new outgoing message. |
86 | my $msg = $MESSAGE_PROCESSING->prepare_outgoing_msg ($pdu); |
176 | my $msg = $MESSAGE_PROCESSING->prepare_outgoing_msg ($pdu); |
87 | |
177 | |
88 | if (!defined $msg) { |
178 | if (!defined $msg) { |
89 | --$BUSY; |
179 | --$BUSY; |
|
|
180 | kick_job; |
90 | # Inform the command generator about the Message Processing error. |
181 | # Inform the command generator about the Message Processing error. |
91 | $pdu->status_information ($MESSAGE_PROCESSING->error); |
182 | $pdu->status_information ($MESSAGE_PROCESSING->error); |
92 | return; |
183 | return; |
93 | } |
184 | } |
94 | |
185 | |
… | |
… | |
103 | undef $retry_w; |
194 | undef $retry_w; |
104 | _send_pdu ($pdu, $retries); |
195 | _send_pdu ($pdu, $retries); |
105 | }); |
196 | }); |
106 | } else { |
197 | } else { |
107 | --$BUSY; |
198 | --$BUSY; |
|
|
199 | kick_job; |
108 | } |
200 | } |
109 | |
201 | |
110 | # Inform the command generator about the send() error. |
202 | # Inform the command generator about the send() error. |
111 | $pdu->status_information ($msg->error); |
203 | $pdu->status_information ($msg->error); |
112 | return; |
204 | return; |
113 | } |
205 | } |
114 | |
206 | |
115 | # Schedule the timeout handler if the message expects a response. |
207 | # Schedule the timeout handler if the message expects a response. |
116 | if ($pdu->expect_response) { |
208 | if ($pdu->expect_response) { |
117 | my $transport = $msg->transport; |
209 | my $transport = $msg->transport; |
|
|
210 | my $fileno = $transport->fileno; |
118 | |
211 | |
119 | # register the transport |
212 | # register the transport |
120 | unless ($TRANSPORT{$transport+0}[0]++) { |
213 | unless ($TRANSPORT[$fileno][0]++) { |
121 | $TRANSPORT{$transport+0}[1] = AnyEvent->io (fh => $transport->socket, poll => 'r', cb => sub { |
214 | $TRANSPORT[$fileno][1] = AnyEvent->io (fh => $transport->socket, poll => 'r', cb => sub { |
|
|
215 | for my $count (1..$MAX_RECVQUEUE) { # handle up to this many requests in one go |
122 | # Create a new Message object to receive the response |
216 | # Create a new Message object to receive the response |
123 | my ($msg, $error) = Net::SNMP::Message->new (-transport => $transport); |
217 | my ($msg, $error) = Net::SNMP::Message->new (-transport => $transport); |
124 | |
218 | |
125 | if (!defined $msg) { |
219 | if (!defined $msg) { |
126 | die sprintf 'Failed to create Message object [%s]', $error; |
220 | die sprintf 'Failed to create Message object [%s]', $error; |
|
|
221 | } |
|
|
222 | |
|
|
223 | # Read the message from the Transport Layer |
|
|
224 | if (!defined $msg->recv) { |
|
|
225 | if ($transport->connectionless) { |
|
|
226 | # if we handled very few replies and we have queued work, try |
|
|
227 | # to increase the parallelity as we probably can handle more. |
|
|
228 | if ($count < $MIN_RECVQUEUE && @QUEUE) { |
|
|
229 | ++$MAX_OUTSTANDING; |
|
|
230 | kick_job; |
|
|
231 | } |
|
|
232 | } else { |
|
|
233 | # for some reason, connected-oriented transports seem to need this |
|
|
234 | delete $TRANSPORT[$fileno] |
|
|
235 | unless --$TRANSPORT[$fileno][0]; |
|
|
236 | } |
|
|
237 | |
|
|
238 | $msg->error; |
|
|
239 | return; |
|
|
240 | } |
|
|
241 | |
|
|
242 | # For connection-oriented Transport Domains, it is possible to |
|
|
243 | # "recv" an empty buffer if reassembly is required. |
|
|
244 | if (!$msg->length) { |
|
|
245 | return; |
|
|
246 | } |
|
|
247 | |
|
|
248 | # Hand the message over to Message Processing. |
|
|
249 | if (!defined $MESSAGE_PROCESSING->prepare_data_elements ($msg)) { |
|
|
250 | $MESSAGE_PROCESSING->error; |
|
|
251 | return; |
|
|
252 | } |
|
|
253 | |
|
|
254 | # Set the error if applicable. |
|
|
255 | $msg->error ($MESSAGE_PROCESSING->error) if $MESSAGE_PROCESSING->error; |
|
|
256 | |
|
|
257 | # Notify the command generator to process the response. |
|
|
258 | $msg->process_response_pdu; |
|
|
259 | |
|
|
260 | # Cancel the timeout. |
|
|
261 | my $rtimeout_w = $msg->timeout_id; |
|
|
262 | if ($$rtimeout_w) { |
|
|
263 | undef $$rtimeout_w; |
|
|
264 | |
|
|
265 | --$BUSY; |
|
|
266 | kick_job; |
|
|
267 | |
|
|
268 | unless (--$TRANSPORT[$fileno][0]) { |
|
|
269 | delete $TRANSPORT[$fileno]; |
|
|
270 | return; |
|
|
271 | } |
|
|
272 | } |
127 | } |
273 | } |
128 | |
274 | |
129 | # Read the message from the Transport Layer |
275 | # when we end up here, we successfully handled $MAX_RECVQUEUE |
130 | if (!defined $msg->recv) { |
276 | # replies in one iteration, so assume we are overloaded |
131 | # for some reason, connected-oriented transports seem to need this |
277 | # and reduce the amount of parallelity. |
132 | unless ($transport->connectionless) { |
278 | $MAX_OUTSTANDING = (int $MAX_OUTSTANDING * 0.95) || 1; |
133 | delete $TRANSPORT{$transport+0} |
|
|
134 | unless --$TRANSPORT{$transport+0}[0]; |
|
|
135 | } |
|
|
136 | |
|
|
137 | $msg->error; |
|
|
138 | return; |
|
|
139 | } |
|
|
140 | |
|
|
141 | # For connection-oriented Transport Domains, it is possible to |
|
|
142 | # "recv" an empty buffer if reassembly is required. |
|
|
143 | if (!$msg->length) { |
|
|
144 | return; |
|
|
145 | } |
|
|
146 | |
|
|
147 | # Hand the message over to Message Processing. |
|
|
148 | if (!defined $MESSAGE_PROCESSING->prepare_data_elements ($msg)) { |
|
|
149 | $MESSAGE_PROCESSING->error; |
|
|
150 | return; |
|
|
151 | } |
|
|
152 | |
|
|
153 | # Set the error if applicable. |
|
|
154 | $msg->error ($MESSAGE_PROCESSING->error) if $MESSAGE_PROCESSING->error; |
|
|
155 | |
|
|
156 | # Cancel the timeout. |
|
|
157 | my $rtimeout_w = $msg->timeout_id; |
|
|
158 | if ($$rtimeout_w) { |
|
|
159 | undef $$rtimeout_w; |
|
|
160 | delete $TRANSPORT{$transport+0} |
|
|
161 | unless --$TRANSPORT{$transport+0}[0]; |
|
|
162 | |
|
|
163 | --$BUSY; |
|
|
164 | } |
|
|
165 | |
|
|
166 | # Notify the command generator to process the response. |
|
|
167 | $msg->process_response_pdu; |
|
|
168 | }); |
279 | }); |
169 | } |
280 | } |
170 | |
281 | |
171 | #####d# timeout_id, wtf? |
|
|
172 | $msg->timeout_id (\(my $rtimeout_w = |
282 | $msg->timeout_id (\(my $rtimeout_w = |
173 | AnyEvent->$timer (after => $pdu->timeout, cb => sub { |
283 | AnyEvent->$timer (after => $pdu->timeout, cb => sub { |
174 | my $rtimeout_w = $msg->timeout_id; |
284 | my $rtimeout_w = $msg->timeout_id; |
175 | if ($$rtimeout_w) { |
285 | if ($$rtimeout_w) { |
176 | undef $$rtimeout_w; |
286 | undef $$rtimeout_w; |
177 | delete $TRANSPORT{$transport+0} |
287 | delete $TRANSPORT[$fileno] |
178 | unless --$TRANSPORT{$transport+0}[0]; |
288 | unless --$TRANSPORT[$fileno][0]; |
179 | } |
289 | } |
180 | |
290 | |
181 | if ($retries--) { |
291 | if ($retries--) { |
182 | _send_pdu ($pdu, $retries); |
292 | _send_pdu ($pdu, $retries); |
183 | } else { |
293 | } else { |
184 | --$BUSY; |
|
|
185 | $MESSAGE_PROCESSING->msg_handle_delete ($pdu->msg_id); |
294 | $MESSAGE_PROCESSING->msg_handle_delete ($pdu->msg_id); |
186 | $pdu->status_information ("No response from remote host '%s'", $pdu->hostname); |
295 | $pdu->status_information ("No response from remote host '%s'", $pdu->hostname); |
|
|
296 | |
|
|
297 | --$BUSY; |
|
|
298 | kick_job; |
187 | } |
299 | } |
188 | }) |
300 | }) |
189 | )); |
301 | )); |
190 | } else { |
302 | } else { |
191 | --$BUSY; |
303 | --$BUSY; |
|
|
304 | kick_job; |
|
|
305 | } |
|
|
306 | } |
|
|
307 | |
|
|
308 | sub kick_job { |
|
|
309 | while ($BUSY < $MAX_OUTSTANDING) { |
|
|
310 | my $pdu = shift @QUEUE |
|
|
311 | or last; |
|
|
312 | |
|
|
313 | ++$BUSY; |
|
|
314 | |
|
|
315 | _send_pdu $pdu, $pdu->retries; |
192 | } |
316 | } |
193 | } |
317 | } |
194 | |
318 | |
195 | sub send_pdu($$$) { |
319 | sub send_pdu($$$) { |
196 | my (undef, $pdu, $delay) = @_; |
320 | my (undef, $pdu, $delay) = @_; |
197 | |
321 | |
198 | ++$BUSY; |
322 | # $delay is not very sensibly implemented by AnyEvent::SNMP, |
199 | |
323 | # but apparently it is not a very sensible feature. |
200 | if ($delay > 0) { |
324 | if ($delay > 0) { |
|
|
325 | ++$BUSY; |
201 | my $delay_w; $delay_w = AnyEvent->$timer (after => $delay, cb => sub { |
326 | my $delay_w; $delay_w = AnyEvent->$timer (after => $delay, cb => sub { |
202 | undef $delay_w; |
327 | undef $delay_w; |
203 | _send_pdu ($pdu, $pdu->retries); |
328 | --$BUSY; |
|
|
329 | push @QUEUE, $pdu; |
|
|
330 | kick_job; |
204 | }); |
331 | }); |
205 | return 1; |
332 | return 1; |
206 | } |
333 | } |
207 | |
334 | |
208 | _send_pdu $pdu, $pdu->retries; |
335 | push @QUEUE, $pdu; |
|
|
336 | kick_job; |
|
|
337 | |
209 | 1 |
338 | 1 |
210 | } |
339 | } |
211 | |
340 | |
212 | sub activate($) { |
341 | sub activate($) { |
213 | AnyEvent->one_event while $BUSY; |
342 | AnyEvent->one_event while $BUSY; |
214 | } |
343 | } |
215 | |
344 | |
216 | sub one_event($) { |
345 | sub one_event($) { |
217 | die; |
346 | AnyEvent->one_event; |
|
|
347 | } |
|
|
348 | |
|
|
349 | sub set_max_outstanding($) { |
|
|
350 | $MAX_OUTSTANDING = $_[0]; |
|
|
351 | kick_job; |
218 | } |
352 | } |
219 | |
353 | |
220 | =head1 SEE ALSO |
354 | =head1 SEE ALSO |
221 | |
355 | |
222 | L<AnyEvent>, L<Net::SNMP>, L<Net::SNMP::EV>. |
356 | L<AnyEvent>, L<Net::SNMP>, L<Net::SNMP::XS>, L<Net::SNMP::EV>. |
223 | |
357 | |
224 | =head1 AUTHOR |
358 | =head1 AUTHOR |
225 | |
359 | |
226 | Marc Lehmann <schmorp@schmorp.de> |
360 | Marc Lehmann <schmorp@schmorp.de> |
227 | http://home.schmorp.de/ |
361 | http://home.schmorp.de/ |