… | |
… | |
12 | my $pool = new AnyEvent::Fork::Pool |
12 | my $pool = new AnyEvent::Fork::Pool |
13 | "MyWorker::run", |
13 | "MyWorker::run", |
14 | |
14 | |
15 | # pool management |
15 | # pool management |
16 | min => 0, # minimum # of processes |
16 | min => 0, # minimum # of processes |
17 | max => 8, # maximum # of processes |
17 | max => 4, # maximum # of processes |
|
|
18 | max_queue => 2, # queue at most this number of jobs per process |
18 | busy_time => 0, # wait this before starting a new process |
19 | min_delay => 0, # wait this many seconds before starting a new process |
19 | max_idle => 1, # wait this before killing an idle process |
20 | min_idle => 0, # try to have at least this amount of idle processes |
20 | idle_time => 1, # at most this many idle processes |
21 | max_idle => 1, # at most this many idle processes |
|
|
22 | idle_time => 1, # wait this many seconds before killing an idle process |
|
|
23 | on_destroy => (my $finish = AE::cv), |
21 | |
24 | |
22 | # template process |
25 | # template process |
23 | template => AnyEvent::Fork->new, # the template process to use |
26 | template => AnyEvent::Fork->new, # the template process to use |
24 | require => [MyWorker::], # module(s) to load |
27 | require => [MyWorker::], # module(s) to load |
25 | eval => "# perl code to execute in template", |
28 | eval => "# perl code to execute in template", |
26 | on_destroy => (my $finish = AE::cv), |
|
|
27 | |
29 | |
28 | # parameters passed to AnyEvent::Fork::RPC |
30 | # parameters passed to AnyEvent::Fork::RPC |
29 | async => 0, |
31 | async => 0, |
30 | on_error => sub { die "FATAL: $_[0]\n" }, |
32 | on_error => sub { die "FATAL: $_[0]\n" }, |
31 | on_event => sub { my @ev = @_ }, |
33 | on_event => sub { my @ev = @_ }, |
… | |
… | |
54 | is, as it defines the actual API that needs to be implemented in the |
56 | is, as it defines the actual API that needs to be implemented in the |
55 | children. |
57 | children. |
56 | |
58 | |
57 | =head1 EXAMPLES |
59 | =head1 EXAMPLES |
58 | |
60 | |
59 | =head1 API |
61 | =head1 PARENT USAGE |
60 | |
62 | |
61 | =over 4 |
63 | =over 4 |
62 | |
64 | |
63 | =cut |
65 | =cut |
64 | |
66 | |
65 | package AnyEvent::Fork::Pool; |
67 | package AnyEvent::Fork::Pool; |
66 | |
68 | |
67 | use common::sense; |
69 | use common::sense; |
68 | |
70 | |
69 | use Guard (); |
71 | use Scalar::Util (); |
|
|
72 | |
|
|
73 | use Array::Heap (); |
70 | |
74 | |
71 | use AnyEvent; |
75 | use AnyEvent; |
72 | use AnyEvent::Fork; # we don't actually depend on it, this is for convenience |
76 | use AnyEvent::Fork; # we don't actually depend on it, this is for convenience |
73 | use AnyEvent::Fork::RPC; |
77 | use AnyEvent::Fork::RPC; |
74 | |
78 | |
|
|
79 | my $magic0 = ':t6Z@HK1N%Dx@_7?=~-7NQgWDdAs6a,jFN=wLO0*jD*1%P'; |
|
|
80 | my $magic2 = '<~53rexz.U`!]X[A235^"fyEoiTF\T~oH1l/N6+Djep9b~bI9`\1x%B~vWO1q*'; |
|
|
81 | |
75 | our $VERSION = 0.1; |
82 | our $VERSION = 0.1; |
76 | |
83 | |
77 | =item my $rpc = new AnyEvent::Fork::RPC::pool $function, [key => value...] |
84 | =item my $rpc = new AnyEvent::Fork::Pool $function, [key => value...] |
|
|
85 | |
|
|
86 | Creates a new pool object with the specified C<$function> as function |
|
|
87 | (name) to call for each request. |
|
|
88 | |
|
|
89 | A pool consists of a template process that contains the code and data that |
|
|
90 | the worker processes need. And a number of worker processes that have been |
|
|
91 | forked off of that template process. |
|
|
92 | |
|
|
93 | You can supply your own template process, or tell C<AnyEvent::Fork::Pool> |
|
|
94 | to create one. |
|
|
95 | |
|
|
96 | A relatively large number of key/value pairs can be specified to influence |
|
|
97 | the behaviour. They are grouped into the categories "pool management", |
|
|
98 | "template process" and "rpc parameters". |
78 | |
99 | |
79 | =over 4 |
100 | =over 4 |
80 | |
101 | |
81 | =item on_error => $cb->($msg) |
102 | =item Pool Management |
82 | |
103 | |
83 | Called on (fatal) errors, with a descriptive (hopefully) message. If |
104 | The pool consists of a certain number of worker processes. These options |
84 | this callback is not provided, but C<on_event> is, then the C<on_event> |
105 | decide how many of these processes exist and when they are started and |
85 | callback is called with the first argument being the string C<error>, |
106 | stopp.ed |
86 | followed by the error message. |
|
|
87 | |
107 | |
88 | If neither handler is provided it prints the error to STDERR and will |
108 | =over 4 |
89 | start failing badly. |
|
|
90 | |
109 | |
91 | =item on_event => $cb->(...) |
110 | =item min => $count (default: 0) |
92 | |
111 | |
93 | Called for every call to the C<AnyEvent::Fork::RPC::event> function in the |
112 | The minimum number of processes in the pool, in addition to the template |
94 | child, with the arguments of that function passed to the callback. |
113 | process. Even when idle, there will never be fewer than this number of |
|
|
114 | worker processes. The default means that the pool can be empty. |
95 | |
115 | |
96 | Also called on errors when no C<on_error> handler is provided. |
116 | =item max => $count (default: 4) |
97 | |
117 | |
98 | =item on_destroy => $cb->() |
118 | The maximum number of processes in the pool, in addition to the template |
|
|
119 | process. C<AnyEvent::Fork::Pool> will never create more than this number |
|
|
120 | of processes. |
99 | |
121 | |
100 | Called when the C<$rpc> object has been destroyed and all requests have |
122 | =item max_queue => $count (default: 2) |
101 | been successfully handled. This is useful when you queue some requests and |
|
|
102 | want the child to go away after it has handled them. The problem is that |
|
|
103 | the parent must not exit either until all requests have been handled, and |
|
|
104 | this can be accomplished by waiting for this callback. |
|
|
105 | |
123 | |
106 | =item init => $function (default none) |
124 | The maximum number of jobs sent to a single worker process. Worker |
|
|
125 | processes that handle this number of jobs already are called "busy". |
107 | |
126 | |
108 | When specified (by name), this function is called in the child as the very |
127 | Jobs that cannot be sent to a worker immediately (because all workers are |
109 | first thing when taking over the process, with all the arguments normally |
128 | busy) will be queued until a worker is available. |
110 | passed to the C<AnyEvent::Fork::run> function, except the communications |
|
|
111 | socket. |
|
|
112 | |
129 | |
113 | It can be used to do one-time things in the child such as storing passed |
130 | =item min_delay => $seconds (default: 0) |
114 | parameters or opening database connections. |
|
|
115 | |
131 | |
116 | It is called very early - before the serialisers are created or the |
132 | When a job is queued and all workers are busy, a timer is started. If the |
117 | C<$function> name is resolved into a function reference, so it could be |
133 | timer elapses and there are still jobs that cannot be queued to a worker, |
118 | used to load any modules that provide the serialiser or function. It can |
134 | a new worker is started. |
119 | not, however, create events. |
135 | |
|
|
136 | This configurs the time that all workers must be busy before a new worker |
|
|
137 | is started. Or, put differently, the minimum delay betwene starting new |
|
|
138 | workers. |
|
|
139 | |
|
|
140 | The delay is zero by default, which means new workers will be started |
|
|
141 | without delay. |
|
|
142 | |
|
|
143 | =item min_idle => $count (default: 0) |
|
|
144 | |
|
|
145 | The minimum number of idle workers - when they are less, more |
|
|
146 | are started. The C<min_delay> is still respected though, and |
|
|
147 | C<min_idle>/C<min_delay> and C<max_idle>/C<idle_time> are useful to |
|
|
148 | dynamically adjust the pool. |
|
|
149 | |
|
|
150 | =item max_idle => $count (default: 1) |
|
|
151 | |
|
|
152 | The maximum number of idle workers. If a worker becomes idle and there are |
|
|
153 | already this many idle workers, it will be stopped immediately instead of |
|
|
154 | waiting for the idle timer to elapse. |
|
|
155 | |
|
|
156 | =item idle_time => $seconds (default: 1) |
|
|
157 | |
|
|
158 | When a worker has no jobs to execute it becomes idle. An idle worker that |
|
|
159 | hasn't executed a job within this amount of time will be stopped, unless |
|
|
160 | the other parameters say otherwise. |
|
|
161 | |
|
|
162 | =item on_destroy => $callback->() (default: none) |
|
|
163 | |
|
|
164 | When a pool object goes out of scope, it will still handle all outstanding |
|
|
165 | jobs. After that, it will destroy all workers (and also the template |
|
|
166 | process if it isn't referenced otherwise). |
|
|
167 | |
|
|
168 | =back |
|
|
169 | |
|
|
170 | =item Template Process |
|
|
171 | |
|
|
172 | The worker processes are all forked from a single template |
|
|
173 | process. Ideally, all modules and all cdoe used by the worker, as well as |
|
|
174 | any shared data structures should be loaded into the template process, to |
|
|
175 | take advantage of data sharing via fork. |
|
|
176 | |
|
|
177 | You can create your own template process by creating a L<AnyEvent::Fork> |
|
|
178 | object yourself and passing it as the C<template> parameter, but |
|
|
179 | C<AnyEvent::Fork::Pool> can create one for you, including some standard |
|
|
180 | options. |
|
|
181 | |
|
|
182 | =over 4 |
|
|
183 | |
|
|
184 | =item template => $fork (default: C<< AnyEvent::Fork->new >>) |
|
|
185 | |
|
|
186 | The template process to use, if you want to create your own. |
|
|
187 | |
|
|
188 | =item require => \@modules (default: C<[]>) |
|
|
189 | |
|
|
190 | The modules in this list will be laoded into the template process. |
|
|
191 | |
|
|
192 | =item eval => "# perl code to execute in template" (default: none) |
|
|
193 | |
|
|
194 | This is a perl string that is evaluated after creating the template |
|
|
195 | process and after requiring the modules. It can do whatever it wants to |
|
|
196 | configure the process, but it must not do anything that would keep a later |
|
|
197 | fork from working (so must not create event handlers or (real) threads for |
|
|
198 | example). |
|
|
199 | |
|
|
200 | =back |
|
|
201 | |
|
|
202 | =item AnyEvent::Fork::RPC Parameters |
|
|
203 | |
|
|
204 | These parameters are all passed directly to L<AnyEvent::Fork::RPC>. They |
|
|
205 | are only briefly mentioned here, for their full documentation |
|
|
206 | please refer to the L<AnyEvent::Fork::RPC> documentation. Also, the |
|
|
207 | default values mentioned here are only documented as a best effort - |
|
|
208 | L<AnyEvent::Fork::RPC> documentation is binding. |
|
|
209 | |
|
|
210 | =over 4 |
120 | |
211 | |
121 | =item async => $boolean (default: 0) |
212 | =item async => $boolean (default: 0) |
122 | |
213 | |
123 | The default server used in the child does all I/O blockingly, and only |
214 | Whether to sue the synchronous or asynchronous RPC backend. |
124 | allows a single RPC call to execute concurrently. |
|
|
125 | |
215 | |
126 | Setting C<async> to a true value switches to another implementation that |
216 | =item on_error => $callback->($message) (default: die with message) |
127 | uses L<AnyEvent> in the child and allows multiple concurrent RPC calls. |
|
|
128 | |
217 | |
129 | The actual API in the child is documented in the section that describes |
218 | The callback to call on any (fatal) errors. |
130 | the calling semantics of the returned C<$rpc> function. |
|
|
131 | |
219 | |
132 | If you want to pre-load the actual back-end modules to enable memory |
220 | =item on_event => $callback->(...) (default: C<sub { }>, unlike L<AnyEvent::Fork::RPC>) |
133 | sharing, then you should load C<AnyEvent::Fork::RPC::Sync> for |
|
|
134 | synchronous, and C<AnyEvent::Fork::RPC::Async> for asynchronous mode. |
|
|
135 | |
221 | |
136 | If you use a template process and want to fork both sync and async |
222 | The callback to invoke on events. |
137 | children, then it is permissible to load both modules. |
|
|
138 | |
223 | |
139 | =item serialiser => $string (default: '(sub { pack "(w/a*)*", @_ }, sub { unpack "(w/a*)*", shift })') |
224 | =item init => $initfunction (default: none) |
140 | |
225 | |
141 | All arguments, result data and event data have to be serialised to be |
226 | The function to call in the child, once before handling requests. |
142 | transferred between the processes. For this, they have to be frozen and |
|
|
143 | thawed in both parent and child processes. |
|
|
144 | |
227 | |
145 | By default, only octet strings can be passed between the processes, which |
228 | =item serialiser => $serialiser (defailt: $AnyEvent::Fork::RPC::STRING_SERIALISER) |
146 | is reasonably fast and efficient. |
|
|
147 | |
229 | |
148 | For more complicated use cases, you can provide your own freeze and thaw |
230 | The serialiser to use. |
149 | functions, by specifying a string with perl source code. It's supposed to |
|
|
150 | return two code references when evaluated: the first receives a list of |
|
|
151 | perl values and must return an octet string. The second receives the octet |
|
|
152 | string and must return the original list of values. |
|
|
153 | |
|
|
154 | If you need an external module for serialisation, then you can either |
|
|
155 | pre-load it into your L<AnyEvent::Fork> process, or you can add a C<use> |
|
|
156 | or C<require> statement into the serialiser string. Or both. |
|
|
157 | |
231 | |
158 | =back |
232 | =back |
159 | |
233 | |
160 | See the examples section earlier in this document for some actual |
234 | =back |
161 | examples. |
|
|
162 | |
235 | |
163 | =cut |
236 | =cut |
164 | |
237 | |
165 | sub new { |
238 | sub new { |
166 | my ($self, $function, %arg) = @_; |
239 | my ($class, $function, %arg) = @_; |
167 | |
240 | |
168 | my $serialiser = delete $arg{serialiser} || $STRING_SERIALISER; |
241 | my $self = bless { |
169 | my $on_event = delete $arg{on_event}; |
242 | min => 0, |
170 | my $on_error = delete $arg{on_error}; |
243 | max => 4, |
171 | my $on_destroy = delete $arg{on_destroy}; |
244 | max_queue => 2, |
|
|
245 | min_delay => 0, |
|
|
246 | max_idle => 1, |
|
|
247 | idle_time => 1, |
|
|
248 | on_event => sub { }, |
|
|
249 | %arg, |
|
|
250 | pool => [], |
|
|
251 | queue => [], |
|
|
252 | }, $class; |
|
|
253 | |
|
|
254 | $self->{function} = $function; |
|
|
255 | |
|
|
256 | ($self->{template} ||= new AnyEvent::Fork) |
|
|
257 | ->require ("AnyEvent::Fork::RPC::" . ($self->{async} ? "Async" : "Sync")) |
|
|
258 | ->require (@{ delete $self->{require} }) |
|
|
259 | ->eval (' |
|
|
260 | my ($magic0, $magic2) = @_; |
|
|
261 | sub AnyEvent::Fork::Pool::quit() { |
|
|
262 | AnyEvent::Fork::RPC::on_event $magic0, "quit", $magic2; |
|
|
263 | } |
|
|
264 | ', $magic0, $magic2) |
|
|
265 | ->eval (delete $self->{eval}); |
|
|
266 | |
|
|
267 | $self->start |
|
|
268 | while @{ $self->{pool} } < $self->{min}; |
|
|
269 | |
|
|
270 | $self |
|
|
271 | } |
|
|
272 | |
|
|
273 | sub start { |
|
|
274 | my ($self) = @_; |
172 | |
275 | |
173 | # default for on_error is to on_event, if specified |
276 | warn "start\n";#d# |
174 | $on_error ||= $on_event |
|
|
175 | ? sub { $on_event->(error => shift) } |
|
|
176 | : sub { die "AnyEvent::Fork::RPC: uncaught error: $_[0].\n" }; |
|
|
177 | |
277 | |
178 | # default for on_event is to raise an error |
278 | Scalar::Util::weaken $self; |
179 | $on_event ||= sub { $on_error->("event received, but no on_event handler") }; |
|
|
180 | |
279 | |
181 | my ($f, $t) = eval $serialiser; die $@ if $@; |
280 | my $proc = [0, undef, undef]; |
182 | |
281 | |
183 | my (@rcb, %rcb, $fh, $shutdown, $wbuf, $ww); |
282 | $proc->[1] = $self->{template} |
184 | my ($rlen, $rbuf, $rw) = 512 - 16; |
283 | ->fork |
|
|
284 | ->AnyEvent::Fork::RPC::run ($self->{function}, |
|
|
285 | async => $self->{async}, |
|
|
286 | init => $self->{init}, |
|
|
287 | serialiser => $self->{serialiser}, |
|
|
288 | on_error => $self->{on_error}, |
|
|
289 | on_event => sub { |
|
|
290 | if (@_ == 3 && $_[0] eq $magic0 && $_[2] eq $magic2) { |
|
|
291 | if ($_[1] eq "quit") { |
|
|
292 | my $pool = $self->{pool}; |
|
|
293 | for (0 .. $#$pool) { |
|
|
294 | if ($pool->[$_] == $proc) { |
|
|
295 | Array::Heap::splice_heap @$pool, $_; |
|
|
296 | return; |
|
|
297 | } |
|
|
298 | } |
|
|
299 | die; |
|
|
300 | } |
|
|
301 | return; |
|
|
302 | } |
185 | |
303 | |
186 | my $wcb = sub { |
304 | &{ $self->{on_event} }; |
187 | my $len = syswrite $fh, $wbuf; |
|
|
188 | |
|
|
189 | unless (defined $len) { |
|
|
190 | if ($! != Errno::EAGAIN && $! != Errno::EWOULDBLOCK) { |
|
|
191 | undef $rw; undef $ww; # it ends here |
|
|
192 | $on_error->("$!"); |
|
|
193 | } |
305 | }, |
194 | } |
306 | ) |
195 | |
|
|
196 | substr $wbuf, 0, $len, ""; |
|
|
197 | |
|
|
198 | unless (length $wbuf) { |
|
|
199 | undef $ww; |
|
|
200 | $shutdown and shutdown $fh, 1; |
|
|
201 | } |
|
|
202 | }; |
307 | ; |
203 | |
308 | |
204 | my $module = "AnyEvent::Fork::RPC::" . ($arg{async} ? "Async" : "Sync"); |
309 | ++$self->{idle}; |
|
|
310 | Array::Heap::push_heap @{ $self->{pool} }, $proc; |
|
|
311 | } |
205 | |
312 | |
206 | $self->require ($module) |
313 | =item $pool->call (..., $cb->(...)) |
207 | ->send_arg ($function, $arg{init}, $serialiser) |
314 | |
208 | ->run ("$module\::run", sub { |
315 | Call the RPC function of a worker with the given arguments, and when the |
|
|
316 | worker is done, call the C<$cb> with the results, like just calling the |
|
|
317 | L<AnyEvent::Fork::RPC> object directly. |
|
|
318 | |
|
|
319 | If there is no free worker, the call will be queued. |
|
|
320 | |
|
|
321 | Note that there can be considerable time between calling this method and |
|
|
322 | the call actually being executed. During this time, the parameters passed |
|
|
323 | to this function are effectively read-only - modifying them after the call |
|
|
324 | and before the callback is invoked causes undefined behaviour. |
|
|
325 | |
|
|
326 | =cut |
|
|
327 | |
|
|
328 | sub scheduler { |
209 | $fh = shift; |
329 | my $self = shift; |
210 | |
330 | |
211 | my ($id, $len); |
331 | my $pool = $self->{pool}; |
212 | $rw = AE::io $fh, 0, sub { |
332 | my $queue = $self->{queue}; |
213 | $rlen = $rlen * 2 + 16 if $rlen - 128 < length $rbuf; |
|
|
214 | $len = sysread $fh, $rbuf, $rlen - length $rbuf, length $rbuf; |
|
|
215 | |
333 | |
216 | if ($len) { |
334 | $self->start |
217 | while (8 <= length $rbuf) { |
335 | unless @$pool; |
218 | ($id, $len) = unpack "LL", $rbuf; |
336 | |
219 | 8 + $len <= length $rbuf |
337 | while (@$queue) { |
|
|
338 | my $proc = $pool->[0]; |
|
|
339 | |
|
|
340 | if ($proc->[0] < $self->{max_queue}) { |
|
|
341 | warn "free $proc $proc->[0]\n";#d# |
|
|
342 | # found free worker |
|
|
343 | --$self->{idle} |
|
|
344 | unless $proc->[0]++; |
|
|
345 | |
|
|
346 | undef $proc->[2]; |
|
|
347 | |
|
|
348 | Array::Heap::adjust_heap @$pool, 0; |
|
|
349 | |
|
|
350 | my $job = shift @$queue; |
|
|
351 | my $ocb = pop @$job; |
|
|
352 | |
|
|
353 | $proc->[1]->(@$job, sub { |
|
|
354 | for (0 .. $#$pool) { |
|
|
355 | if ($pool->[$_] == $proc) { |
|
|
356 | # reduce queue counter |
|
|
357 | unless (--$pool->[$_][0]) { |
|
|
358 | # worker becomes idle |
|
|
359 | my $to = ++$self->{idle} > $self->{max_idle} |
|
|
360 | ? 0 |
|
|
361 | : $self->{idle_time}; |
|
|
362 | |
|
|
363 | $proc->[2] = AE::timer $to, 0, sub { |
|
|
364 | undef $proc->[2]; |
|
|
365 | |
|
|
366 | warn "destroy $proc afzer $to\n";#d# |
|
|
367 | |
|
|
368 | for (0 .. $#$pool) { |
|
|
369 | if ($pool->[$_] == $proc) { |
|
|
370 | Array::Heap::splice_heap @$pool, $_; |
|
|
371 | --$self->{idle}; |
220 | or last; |
372 | last; |
221 | |
373 | } |
222 | my @r = $t->(substr $rbuf, 8, $len); |
374 | } |
223 | substr $rbuf, 0, 8 + $len, ""; |
|
|
224 | |
|
|
225 | if ($id) { |
|
|
226 | if (@rcb) { |
|
|
227 | (shift @rcb)->(@r); |
|
|
228 | } elsif (my $cb = delete $rcb{$id}) { |
|
|
229 | $cb->(@r); |
375 | }; |
230 | } else { |
|
|
231 | undef $rw; undef $ww; |
|
|
232 | $on_error->("unexpected data from child"); |
|
|
233 | } |
376 | } |
234 | } else { |
377 | |
235 | $on_event->(@r); |
378 | Array::Heap::adjust_heap @$pool, $_; |
|
|
379 | last; |
236 | } |
380 | } |
237 | } |
381 | } |
238 | } elsif (defined $len) { |
382 | &$ocb; |
239 | undef $rw; undef $ww; # it ends here |
383 | }); |
240 | |
|
|
241 | if (@rcb || %rcb) { |
|
|
242 | $on_error->("unexpected eof"); |
|
|
243 | } else { |
384 | } else { |
244 | $on_destroy->(); |
385 | warn "busy $proc->[0]\n";#d# |
|
|
386 | # all busy, delay |
|
|
387 | |
|
|
388 | $self->{min_delay_w} ||= AE::timer $self->{min_delay}, 0, sub { |
|
|
389 | delete $self->{min_delay_w}; |
|
|
390 | |
|
|
391 | if (@{ $self->{queue} }) { |
|
|
392 | $self->start; |
|
|
393 | $self->scheduler; |
245 | } |
394 | } |
246 | } elsif ($! != Errno::EAGAIN && $! != Errno::EWOULDBLOCK) { |
|
|
247 | undef $rw; undef $ww; # it ends here |
|
|
248 | $on_error->("read: $!"); |
|
|
249 | } |
395 | }; |
|
|
396 | last; |
250 | }; |
397 | } |
251 | |
|
|
252 | $ww ||= AE::io $fh, 1, $wcb; |
|
|
253 | }); |
|
|
254 | |
|
|
255 | my $guard = Guard::guard { |
|
|
256 | $shutdown = 1; |
|
|
257 | $ww ||= $fh && AE::io $fh, 1, $wcb; |
|
|
258 | }; |
398 | } |
259 | |
399 | warn "last\n";#d# |
260 | my $id; |
|
|
261 | |
|
|
262 | $arg{async} |
|
|
263 | ? sub { |
|
|
264 | $id = ($id == 0xffffffff ? 0 : $id) + 1; |
|
|
265 | $id = ($id == 0xffffffff ? 0 : $id) + 1 while exists $rcb{$id}; # rarely loops |
|
|
266 | |
|
|
267 | $rcb{$id} = pop; |
|
|
268 | |
|
|
269 | $guard; # keep it alive |
|
|
270 | |
|
|
271 | $wbuf .= pack "LL/a*", $id, &$f; |
|
|
272 | $ww ||= $fh && AE::io $fh, 1, $wcb; |
|
|
273 | } |
|
|
274 | : sub { |
|
|
275 | push @rcb, pop; |
|
|
276 | |
|
|
277 | $guard; # keep it alive |
|
|
278 | |
|
|
279 | $wbuf .= pack "L/a*", &$f; |
|
|
280 | $ww ||= $fh && AE::io $fh, 1, $wcb; |
|
|
281 | } |
|
|
282 | } |
400 | } |
283 | |
401 | |
284 | =item $pool->call (..., $cb->(...)) |
402 | sub call { |
|
|
403 | my $self = shift; |
|
|
404 | |
|
|
405 | push @{ $self->{queue} }, [@_]; |
|
|
406 | $self->scheduler; |
|
|
407 | } |
|
|
408 | |
|
|
409 | sub DESTROY { |
|
|
410 | $_[0]{on_destroy}->(); |
|
|
411 | } |
285 | |
412 | |
286 | =back |
413 | =back |
287 | |
414 | |
288 | =head1 SEE ALSO |
415 | =head1 SEE ALSO |
289 | |
416 | |