ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-SNMP/SNMP.pm
(Generate patch)

Comparing AnyEvent-SNMP/SNMP.pm (file contents):
Revision 1.1 by root, Tue Mar 31 21:55:18 2009 UTC vs.
Revision 1.5 by root, Wed Apr 22 12:31:20 2009 UTC

43This module does not export anything and does not require you to do 43This module does not export anything and does not require you to do
44anything special apart from loading it I<before doing any non-blocking 44anything special apart from loading it I<before doing any non-blocking
45requests with Net::SNMP>. It is recommended but not required to load this 45requests with Net::SNMP>. It is recommended but not required to load this
46module before C<Net::SNMP>. 46module before C<Net::SNMP>.
47 47
48=head1 GLOBAL VARIABLES
49
50=over 4
51
52=item $AnyEvent::SNMP::MAX_OUTSTANDING (default: C<50>, dynamic)
53
54Use this package variable to restrict the number of outstanding SNMP
55requests at any point in time.
56
57Net::SNMP is very fast at creating and sending SNMP requests, but much
58slower at parsing (big, bulk) responses. This makes it easy to request a
59lot of data that can take many seconds to parse.
60
61In the best case, this can lead to unnecessary delays (and even time-outs,
62as the data has been received but not yet processed) and in the worst
63case, this can lead to packet loss, when the receive queue overflows and
64the kernel can no longer accept new packets.
65
66To avoid this, you can (and should) limit the number of outstanding requests
67to a number low enough so that parsing time doesn't introduce noticable delays.
68
69Unfortunately, this number depends not only on processing speed and load
70of the machine running Net::SNMP, but also on the network latency and the
71speed of your SNMP agents.
72
73AnyEvent::SNMP tries to dynamically adjust this number dynamically upwards
74and downwards.
75
76Note that you can use L<Net::SNMP::XS> to speed up parsing of responses
77considerably.
78
79=item $AnyEvent::SNMP::MIN_RECVQUEUE (default: C<8>)
80
81=item $AnyEvent::SNMP::MAX_RECVQUEUE (default: C<64>)
82
83These values specify the minimum and maximum receive queue length (in
84units of one response packet).
85
86When AnyEvent::SNMP handles $MAX_RECVQUEUE or more packets per iteration
87it will reduce $MAX_OUTSTANDING. If it handles less than $MIN_RECVQUEUE,
88it increases $MAX_OUTSTANDING.
89
90This has the result of adjusting the number of outstanding requests so that
91the recv queue is between the minimum and maximu, usually.
92
93This algorithm works reasonably well as long as the responses, response
94latencies and processing times are the same size per packet on average.
95
96=back
97
98=head1 COMPATIBILITY
99
100This module may be used as a drop in replacement for the
101Net::SNMP::Dispatcher in existing programs. You can still call
102C<snmp_dispatcher> to start the event-loop, but then you loose the benefit
103of mixing Net::SNMP events with other events.
104
105 use AnyEvent::SNMP;
106 use Net::SNMP;
107
108 # just use Net::SNMP as before
109
110 # ... start non-blocking snmp request(s)...
111 Net::SNMP->session (
112 -hostname => "127.0.0.1",
113 -community => "public",
114 -nonblocking => 1,
115 )->get_request (-callback => sub { ... });
116
117 snmp_dispatcher;
118
48=cut 119=cut
49 120
50package AnyEvent::SNMP; 121package AnyEvent::SNMP;
51 122
52no warnings; 123no warnings;
61} 132}
62 133
63use Net::SNMP (); 134use Net::SNMP ();
64use AnyEvent (); 135use AnyEvent ();
65 136
66our $VERSION = '0.1'; 137our $VERSION = '0.2';
67 138
68$Net::SNMP::DISPATCHER = instance Net::SNMP::Dispatcher; 139$Net::SNMP::DISPATCHER = instance Net::SNMP::Dispatcher;
69 140
70our $MESSAGE_PROCESSING = $Net::SNMP::Dispatcher::MESSAGE_PROCESSING; 141our $MESSAGE_PROCESSING = $Net::SNMP::Dispatcher::MESSAGE_PROCESSING;
71 142
72# avoid the method call 143# avoid the method call
73my $timer = sub { shift->timer (@_) }; 144my $timer = sub { shift->timer (@_) };
74AnyEvent::post_detect { $timer = AnyEvent->can ("timer") }; 145AnyEvent::post_detect { $timer = AnyEvent->can ("timer") };
75 146
76our $BUSY; 147our $BUSY;
77our %TRANSPORT; # address => [count, watcher] 148our @TRANSPORT; # fileno => [count, watcher]
149our @QUEUE;
150our $MAX_OUTSTANDING = 50;
151our $MIN_RECVQUEUE = 8;
152our $MAX_RECVQUEUE = 64;
153
154sub kick_job;
78 155
79sub _send_pdu { 156sub _send_pdu {
80 my ($pdu, $retries) = @_; 157 my ($pdu, $retries) = @_;
81 158
82 # mostly copied from Net::SNMP::Dispatch 159 # mostly copied from Net::SNMP::Dispatch
85 # create the new outgoing message. 162 # create the new outgoing message.
86 my $msg = $MESSAGE_PROCESSING->prepare_outgoing_msg ($pdu); 163 my $msg = $MESSAGE_PROCESSING->prepare_outgoing_msg ($pdu);
87 164
88 if (!defined $msg) { 165 if (!defined $msg) {
89 --$BUSY; 166 --$BUSY;
167 kick_job;
90 # Inform the command generator about the Message Processing error. 168 # Inform the command generator about the Message Processing error.
91 $pdu->status_information ($MESSAGE_PROCESSING->error); 169 $pdu->status_information ($MESSAGE_PROCESSING->error);
92 return; 170 return;
93 } 171 }
94 172
103 undef $retry_w; 181 undef $retry_w;
104 _send_pdu ($pdu, $retries); 182 _send_pdu ($pdu, $retries);
105 }); 183 });
106 } else { 184 } else {
107 --$BUSY; 185 --$BUSY;
186 kick_job;
108 } 187 }
109 188
110 # Inform the command generator about the send() error. 189 # Inform the command generator about the send() error.
111 $pdu->status_information ($msg->error); 190 $pdu->status_information ($msg->error);
112 return; 191 return;
113 } 192 }
114 193
115 # Schedule the timeout handler if the message expects a response. 194 # Schedule the timeout handler if the message expects a response.
116 if ($pdu->expect_response) { 195 if ($pdu->expect_response) {
117 my $transport = $msg->transport; 196 my $transport = $msg->transport;
197 my $fileno = $transport->fileno;
118 198
119 # register the transport 199 # register the transport
120 unless ($TRANSPORT{$transport+0}[0]++) { 200 unless ($TRANSPORT[$fileno][0]++) {
121 $TRANSPORT{$transport+0}[1] = AnyEvent->io (fh => $transport->socket, poll => 'r', cb => sub { 201 $TRANSPORT[$fileno][1] = AnyEvent->io (fh => $transport->socket, poll => 'r', cb => sub {
202 for my $count (1..$MAX_RECVQUEUE) { # handle up to this many requests in one go
122 # Create a new Message object to receive the response 203 # Create a new Message object to receive the response
123 my ($msg, $error) = Net::SNMP::Message->new (-transport => $transport); 204 my ($msg, $error) = Net::SNMP::Message->new (-transport => $transport);
124 205
125 if (!defined $msg) { 206 if (!defined $msg) {
126 die sprintf 'Failed to create Message object [%s]', $error; 207 die sprintf 'Failed to create Message object [%s]', $error;
208 }
209
210 # Read the message from the Transport Layer
211 if (!defined $msg->recv) {
212 if ($transport->connectionless) {
213 # if we handled very few replies and we have queued work, try
214 # to increase the parallelity as we probably can handle more.
215 if ($count < $MIN_RECVQUEUE && @QUEUE) {
216 ++$MAX_OUTSTANDING;
217 kick_job;
218 }
219 } else {
220 # for some reason, connected-oriented transports seem to need this
221 delete $TRANSPORT[$fileno]
222 unless --$TRANSPORT[$fileno][0];
223 }
224
225 $msg->error;
226 return;
227 }
228
229 # For connection-oriented Transport Domains, it is possible to
230 # "recv" an empty buffer if reassembly is required.
231 if (!$msg->length) {
232 return;
233 }
234
235 # Hand the message over to Message Processing.
236 if (!defined $MESSAGE_PROCESSING->prepare_data_elements ($msg)) {
237 $MESSAGE_PROCESSING->error;
238 return;
239 }
240
241 # Set the error if applicable.
242 $msg->error ($MESSAGE_PROCESSING->error) if $MESSAGE_PROCESSING->error;
243
244 # Notify the command generator to process the response.
245 $msg->process_response_pdu;
246
247 # Cancel the timeout.
248 my $rtimeout_w = $msg->timeout_id;
249 if ($$rtimeout_w) {
250 undef $$rtimeout_w;
251
252 --$BUSY;
253 kick_job;
254
255 unless (--$TRANSPORT[$fileno][0]) {
256 delete $TRANSPORT[$fileno];
257 return;
258 }
259 }
127 } 260 }
128 261
129 # Read the message from the Transport Layer 262 # when we end up here, we successfully handled $MAX_RECVQUEUE
130 if (!defined $msg->recv) { 263 # replies in one iteration, so assume we are overloaded
131 # for some reason, connected-oriented transports seem to need this 264 # and reduce the amount of parallelity.
132 unless ($transport->connectionless) { 265 $MAX_OUTSTANDING = (int $MAX_OUTSTANDING * 0.95) || 1;
133 delete $TRANSPORT{$transport+0}
134 unless --$TRANSPORT{$transport+0}[0];
135 }
136
137 $msg->error;
138 return;
139 }
140
141 # For connection-oriented Transport Domains, it is possible to
142 # "recv" an empty buffer if reassembly is required.
143 if (!$msg->length) {
144 return;
145 }
146
147 # Hand the message over to Message Processing.
148 if (!defined $MESSAGE_PROCESSING->prepare_data_elements ($msg)) {
149 $MESSAGE_PROCESSING->error;
150 return;
151 }
152
153 # Set the error if applicable.
154 $msg->error ($MESSAGE_PROCESSING->error) if $MESSAGE_PROCESSING->error;
155
156 # Cancel the timeout.
157 my $rtimeout_w = $msg->timeout_id;
158 if ($$rtimeout_w) {
159 undef $$rtimeout_w;
160 delete $TRANSPORT{$transport+0}
161 unless --$TRANSPORT{$transport+0}[0];
162
163 --$BUSY;
164 }
165
166 # Notify the command generator to process the response.
167 $msg->process_response_pdu;
168 }); 266 });
169 } 267 }
170 268
171 #####d# timeout_id, wtf?
172 $msg->timeout_id (\(my $rtimeout_w = 269 $msg->timeout_id (\(my $rtimeout_w =
173 AnyEvent->$timer (after => $pdu->timeout, cb => sub { 270 AnyEvent->$timer (after => $pdu->timeout, cb => sub {
174 my $rtimeout_w = $msg->timeout_id; 271 my $rtimeout_w = $msg->timeout_id;
175 if ($$rtimeout_w) { 272 if ($$rtimeout_w) {
176 undef $$rtimeout_w; 273 undef $$rtimeout_w;
177 delete $TRANSPORT{$transport+0} 274 delete $TRANSPORT[$fileno]
178 unless --$TRANSPORT{$transport+0}[0]; 275 unless --$TRANSPORT[$fileno][0];
179 } 276 }
180 277
181 if ($retries--) { 278 if ($retries--) {
182 _send_pdu ($pdu, $retries); 279 _send_pdu ($pdu, $retries);
183 } else { 280 } else {
184 --$BUSY;
185 $MESSAGE_PROCESSING->msg_handle_delete ($pdu->msg_id); 281 $MESSAGE_PROCESSING->msg_handle_delete ($pdu->msg_id);
186 $pdu->status_information ("No response from remote host '%s'", $pdu->hostname); 282 $pdu->status_information ("No response from remote host '%s'", $pdu->hostname);
283
284 --$BUSY;
285 kick_job;
187 } 286 }
188 }) 287 })
189 )); 288 ));
190 } else { 289 } else {
191 --$BUSY; 290 --$BUSY;
291 kick_job;
192 } 292 }
193} 293}
194 294
295sub kick_job {
296 while ($BUSY < $MAX_OUTSTANDING) {
297 my $pdu = shift @QUEUE
298 or last;
299
300 ++$BUSY;
301
302 _send_pdu $pdu, $pdu->retries;
303 }
304}
195sub send_pdu($$$) { 305sub send_pdu($$$) {
196 my (undef, $pdu, $delay) = @_; 306 my (undef, $pdu, $delay) = @_;
197 307
198 ++$BUSY; 308 # $delay is not very sensibly implemented by AnyEvent::SNMP,
199 309 # but apparently it is not a very sensible feature.
200 if ($delay > 0) { 310 if ($delay > 0) {
311 ++$BUSY;
201 my $delay_w; $delay_w = AnyEvent->$timer (after => $delay, cb => sub { 312 my $delay_w; $delay_w = AnyEvent->$timer (after => $delay, cb => sub {
202 undef $delay_w; 313 undef $delay_w;
203 _send_pdu ($pdu, $pdu->retries); 314 --$BUSY;
315 push @QUEUE, $pdu;
316 kick_job;
204 }); 317 });
205 return 1; 318 return 1;
206 } 319 }
207 320
208 _send_pdu $pdu, $pdu->retries; 321 push @QUEUE, $pdu;
322 kick_job;
323
209 1 324 1
210} 325}
211 326
212sub activate($) { 327sub activate($) {
213 AnyEvent->one_event while $BUSY; 328 AnyEvent->one_event while $BUSY;
214} 329}
215 330
216sub one_event($) { 331sub one_event($) {
217 die; 332 AnyEvent->one_event;
218} 333}
219 334
220=head1 SEE ALSO 335=head1 SEE ALSO
221 336
222L<AnyEvent>, L<Net::SNMP>, L<Net::SNMP::EV>. 337L<AnyEvent>, L<Net::SNMP>, L<Net::SNMP::XS>, L<Net::SNMP::EV>.
223 338
224=head1 AUTHOR 339=head1 AUTHOR
225 340
226 Marc Lehmann <schmorp@schmorp.de> 341 Marc Lehmann <schmorp@schmorp.de>
227 http://home.schmorp.de/ 342 http://home.schmorp.de/

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines