ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-Fork-Pool/Pool.pm
Revision: 1.1
Committed: Thu Apr 18 14:24:01 2013 UTC (11 years, 1 month ago) by root
Branch: MAIN
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.1 =head1 NAME
2    
3     AnyEvent::Fork::Pool - simple process pool manager on top of AnyEvent::Fork
4    
5     =head1 SYNOPSIS
6    
7     use AnyEvent;
8     use AnyEvent::Fork::Pool;
9     # use AnyEvent::Fork is not needed
10    
11     # all parameters with default values
12     my $pool = new AnyEvent::Fork::Pool
13     "MyWorker::run",
14    
15     # pool management
16     min => 0, # minimum # of processes
17     max => 8, # maximum # of processes
18     busy_time => 0, # wait this before starting a new process
19     max_idle => 1, # at most this many idle processes
20     idle_time => 1, # wait this before killing an idle process
21    
22     # template process
23     template => AnyEvent::Fork->new, # the template process to use
24     require => [MyWorker::], # module(s) to load
25     eval => "# perl code to execute in template",
26     on_destroy => (my $finish = AE::cv),
27    
28     # parameters passed to AnyEvent::Fork::RPC
29     async => 0,
30     on_error => sub { die "FATAL: $_[0]\n" },
31     on_event => sub { my @ev = @_ },
32     init => "MyWorker::init",
33     serialiser => $AnyEvent::Fork::RPC::STRING_SERIALISER,
34     ;
35    
36     for (1..10) {
37     $pool->call (doit => $_, sub {
38     print "MyWorker::run returned @_\n";
39     });
40     }
41    
42     undef $pool;
43    
44     $finish->recv;
45    
46     =head1 DESCRIPTION
47    
48     This module uses processes created via L<AnyEvent::Fork> and the RPC
49     protocol implemented in L<AnyEvent::Fork::RPC> to create a load-balanced
50     pool of processes that handles jobs.
51    
52     Understanding of L<AnyEvent::Fork> is helpful but not critical to be able
53     to use this module, but a thorough understanding of L<AnyEvent::Fork::RPC>
54     is, as it defines the actual API that needs to be implemented in the
55     children.
56    
57     =head1 EXAMPLES
58    
59     =head1 API
60    
61     =over 4
62    
63     =cut
64    
65     package AnyEvent::Fork::Pool;
66    
67     use common::sense;
68    
69     use Guard ();
70    
71     use AnyEvent;
72     use AnyEvent::Fork; # we don't actually depend on it, this is for convenience
73     use AnyEvent::Fork::RPC;
74    
75     our $VERSION = 0.1;
76    
77     =item my $pool = new AnyEvent::Fork::Pool $function, [key => value...]
78    
79     =over 4
80    
81     =item on_error => $cb->($msg)
82    
83     Called on (fatal) errors, with a descriptive (hopefully) message. If
84     this callback is not provided, but C<on_event> is, then the C<on_event>
85     callback is called with the first argument being the string C<error>,
86     followed by the error message.
87    
88     If neither handler is provided it prints the error to STDERR and will
89     start failing badly.
90    
91     =item on_event => $cb->(...)
92    
93     Called for every call to the C<AnyEvent::Fork::RPC::event> function in the
94     child, with the arguments of that function passed to the callback.
95    
96     Also called on errors when no C<on_error> handler is provided.
97    
98     =item on_destroy => $cb->()
99    
100     Called when the C<$rpc> object has been destroyed and all requests have
101     been successfully handled. This is useful when you queue some requests and
102     want the child to go away after it has handled them. The problem is that
103     the parent must not exit either until all requests have been handled, and
104     this can be accomplished by waiting for this callback.
105    
106     =item init => $function (default none)
107    
108     When specified (by name), this function is called in the child as the very
109     first thing when taking over the process, with all the arguments normally
110     passed to the C<AnyEvent::Fork::run> function, except the communications
111     socket.
112    
113     It can be used to do one-time things in the child such as storing passed
114     parameters or opening database connections.
115    
116     It is called very early - before the serialisers are created or the
117     C<$function> name is resolved into a function reference, so it could be
118     used to load any modules that provide the serialiser or function. It can
119     not, however, create events.
120    
121     =item async => $boolean (default: 0)
122    
123     The default server used in the child does all I/O blockingly, and only
124     allows a single RPC call to execute concurrently.
125    
126     Setting C<async> to a true value switches to another implementation that
127     uses L<AnyEvent> in the child and allows multiple concurrent RPC calls.
128    
129     The actual API in the child is documented in the section that describes
130     the calling semantics of the returned C<$rpc> function.
131    
132     If you want to pre-load the actual back-end modules to enable memory
133     sharing, then you should load C<AnyEvent::Fork::RPC::Sync> for
134     synchronous, and C<AnyEvent::Fork::RPC::Async> for asynchronous mode.
135    
136     If you use a template process and want to fork both sync and async
137     children, then it is permissible to load both modules.
138    
139     =item serialiser => $string (default: '(sub { pack "(w/a*)*", @_ }, sub { unpack "(w/a*)*", shift })')
140    
141     All arguments, result data and event data have to be serialised to be
142     transferred between the processes. For this, they have to be frozen and
143     thawed in both parent and child processes.
144    
145     By default, only octet strings can be passed between the processes, which
146     is reasonably fast and efficient.
147    
148     For more complicated use cases, you can provide your own freeze and thaw
149     functions, by specifying a string with perl source code. It's supposed to
150     return two code references when evaluated: the first receives a list of
151     perl values and must return an octet string. The second receives the octet
152     string and must return the original list of values.
153    
154     If you need an external module for serialisation, then you can either
155     pre-load it into your L<AnyEvent::Fork> process, or you can add a C<use>
156     or C<require> statement into the serialiser string. Or both.
157    
158     =back
159    
160     See the examples section earlier in this document for some actual
161     examples.
162    
163     =cut
164    
# new $self, $function, key => value...
#
# Starts the RPC backend in a child process and returns a code reference
# that queues one call per invocation.
#
# Parameters:
#   $self     - invocant; the methods require, send_arg and run are called
#               on it (chained).
#               NOTE(review): the POD shows this being called as a class
#               method, in which case $self is just the class name - confirm
#               which object is meant to provide these methods.
#   $function - name of the function to run in the child, sent via send_arg.
#   %arg      - serialiser, on_event, on_error and on_destroy are removed
#               from it and handled here; async and init are read from it.
#
# Returns: a closure. Its last argument is the result callback, all other
# arguments are serialised and appended to the write buffer.
#   NOTE(review): the POD documents "$pool->call (...)", but what is
#   returned here is a plain code reference - confirm the intended API.
#
# Wire format as visible below: requests are packed as "LL/a*" (id, data)
# in async mode and "L/a*" (data) in sync mode; replies carry an 8 octet
# "LL" header (id, payload length). A reply id of 0 is delivered to
# on_event.
sub new {
    my ($self, $function, %arg) = @_;

    # the default serialiser is defined by AnyEvent::Fork::RPC, so it has to
    # be referenced with its full package name from this package
    my $serialiser = delete $arg{serialiser} || $AnyEvent::Fork::RPC::STRING_SERIALISER;
    my $on_event   = delete $arg{on_event};
    my $on_error   = delete $arg{on_error};
    my $on_destroy = delete $arg{on_destroy};

    # default for on_error is to on_event, if specified
    $on_error ||= $on_event
        ? sub { $on_event->(error => shift) }
        : sub { die "AnyEvent::Fork::RPC: uncaught error: $_[0].\n" };

    # default for on_event is to raise an error
    $on_event ||= sub { $on_error->("event received, but no on_event handler") };

    # the serialiser is perl source code and is evaluated as-is: it must
    # come from a trusted source. it yields the (freeze, thaw) pair.
    my ($f, $t) = eval $serialiser; die $@ if $@;

    # @rcb: FIFO of result callbacks (sync), %rcb: callbacks by id (async)
    my (@rcb, %rcb, $fh, $shutdown, $wbuf, $ww);
    my ($rlen, $rbuf, $rw) = 512 - 16; # only $rlen is initialised (to 496)

    # write watcher callback: flushes as much of $wbuf as possible
    my $wcb = sub {
        my $len = syswrite $fh, $wbuf;

        if (defined $len) {
            substr $wbuf, 0, $len, "";
        } elsif (!($!{EAGAIN} || $!{EWOULDBLOCK})) {
            # anything but "try again later" is fatal
            undef $rw; undef $ww; # it ends here
            $on_error->("$!");
        }

        unless (length $wbuf) {
            undef $ww;
            # only close the write side once everything has been flushed
            $shutdown and shutdown $fh, 1;
        }
    };

    my $module = "AnyEvent::Fork::RPC::" . ($arg{async} ? "Async" : "Sync");

    $self->require ($module)
         ->send_arg ($function, $arg{init}, $serialiser)
         ->run ("$module\::run", sub {
            $fh = shift;

            my ($id, $len);
            $rw = AE::io $fh, 0, sub {
                # grow the read size when the buffer gets close to it
                $rlen = $rlen * 2 + 16 if $rlen - 128 < length ($rbuf);
                $len = sysread $fh, $rbuf, $rlen - length ($rbuf), length ($rbuf);

                if ($len) {
                    # handle every complete message in the buffer
                    while (8 <= length ($rbuf)) {
                        ($id, $len) = unpack "LL", $rbuf;
                        8 + $len <= length ($rbuf)
                            or last;

                        my @r = $t->(substr $rbuf, 8, $len);
                        substr $rbuf, 0, 8 + $len, "";

                        if ($id) {
                            if (@rcb) {
                                (shift @rcb)->(@r);
                            } elsif (my $cb = delete $rcb{$id}) {
                                $cb->(@r);
                            } else {
                                undef $rw; undef $ww;
                                $on_error->("unexpected data from child");
                            }
                        } else {
                            # id 0 is an event, not a call result
                            $on_event->(@r);
                        }
                    }
                } elsif (defined $len) {
                    # eof
                    undef $rw; undef $ww; # it ends here

                    if (@rcb || %rcb) {
                        $on_error->("unexpected eof");
                    } else {
                        # on_destroy is optional
                        $on_destroy->()
                            if $on_destroy;
                    }
                } elsif (!($!{EAGAIN} || $!{EWOULDBLOCK})) {
                    undef $rw; undef $ww; # it ends here
                    $on_error->("read: $!");
                }
            };

            # flush anything that was queued before the handle existed
            $ww ||= AE::io $fh, 1, $wcb;
         });

    # runs when the guard is destroyed: requests a shutdown of the write
    # side, which $wcb performs once the write buffer is empty
    my $guard = Guard::guard {
        $shutdown = 1;
        $ww ||= $fh && AE::io $fh, 1, $wcb;
    };

    my $id;

    $arg{async}
        ? sub {
            # pick the next id in 1..0xffffffff that is not in use
            $id = ($id == 0xffffffff ? 0 : $id) + 1;
            $id = ($id == 0xffffffff ? 0 : $id) + 1 while exists $rcb{$id}; # rarely loops

            $rcb{$id} = pop;

            $guard; # keep it alive

            # the callback has been popped, @_ now holds only the arguments
            $wbuf .= pack "LL/a*", $id, $f->(@_);
            $ww ||= $fh && AE::io $fh, 1, $wcb;
        }
        : sub {
            push @rcb, pop;

            $guard; # keep it alive

            # the callback has been popped, @_ now holds only the arguments
            $wbuf .= pack "L/a*", $f->(@_);
            $ww ||= $fh && AE::io $fh, 1, $wcb;
        }
}
283    
284     =item $pool->call (..., $cb->(...))
285    
286     =back
287    
288     =head1 SEE ALSO
289    
290     L<AnyEvent::Fork>, to create the processes in the first place.
291    
292     L<AnyEvent::Fork::RPC>, which implements the RPC protocol and API.
293    
294     =head1 AUTHOR AND CONTACT INFORMATION
295    
296     Marc Lehmann <schmorp@schmorp.de>
297     http://software.schmorp.de/pkg/AnyEvent-Fork-Pool
298    
299     =cut
300    
301     1
302