ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-SNMP/SNMP.pm
(Generate patch)

Comparing AnyEvent-SNMP/SNMP.pm (file contents):
Revision 1.2 by root, Fri Apr 10 06:50:16 2009 UTC vs.
Revision 1.8 by root, Wed Jan 6 10:25:54 2010 UTC

1=head1 NAME 1=head1 NAME
2 2
3AnyEvent::SNMP - adaptor to integrate Net::SNMP into Anyevent. 3AnyEvent::SNMP - adaptor to integrate Net::SNMP into AnyEvent.
4 4
5=head1 SYNOPSIS 5=head1 SYNOPSIS
6 6
7 use AnyEvent::SNMP; 7 use AnyEvent::SNMP;
8 use Net::SNMP; 8 use Net::SNMP;
43This module does not export anything and does not require you to do 43This module does not export anything and does not require you to do
44anything special apart from loading it I<before doing any non-blocking 44anything special apart from loading it I<before doing any non-blocking
45requests with Net::SNMP>. It is recommended but not required to load this 45requests with Net::SNMP>. It is recommended but not required to load this
46module before C<Net::SNMP>. 46module before C<Net::SNMP>.
47 47
48=head1 GLOBAL VARIABLES
49
50=over 4
51
52=item $AnyEvent::SNMP::MAX_OUTSTANDING (default: C<50>, dynamic)
53
54=item AnyEvent::SNMP::set_max_outstanding $new_value
55
56Use this package variable to restrict the number of outstanding SNMP
57requests at any point in time.
58
59Net::SNMP is very fast at creating and sending SNMP requests, but much
60slower at parsing (big, bulk) responses. This makes it easy to request a
61lot of data that can take many seconds to parse.
62
63In the best case, this can lead to unnecessary delays (and even time-outs,
64as the data has been received but not yet processed) and in the worst
65case, this can lead to packet loss, when the receive queue overflows and
66the kernel can no longer accept new packets.
67
68To avoid this, you can (and should) limit the number of outstanding
69requests to a number low enough so that parsing time doesn't introduce
70noticable delays.
71
72Unfortunately, this number depends not only on processing speed and load
73of the machine running Net::SNMP, but also on the network latency and the
74speed of your SNMP agents.
75
76AnyEvent::SNMP tries to dynamically adjust this number dynamically upwards
77and downwards.
78
79Increasing C<$MAX_OUTSTANDING> will not automatically use the
80extra request slots. To increase C<$MAX_OUTSTANDING> and make
81C<AnyEvent::SNMP> make use of the extra paralellity, call
82C<AnyEvent::SNMP::set_max_outstanding> with the new value, e.g.:
83
84 AnyEvent::SNMP::set_max_outstanding 500;
85
86Although due to the dynamic adjustment, this might have little lasting
87effect.
88
89Note that you can use L<Net::SNMP::XS> to speed up parsing of responses
90considerably.
91
92=item $AnyEvent::SNMP::MIN_RECVQUEUE (default: C<8>)
93
94=item $AnyEvent::SNMP::MAX_RECVQUEUE (default: C<64>)
95
96These values specify the minimum and maximum receive queue length (in
97units of one response packet).
98
99When AnyEvent::SNMP handles $MAX_RECVQUEUE or more packets per iteration
100it will reduce $MAX_OUTSTANDING. If it handles less than $MIN_RECVQUEUE,
101it increases $MAX_OUTSTANDING.
102
103This has the result of adjusting the number of outstanding requests so that
104the recv queue is between the minimum and maximu, usually.
105
106This algorithm works reasonably well as long as the responses, response
107latencies and processing times are the same size per packet on average.
108
109=back
110
111=head1 COMPATIBILITY
112
113This module may be used as a drop in replacement for the
114Net::SNMP::Dispatcher in existing programs. You can still call
115C<snmp_dispatcher> to start the event-loop, but then you loose the benefit
116of mixing Net::SNMP events with other events.
117
118 use AnyEvent::SNMP;
119 use Net::SNMP;
120
121 # just use Net::SNMP as before
122
123 # ... start non-blocking snmp request(s)...
124 Net::SNMP->session (
125 -hostname => "127.0.0.1",
126 -community => "public",
127 -nonblocking => 1,
128 )->get_request (-callback => sub { ... });
129
130 snmp_dispatcher;
131
48=cut 132=cut
49 133
50package AnyEvent::SNMP; 134package AnyEvent::SNMP;
51 135
52no warnings; 136no warnings;
61} 145}
62 146
63use Net::SNMP (); 147use Net::SNMP ();
64use AnyEvent (); 148use AnyEvent ();
65 149
66our $VERSION = '0.11'; 150our $VERSION = '1.0';
67 151
68$Net::SNMP::DISPATCHER = instance Net::SNMP::Dispatcher; 152$Net::SNMP::DISPATCHER = instance Net::SNMP::Dispatcher;
69 153
70our $MESSAGE_PROCESSING = $Net::SNMP::Dispatcher::MESSAGE_PROCESSING; 154our $MESSAGE_PROCESSING = $Net::SNMP::Dispatcher::MESSAGE_PROCESSING;
71 155
72# avoid the method call
73my $timer = sub { shift->timer (@_) };
74AnyEvent::post_detect { $timer = AnyEvent->can ("timer") };
75
76our $BUSY; 156our $BUSY;
157our $DONE; # finished all jobs
77our %TRANSPORT; # address => [count, watcher] 158our @TRANSPORT; # fileno => [count, watcher]
159our @QUEUE;
160our $MAX_OUTSTANDING = 50;
161our $MIN_RECVQUEUE = 8;
162our $MAX_RECVQUEUE = 64;
163
164sub kick_job; # also --$BUSY
78 165
79sub _send_pdu { 166sub _send_pdu {
80 my ($pdu, $retries) = @_; 167 my ($pdu, $retries) = @_;
81 168
82 # mostly copied from Net::SNMP::Dispatch 169 # mostly copied from Net::SNMP::Dispatch
84 # Pass the PDU to Message Processing so that it can 171 # Pass the PDU to Message Processing so that it can
85 # create the new outgoing message. 172 # create the new outgoing message.
86 my $msg = $MESSAGE_PROCESSING->prepare_outgoing_msg ($pdu); 173 my $msg = $MESSAGE_PROCESSING->prepare_outgoing_msg ($pdu);
87 174
88 if (!defined $msg) { 175 if (!defined $msg) {
89 --$BUSY; 176 kick_job;
90 # Inform the command generator about the Message Processing error. 177 # Inform the command generator about the Message Processing error.
91 $pdu->status_information ($MESSAGE_PROCESSING->error); 178 $pdu->status_information ($MESSAGE_PROCESSING->error);
92 return; 179 return;
93 } 180 }
94 181
97 $MESSAGE_PROCESSING->msg_handle_delete ($pdu->msg_id) 184 $MESSAGE_PROCESSING->msg_handle_delete ($pdu->msg_id)
98 if $pdu->expect_response; 185 if $pdu->expect_response;
99 186
100 # A crude attempt to recover from temporary failures. 187 # A crude attempt to recover from temporary failures.
101 if ($retries-- > 0 && ($!{EAGAIN} || $!{EWOULDBLOCK} || $!{ENOSPC})) { 188 if ($retries-- > 0 && ($!{EAGAIN} || $!{EWOULDBLOCK} || $!{ENOSPC})) {
102 my $retry_w; $retry_w = AnyEvent->$timer (after => $pdu->timeout, cb => sub { 189 my $retry_w; $retry_w = AE::timer $pdu->timeout, 0, sub {
103 undef $retry_w; 190 undef $retry_w;
104 _send_pdu ($pdu, $retries); 191 _send_pdu ($pdu, $retries);
105 }); 192 };
106 } else { 193 } else {
107 --$BUSY; 194 kick_job;
108 } 195 }
109 196
110 # Inform the command generator about the send() error. 197 # Inform the command generator about the send() error.
111 $pdu->status_information ($msg->error); 198 $pdu->status_information ($msg->error);
112 return; 199 return;
113 } 200 }
114 201
115 # Schedule the timeout handler if the message expects a response. 202 # Schedule the timeout handler if the message expects a response.
116 if ($pdu->expect_response) { 203 if ($pdu->expect_response) {
117 my $transport = $msg->transport; 204 my $transport = $msg->transport;
205 my $fileno = $transport->fileno;
118 206
119 # register the transport 207 # register the transport
120 unless ($TRANSPORT{$transport+0}[0]++) { 208 unless ($TRANSPORT[$fileno][0]++) {
121 $TRANSPORT{$transport+0}[1] = AnyEvent->io (fh => $transport->socket, poll => 'r', cb => sub { 209 $TRANSPORT[$fileno][1] = AE::io $transport->socket, 0, sub {
210 for my $count (1..$MAX_RECVQUEUE) { # handle up to this many requests in one go
122 # Create a new Message object to receive the response 211 # Create a new Message object to receive the response
123 my ($msg, $error) = Net::SNMP::Message->new (-transport => $transport); 212 my ($msg, $error) = Net::SNMP::Message->new (-transport => $transport);
124 213
125 if (!defined $msg) { 214 if (!defined $msg) {
126 die sprintf 'Failed to create Message object [%s]', $error; 215 die sprintf 'Failed to create Message object [%s]', $error;
216 }
217
218 # Read the message from the Transport Layer
219 if (!defined $msg->recv) {
220 if ($transport->connectionless) {
221 # if we handled very few replies and we have queued work, try
222 # to increase the parallelity as we probably can handle more.
223 if ($count < $MIN_RECVQUEUE && @QUEUE) {
224 ++$MAX_OUTSTANDING;
225 kick_job;
226 }
227 } else {
228 # for some reason, connected-oriented transports seem to need this
229 delete $TRANSPORT[$fileno]
230 unless --$TRANSPORT[$fileno][0];
231 }
232
233 $msg->error;
234 return;
235 }
236
237 # For connection-oriented Transport Domains, it is possible to
238 # "recv" an empty buffer if reassembly is required.
239 if (!$msg->length) {
240 return;
241 }
242
243 # Hand the message over to Message Processing.
244 if (!defined $MESSAGE_PROCESSING->prepare_data_elements ($msg)) {
245 $MESSAGE_PROCESSING->error;
246 return;
247 }
248
249 # Set the error if applicable.
250 $msg->error ($MESSAGE_PROCESSING->error) if $MESSAGE_PROCESSING->error;
251
252 # Notify the command generator to process the response.
253 $msg->process_response_pdu;
254
255 # Cancel the timeout.
256 my $rtimeout_w = $msg->timeout_id;
257 if ($$rtimeout_w) {
258 undef $$rtimeout_w;
259
260 kick_job;
261
262 unless (--$TRANSPORT[$fileno][0]) {
263 delete $TRANSPORT[$fileno];
264 return;
265 }
266 }
127 } 267 }
128 268
129 # Read the message from the Transport Layer 269 # when we end up here, we successfully handled $MAX_RECVQUEUE
130 if (!defined $msg->recv) { 270 # replies in one iteration, so assume we are overloaded
131 # for some reason, connected-oriented transports seem to need this 271 # and reduce the amount of parallelity.
132 unless ($transport->connectionless) { 272 $MAX_OUTSTANDING = (int $MAX_OUTSTANDING * 0.95) || 1;
133 delete $TRANSPORT{$transport+0}
134 unless --$TRANSPORT{$transport+0}[0];
135 }
136
137 $msg->error;
138 return;
139 } 273 };
274 }
140 275
141 # For connection-oriented Transport Domains, it is possible to 276 $msg->timeout_id (\(my $rtimeout_w =
142 # "recv" an empty buffer if reassembly is required. 277 AE::timer $pdu->timeout, 0, sub {
143 if (!$msg->length) {
144 return;
145 }
146
147 # Hand the message over to Message Processing.
148 if (!defined $MESSAGE_PROCESSING->prepare_data_elements ($msg)) {
149 $MESSAGE_PROCESSING->error;
150 return;
151 }
152
153 # Set the error if applicable.
154 $msg->error ($MESSAGE_PROCESSING->error) if $MESSAGE_PROCESSING->error;
155
156 # Cancel the timeout.
157 my $rtimeout_w = $msg->timeout_id; 278 my $rtimeout_w = $msg->timeout_id;
158 if ($$rtimeout_w) { 279 if ($$rtimeout_w) {
159 undef $$rtimeout_w; 280 undef $$rtimeout_w;
160 delete $TRANSPORT{$transport+0} 281 delete $TRANSPORT[$fileno]
161 unless --$TRANSPORT{$transport+0}[0]; 282 unless --$TRANSPORT[$fileno][0];
162
163 --$BUSY;
164 }
165
166 # Notify the command generator to process the response.
167 $msg->process_response_pdu;
168 });
169 }
170
171 #####d# timeout_id, wtf?
172 $msg->timeout_id (\(my $rtimeout_w =
173 AnyEvent->$timer (after => $pdu->timeout, cb => sub {
174 my $rtimeout_w = $msg->timeout_id;
175 if ($$rtimeout_w) {
176 undef $$rtimeout_w;
177 delete $TRANSPORT{$transport+0}
178 unless --$TRANSPORT{$transport+0}[0];
179 } 283 }
180 284
181 if ($retries--) { 285 if ($retries--) {
182 _send_pdu ($pdu, $retries); 286 _send_pdu ($pdu, $retries);
183 } else { 287 } else {
184 --$BUSY;
185 $MESSAGE_PROCESSING->msg_handle_delete ($pdu->msg_id); 288 $MESSAGE_PROCESSING->msg_handle_delete ($pdu->msg_id);
186 $pdu->status_information ("No response from remote host '%s'", $pdu->hostname); 289 $pdu->status_information ("No response from remote host '%s'", $pdu->hostname);
290
291 kick_job;
187 } 292 }
188 }) 293 })
189 )); 294 );
190 } else { 295 } else {
296 kick_job;
297 }
298}
299
300sub kick_job {
191 --$BUSY; 301 --$BUSY;
302
303 while ($BUSY < $MAX_OUTSTANDING) {
304 my $pdu = shift @QUEUE
305 or last;
306
307 ++$BUSY;
308
309 _send_pdu $pdu, $pdu->retries;
192 } 310 }
311
312 $DONE and $DONE->() unless $BUSY;
193} 313}
194 314
195sub send_pdu($$$) { 315sub send_pdu($$$) {
196 my (undef, $pdu, $delay) = @_; 316 my (undef, $pdu, $delay) = @_;
197 317
198 ++$BUSY; 318 # $delay is not very sensibly implemented by AnyEvent::SNMP,
199 319 # but apparently it is not a very sensible feature.
200 if ($delay > 0) { 320 if ($delay > 0) {
321 ++$BUSY;
201 my $delay_w; $delay_w = AnyEvent->$timer (after => $delay, cb => sub { 322 my $delay_w; $delay_w = AE::timer $delay, 0, sub {
202 undef $delay_w; 323 undef $delay_w;
203 _send_pdu ($pdu, $pdu->retries); 324 push @QUEUE, $pdu;
325 kick_job;
204 }); 326 };
205 return 1; 327 return 1;
206 } 328 }
207 329
208 _send_pdu $pdu, $pdu->retries; 330 push @QUEUE, $pdu;
331 kick_job;
332
209 1 333 1
210} 334}
211 335
212sub activate($) { 336sub activate($) {
213 AnyEvent->one_event while $BUSY; 337 while ($BUSY) {
338 $DONE = AE::cv;
339 $DONE->recv;
340 undef $DONE;
341 }
214} 342}
215 343
216sub one_event($) { 344sub one_event($) {
217 die; 345 AnyEvent->one_event; #d# todo
346}
347
348sub set_max_outstanding($) {
349 $MAX_OUTSTANDING = $_[0];
350
351 ++$BUSY; # kick_job decrements $BUSY
352 kick_job;
218} 353}
219 354
220=head1 SEE ALSO 355=head1 SEE ALSO
221 356
222L<AnyEvent>, L<Net::SNMP>, L<Net::SNMP::EV>. 357L<AnyEvent>, L<Net::SNMP>, L<Net::SNMP::XS>, L<Net::SNMP::EV>.
223 358
224=head1 AUTHOR 359=head1 AUTHOR
225 360
226 Marc Lehmann <schmorp@schmorp.de> 361 Marc Lehmann <schmorp@schmorp.de>
227 http://home.schmorp.de/ 362 http://home.schmorp.de/

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines