… | |
… | |
74 | and downwards. |
74 | and downwards. |
75 | |
75 | |
76 | Note that you can use L<Net::SNMP::XS> to speed up parsing of responses |
76 | Note that you can use L<Net::SNMP::XS> to speed up parsing of responses |
77 | considerably. |
77 | considerably. |
78 | |
78 | |
79 | =item $AnyEvent::SNMP::MIN_RECVQUEUE (default: C<4>) |
79 | =item $AnyEvent::SNMP::MIN_RECVQUEUE (default: C<8>) |
80 | |
80 | |
81 | =item $AnyEvent::SNMP::MAX_RECVQUEUE (default: C<64>) |
81 | =item $AnyEvent::SNMP::MAX_RECVQUEUE (default: C<64>) |
82 | |
82 | |
83 | These values specify the minimum and maximum receive queue length (in |
83 | These values specify the minimum and maximum receive queue length (in |
84 | units of one response packet). |
84 | units of one response packet). |
… | |
… | |
143 | # avoid the method call |
143 | # avoid the method call |
144 | my $timer = sub { shift->timer (@_) }; |
144 | my $timer = sub { shift->timer (@_) }; |
145 | AnyEvent::post_detect { $timer = AnyEvent->can ("timer") }; |
145 | AnyEvent::post_detect { $timer = AnyEvent->can ("timer") }; |
146 | |
146 | |
147 | our $BUSY; |
147 | our $BUSY; |
148 | our %TRANSPORT; # address => [count, watcher] |
148 | our @TRANSPORT; # fileno => [count, watcher] |
149 | our @QUEUE; |
149 | our @QUEUE; |
150 | our $MAX_OUTSTANDING = 50; |
150 | our $MAX_OUTSTANDING = 50; |
151 | our $MIN_RECVQUEUE = 4; |
151 | our $MIN_RECVQUEUE = 8; |
152 | our $MAX_RECVQUEUE = 64; |
152 | our $MAX_RECVQUEUE = 64; |
153 | |
153 | |
154 | sub kick_job; |
154 | sub kick_job; |
155 | |
155 | |
156 | sub _send_pdu { |
156 | sub _send_pdu { |
… | |
… | |
192 | } |
192 | } |
193 | |
193 | |
194 | # Schedule the timeout handler if the message expects a response. |
194 | # Schedule the timeout handler if the message expects a response. |
195 | if ($pdu->expect_response) { |
195 | if ($pdu->expect_response) { |
196 | my $transport = $msg->transport; |
196 | my $transport = $msg->transport; |
|
|
197 | my $fileno = $transport->fileno; |
197 | |
198 | |
198 | # register the transport |
199 | # register the transport |
199 | unless ($TRANSPORT{$transport+0}[0]++) { |
200 | unless ($TRANSPORT[$fileno][0]++) { |
200 | $TRANSPORT{$transport+0}[1] = AnyEvent->io (fh => $transport->socket, poll => 'r', cb => sub { |
201 | $TRANSPORT[$fileno][1] = AnyEvent->io (fh => $transport->socket, poll => 'r', cb => sub { |
201 | for my $count (1..$MAX_RECVQUEUE) { # handle up to this many requests in one go |
202 | for my $count (1..$MAX_RECVQUEUE) { # handle up to this many requests in one go |
202 | # Create a new Message object to receive the response |
203 | # Create a new Message object to receive the response |
203 | my ($msg, $error) = Net::SNMP::Message->new (-transport => $transport); |
204 | my ($msg, $error) = Net::SNMP::Message->new (-transport => $transport); |
204 | |
205 | |
205 | if (!defined $msg) { |
206 | if (!defined $msg) { |
… | |
… | |
207 | } |
208 | } |
208 | |
209 | |
209 | # Read the message from the Transport Layer |
210 | # Read the message from the Transport Layer |
210 | if (!defined $msg->recv) { |
211 | if (!defined $msg->recv) { |
211 | if ($transport->connectionless) { |
212 | if ($transport->connectionless) { |
|
|
213 | # if we handled very few replies and we have queued work, try |
|
|
214 | # to increase the parallelity as we probably can handle more. |
212 | if ($count < $MIN_RECVQUEUE && @QUEUE) { |
215 | if ($count < $MIN_RECVQUEUE && @QUEUE) { |
213 | ++$MAX_OUTSTANDING; |
216 | ++$MAX_OUTSTANDING; |
214 | kick_job; |
217 | kick_job; |
215 | } |
218 | } |
216 | } else { |
219 | } else { |
217 | # for some reason, connected-oriented transports seem to need this |
220 | # for some reason, connected-oriented transports seem to need this |
218 | delete $TRANSPORT{$transport+0} |
221 | delete $TRANSPORT[$fileno] |
219 | unless --$TRANSPORT{$transport+0}[0]; |
222 | unless --$TRANSPORT[$fileno][0]; |
220 | } |
223 | } |
221 | |
224 | |
222 | $msg->error; |
225 | $msg->error; |
223 | return; |
226 | return; |
224 | } |
227 | } |
… | |
… | |
247 | undef $$rtimeout_w; |
250 | undef $$rtimeout_w; |
248 | |
251 | |
249 | --$BUSY; |
252 | --$BUSY; |
250 | kick_job; |
253 | kick_job; |
251 | |
254 | |
252 | unless (--$TRANSPORT{$transport+0}[0]) { |
255 | unless (--$TRANSPORT[$fileno][0]) { |
253 | delete $TRANSPORT{$transport+0}; |
256 | delete $TRANSPORT[$fileno]; |
254 | return; |
257 | return; |
255 | } |
258 | } |
256 | } |
259 | } |
257 | } |
260 | } |
258 | |
261 | |
|
|
262 | # when we end up here, we successfully handled $MAX_RECVQUEUE |
|
|
263 | # replies in one iteration, so assume we are overloaded |
|
|
264 | # and reduce the amount of parallelity. |
259 | $MAX_OUTSTANDING = (int $MAX_OUTSTANDING * 0.9) || 1; |
265 | $MAX_OUTSTANDING = (int $MAX_OUTSTANDING * 0.95) || 1; |
260 | }); |
266 | }); |
261 | } |
267 | } |
262 | |
268 | |
263 | $msg->timeout_id (\(my $rtimeout_w = |
269 | $msg->timeout_id (\(my $rtimeout_w = |
264 | AnyEvent->$timer (after => $pdu->timeout, cb => sub { |
270 | AnyEvent->$timer (after => $pdu->timeout, cb => sub { |
265 | my $rtimeout_w = $msg->timeout_id; |
271 | my $rtimeout_w = $msg->timeout_id; |
266 | if ($$rtimeout_w) { |
272 | if ($$rtimeout_w) { |
267 | undef $$rtimeout_w; |
273 | undef $$rtimeout_w; |
268 | delete $TRANSPORT{$transport+0} |
274 | delete $TRANSPORT[$fileno] |
269 | unless --$TRANSPORT{$transport+0}[0]; |
275 | unless --$TRANSPORT[$fileno][0]; |
270 | } |
276 | } |
271 | |
277 | |
272 | if ($retries--) { |
278 | if ($retries--) { |
273 | _send_pdu ($pdu, $retries); |
279 | _send_pdu ($pdu, $retries); |
274 | } else { |
280 | } else { |