/cvs/AnyEvent-Fork-RPC/RPC.pm
Revision: 1.4
Committed: Wed Apr 17 19:38:25 2013 UTC (11 years, 2 months ago) by root
Branch: MAIN
Changes since 1.3: +152 -11 lines
Log Message:
*** empty log message ***

=head1 NAME

AnyEvent::Fork::RPC - simple RPC extension for AnyEvent::Fork

=head1 SYNOPSIS

   use AnyEvent::Fork;
   use AnyEvent::Fork::RPC;

   my $rpc = AnyEvent::Fork
      ->new
      ->require ("MyModule")
      ->AnyEvent::Fork::RPC::run (
         "MyModule::server",
      );

   my $cv = AE::cv;

   $rpc->(1, 2, 3, sub {
      print "MyModule::server returned @_\n";
      $cv->send;
   });

   $cv->recv;

=head1 DESCRIPTION

This module implements a simple RPC protocol and backend for processes
created via L<AnyEvent::Fork>, allowing you to call a function in the
child process and receive its return values (up to 4GB of serialised data
per message).
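
The 4GB limit follows from the framing used on the socket. The C<run> code
in this module writes every request as C<pack "L/a*", ...>, which is a 32-bit
unsigned length followed by the serialised payload, and the parent's read loop
splits incoming data the same way. The following standalone sketch uses plain
Perl with no event loop, and the helper names C<frame> and C<deframe> are
hypothetical. It illustrates that framing and is not the module's actual code:

```perl

   use strict;
   use warnings;

   # Hypothetical helper: prefix a serialised payload with its length as a
   # 32-bit unsigned integer in native byte order (pack "L").
   sub frame { pack "L/a*", $_[0] }

   # Hypothetical helper: remove all complete frames from the front of the
   # buffer and return their payloads; an incomplete trailing frame stays
   # in the buffer until more data arrives.
   sub deframe {
      my ($bufref) = @_;
      my @payloads;

      while (4 <= length $$bufref) {
         my $len = unpack "L", $$bufref;
         last if length $$bufref < 4 + $len;    # frame not complete yet

         push @payloads, substr $$bufref, 4, $len;
         substr $$bufref, 0, 4 + $len, "";
      }

      return @payloads;
   }

   # Two complete frames plus the first 6 bytes of a third one.
   my $buf = frame ("hello") . frame ("") . substr (frame ("world"), 0, 6);

   my @payloads = deframe (\$buf);
   print scalar (@payloads), " complete, ", length ($buf), " bytes pending\n";
   # prints "2 complete, 6 bytes pending"

```

The module's own read loop follows the same scheme. It works incrementally on a
non-blocking socket and grows its read buffer as needed.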

It implements two different backends: a synchronous one that works like a
normal function call, and an asynchronous one that can run multiple jobs
concurrently in the child, using AnyEvent.

It also implements an asynchronous event mechanism from the child to the
parent, which can be used for progress indications or other information.

=head1 EXAMPLES

=head2 Synchronous Backend

Here is a simple example of a backend that executes C<unlink> and C<rmdir>
calls and reports their status back. It also reports the number of requests
it has processed every three requests, which is clearly silly, but
illustrates the use of events.

First the parent process:

   use AnyEvent;
   use AnyEvent::Fork;
   use AnyEvent::Fork::RPC;

   my $done = AE::cv;

   my $rpc = AnyEvent::Fork
      ->new
      ->require ("MyWorker")
      ->AnyEvent::Fork::RPC::run ("MyWorker::run",
           on_event   => sub { warn "$_[0] requests handled\n" },
           on_destroy => $done,
        );

   for my $id (1..6) {
      $rpc->(rmdir => "/tmp/somepath/$id", sub {
         $_[0]
            or warn "/tmp/somepath/$id: $_[1]\n";
      });
   }

   undef $rpc;

   $done->recv;

The parent creates the process and queues a few C<rmdir> requests. It then
forgets about the C<$rpc> object, so that the child exits after it has
handled the requests, and finally waits until the requests have been
handled (signalled via C<on_destroy>).

The child is implemented using a separate module, C<MyWorker>, shown here:

   package MyWorker;

   my $count;

   sub run {
      my ($cmd, $path) = @_;

      AnyEvent::Fork::RPC::event ($count)
         unless ++$count % 3;

      my $status = $cmd eq "rmdir"  ? rmdir  $path
                 : $cmd eq "unlink" ? unlink $path
                 : die "fatal error, illegal command '$cmd'";

      $status or (0, "$!")
   }

   1

The C<run> function first sends a "progress" event every three calls, and
then executes C<rmdir> or C<unlink>, depending on the first parameter (or
dies with a fatal error - obviously, you must never let this happen :).

Finally, it returns a true status value if the command was successful, or
the status value 0 followed by the stringified error message.

On my system, running the first code fragment with the given
F<MyWorker.pm> in the current directory yields:

   /tmp/somepath/1: No such file or directory
   /tmp/somepath/2: No such file or directory
   3 requests handled
   /tmp/somepath/3: No such file or directory
   /tmp/somepath/4: No such file or directory
   /tmp/somepath/5: No such file or directory
   6 requests handled
   /tmp/somepath/6: No such file or directory

Obviously, none of the directories I am trying to delete even exist. Also,
the events and responses are processed in exactly the same order as
they were created in the child, which is true for both synchronous and
asynchronous backends.

Note that the parentheses in the call to C<AnyEvent::Fork::RPC::event> are
not optional. This is because the function is not yet defined when
C<MyWorker> is compiled. You can make sure it is visible by pre-loading the
correct backend module in the call to C<require>:

   ->require ("AnyEvent::Fork::RPC::Sync", "MyWorker")

Since the backend module declares the C<event> function, loading it first
ensures that perl will correctly interpret calls to it.

And as a final remark, there is a fine module on CPAN that can
asynchronously C<rmdir> and C<unlink> and a lot more, and more efficiently
than this example, namely L<IO::AIO>.

=head1 PARENT PROCESS USAGE

This module exports nothing, and only implements a single function:

=over 4

=cut

package AnyEvent::Fork::RPC;

use common::sense;

use Errno ();
use Guard ();

use AnyEvent;
#use AnyEvent::Fork;

our $VERSION = 0.1;

=item my $rpc = AnyEvent::Fork::RPC::run $fork, $function, [key => value...]

This is the traditional way to call it, but it is way cooler to call it in
the following way:

=item my $rpc = $fork->AnyEvent::Fork::RPC::run ($function, [key => value...])

This C<run> function/method can be used in place of the
L<AnyEvent::Fork::run> method. Just like that method, it takes over
the L<AnyEvent::Fork> process, but instead of calling the specified
C<$function> directly, it runs a server that accepts RPC calls and handles
responses.

It returns a function reference that can be used to call the function in
the child process, handling serialisation and data transfers.

The following key/value pairs are allowed. It is recommended to have at
least an C<on_error> or C<on_event> handler set.

=over 4

=item on_error => $cb->($msg)

Called on (fatal) errors, with a (hopefully) descriptive message. If
this callback is not provided, but C<on_event> is, then the C<on_event>
callback is called with the first argument being the string C<error>,
followed by the error message.

If neither handler is provided, the error is raised as an exception with
C<die>, and things will start failing badly.

=item on_event => $cb->(...)

Called for every call to the C<AnyEvent::Fork::RPC::event> function in the
child, with the arguments of that function passed to the callback.

Also called on errors when no C<on_error> handler is provided.
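
The fallback rules for these two callbacks can be sketched in isolation. The
helper C<resolve_handlers> is hypothetical, since the module does this inline
in C<run>, but it applies the same rules. A missing C<on_error> forwards to
C<on_event> with a leading C<error> argument, or dies if there is no
C<on_event> either. A missing C<on_event> turns every received event into an
error:

```perl

   use strict;
   use warnings;

   # Hypothetical helper mirroring the defaulting rules described above.
   sub resolve_handlers {
      my (%arg) = @_;

      my $on_event = $arg{on_event};
      my $on_error = $arg{on_error};

      # no on_error: forward errors to on_event, or die if that is missing too
      $on_error ||= $on_event
                  ? sub { $on_event->(error => shift) }
                  : sub { die "AnyEvent::Fork::RPC: uncaught error: $_[0].\n" };

      # no on_event: receiving an event is itself an error
      $on_event ||= sub { $on_error->("event received, but no on_event handler") };

      return ($on_error, $on_event);
   }

   # Only on_event given: errors show up as ("error", $msg) events.
   my @events;
   my ($forwarding_on_error) = resolve_handlers (on_event => sub { push @events, [@_] });
   $forwarding_on_error->("read: Connection reset by peer");
   print "@{ $events[0] }\n";   # prints "error read: Connection reset by peer"

   # Neither handler given: the error is raised with die.
   my ($dying_on_error) = resolve_handlers ();
   eval { $dying_on_error->("unexpected eof") };
   print "died with: $@";       # prints "died with: AnyEvent::Fork::RPC: uncaught error: unexpected eof."

```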

=item on_destroy => $cb->()

Called when the C<$rpc> object has been destroyed and all requests have
been successfully handled. This is useful when you queue some requests and
want the child to go away after it has handled them. The catch is that the
parent must not exit before all requests have been handled either, and
this can be accomplished by waiting for this callback.

=item init => $function (default none)

When specified (by name), this function is called in the child as the very
first thing when taking over the process, with all the arguments normally
passed to the C<AnyEvent::Fork::run> function, except the communications
socket.

It can be used to do one-time things in the child such as storing passed
parameters or opening database connections.

It is called very early, before the serialisers are created or the
C<$function> name is resolved into a function reference, so it can be
used to load any modules that provide the serialiser or function. It
cannot, however, create events.
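
As a minimal sketch of an C<init> function, the hypothetical C<MyWorker::init>
below stores its arguments for later use by request handlers. It assumes the
parent passed a key/value list with C<< ->send_arg (logfile => "/tmp/worker.log") >>
before calling C<< ->AnyEvent::Fork::RPC::run ("MyWorker::run", init => "MyWorker::init") >>.
The C<logfile> key and the C<MyWorker::config> accessor are made up for
illustration, and the direct call at the end only simulates what the child
would do:

```perl

   package MyWorker;   # hypothetical extension of the MyWorker example

   use strict;
   use warnings;

   my %config;   # set once per child process by init

   sub init {
      %config = @_;   # assumed: key/value pairs sent by the parent
      # one-time setup such as opening log files or database handles goes here
   }

   sub config { $config{$_[0]} }   # hypothetical accessor for request handlers

   package main;

   # Simulate the child calling init before the first request.
   MyWorker::init (logfile => "/tmp/worker.log");
   my $logfile = MyWorker::config ("logfile");
   print "$logfile\n";   # prints "/tmp/worker.log"

```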

=item async => $boolean (default: 0)

The default server used in the child performs all I/O in blocking mode,
and allows only one RPC call to execute at a time.

Setting C<async> to a true value switches to another implementation that
uses L<AnyEvent> in the child and allows multiple concurrent RPC calls.

The actual API in the child is documented in the section that describes
the calling semantics of the returned C<$rpc> function.

If you want to pre-load the actual backend modules to enable memory
sharing, then you should load C<AnyEvent::Fork::RPC::Sync> for
synchronous, and C<AnyEvent::Fork::RPC::Async> for asynchronous mode.

If you use a template process and want to fork both sync and async
children, then it is permissible to load both modules.

=item serialiser => $string (default: '(sub { pack "(w/a*)*", @_ }, sub { unpack "(w/a*)*", shift })')

All arguments, result data and event data have to be serialised to be
transferred between the processes. For this, they have to be frozen and
thawed in both parent and child processes.

By default, only octet strings can be passed between the processes, which
is reasonably fast and efficient.

For more complicated use cases, you can provide your own freeze and thaw
functions, by specifying a string with perl source code. When evaluated,
it must return two code references: the first receives a list of perl
values and must return an octet string. The second receives the octet
string and must return the original list of values.

If you need an external module for serialisation, then you can either
pre-load it into your L<AnyEvent::Fork> process, or you can add a C<use>
or C<require> statement into the serialiser string. Or both.
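
A quick way to check a serialiser string before handing it to C<run> is to
evaluate it the same way C<run> does (C<my ($f, $t) = eval $serialiser>) and
round-trip some data. The sketch below does this for the default string
serialiser and for a Storable-based alternative. The Storable string is only an
example built on the core L<Storable> module and is not something this module
provides:

```perl

   use strict;
   use warnings;

   my %serialiser = (
      # the module's default: octet strings only, each prefixed by a
      # BER-compressed length
      string   => '(sub { pack "(w/a*)*", @_ }, sub { unpack "(w/a*)*", shift })',

      # hypothetical alternative for arbitrary (non-code) data structures,
      # loading Storable from within the string itself
      storable => 'use Storable (); (sub { Storable::nfreeze \@_ }, sub { @{ Storable::thaw shift } })',
   );

   my %roundtrip;

   for my $name (sort keys %serialiser) {
      my ($freeze, $thaw) = eval $serialiser{$name};
      die $@ if $@;

      my @values = $thaw->($freeze->("a", "bc", ""));
      $roundtrip{$name} = join "|", @values;
      print "$name: $roundtrip{$name}\n";
   }

   # only the Storable variant can carry references across
   my ($freeze, $thaw) = eval $serialiser{storable};
   die $@ if $@;
   my ($data) = $thaw->($freeze->({ answer => 42 }));
   print "answer: $data->{answer}\n";   # prints "answer: 42"

```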

=back

=cut

our $STRING_SERIALISER = '(sub { pack "(w/a*)*", @_ }, sub { unpack "(w/a*)*", shift })';

sub run {
   my ($self, $function, %arg) = @_;

   my $serialiser = delete $arg{serialiser} || $STRING_SERIALISER;
   my $on_event   = delete $arg{on_event};
   my $on_error   = delete $arg{on_error};
   my $on_destroy = delete $arg{on_destroy};

   # on_error defaults to reporting via on_event, if specified
   $on_error ||= $on_event
               ? sub { $on_event->(error => shift) }
               : sub { die "AnyEvent::Fork::RPC: uncaught error: $_[0].\n" };

   # default for on_event is to raise an error
   $on_event ||= sub { $on_error->("event received, but no on_event handler") };

   my ($f, $t) = eval $serialiser; die $@ if $@;

   my (@rcb, $fh, $shutdown, $wbuf, $ww, $rw);
   my ($rlen, $rbuf) = 512 - 16;

   my $wcb = sub {
      my $len = syswrite $fh, $wbuf;

      if (!defined $len) {
         if ($! != Errno::EAGAIN && $! != Errno::EWOULDBLOCK) {
            undef $rw; undef $ww; # it ends here
            $on_error->("$!");
         }
      }

      substr $wbuf, 0, $len, "";

      unless (length $wbuf) {
         undef $ww;
         $shutdown and shutdown $fh, 1;
      }
   };

   my $module = "AnyEvent::Fork::RPC::" . ($arg{async} ? "Async" : "Sync");

   $self->require ($module)
        ->send_arg ($function, $arg{init}, $serialiser)
        ->run ("$module\::run", sub {
      $fh = shift;
      $rw = AE::io $fh, 0, sub {
         $rlen = $rlen * 2 + 16 if $rlen - 128 < length $rbuf;
         my $len = sysread $fh, $rbuf, $rlen - length $rbuf, length $rbuf;

         if ($len) {
            while (5 <= length $rbuf) {
               $len = unpack "L", $rbuf;
               4 + $len <= length $rbuf
                  or last;

               my @r = $t->(substr $rbuf, 4, $len);
               substr $rbuf, 0, $len + 4, "";

               if (pop @r) {
                  $on_event->(@r);
               } elsif (@rcb) {
                  (shift @rcb)->(@r);
               } else {
                  undef $rw; undef $ww;
                  $on_error->("unexpected data from child");
               }
            }
         } elsif (defined $len) {
            undef $rw; undef $ww; # it ends here

            if (@rcb) {
               $on_error->("unexpected eof");
            } else {
               # on_destroy is optional
               $on_destroy->() if $on_destroy;
            }
         } elsif ($! != Errno::EAGAIN && $! != Errno::EWOULDBLOCK) {
            undef $rw; undef $ww; # it ends here
            $on_error->("read: $!");
         }
      };

      $ww ||= AE::io $fh, 1, $wcb;
   });

   my $guard = Guard::guard {
      $shutdown = 1;
      $ww ||= $fh && AE::io $fh, 1, $wcb;
   };

   sub {
      push @rcb, pop;

      $guard; # keep it alive

      $wbuf .= pack "L/a*", &$f;
      $ww ||= $fh && AE::io $fh, 1, $wcb;
   }
}

=item $rpc->(..., $cb->(...))

The RPC object returned by C<AnyEvent::Fork::RPC::run> is actually a code
reference. There are two things you can do with it: call it, and let it go
out of scope (let it get destroyed).

If C<async> was false when C<$rpc> was created (the default), then, if you
call C<$rpc>, the C<$function> is invoked with all arguments passed to
C<$rpc> except the last one (the callback). When the function returns, the
callback will be invoked with all the return values.

If C<async> was true, then the C<$function> receives an additional
initial argument, the result callback. In this case, returning from
C<$function> does nothing - the function only counts as "done" when the
result callback is called, and any arguments passed to it are considered
the return values. This makes it possible to "return" from event handlers
or, for example, Coro threads.
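
For comparison with the synchronous C<MyWorker> example, here is a minimal
sketch of a function written for C<< async => 1 >>. The module name
C<MyAsyncWorker> and its C<sum> command are hypothetical. The direct call at
the end simulates in plain Perl how the backend would invoke the function,
with the result callback first:

```perl

   package MyAsyncWorker;   # hypothetical worker module

   use strict;
   use warnings;

   # With async => 1, the result callback comes first; the request only
   # counts as done once $done is called, and whatever is passed to it
   # becomes the return values seen by the parent's callback.
   sub run {
      my ($done, $cmd, @args) = @_;

      if ($cmd eq "sum") {
         my $sum = 0;
         $sum += $_ for @args;
         $done->(1, $sum);
      } else {
         $done->(0, "unknown command '$cmd'");
      }

      return;   # the return value of run itself is ignored
   }

   package main;

   # Simulate the backend's call with a plain callback.
   my @result;
   MyAsyncWorker::run (sub { @result = @_ }, sum => 1, 2, 3);
   print "@result\n";   # prints "1 6"

```

In a real child, C<$done> would typically be called later, for example from an
AnyEvent timer or I/O watcher. That is what allows several requests to be in
progress at once.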

The other thing that can be done with the RPC object is to destroy it. In
this case, the child process will execute all remaining RPC calls, report
their results, and then exit.
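
The closure returned by C<run> keeps a C<Guard::guard> object alive. When the
last reference to C<$rpc> goes away, the guard's block sets C<$shutdown>. Once
the write buffer has been flushed, the parent then calls C<shutdown $fh, 1>,
which lets the child see end-of-file after the queued requests. The sketch
below reproduces just the guard part in plain Perl. The C<ScopeGuard> class is
a hypothetical stand-in for L<Guard>, used so that the example needs no extra
modules:

```perl

   use strict;
   use warnings;

   # Hypothetical minimal stand-in for Guard::guard: runs a callback when
   # the object is destroyed.
   package ScopeGuard;

   sub new     { my ($class, $cb) = @_; bless { cb => $cb }, $class }
   sub DESTROY { $_[0]{cb}->() }

   package main;

   my $shutdown = 0;

   # Like run: the returned closure is the only owner of the guard.
   my $rpc = do {
      my $guard = ScopeGuard->new (sub { $shutdown = 1 });

      sub {
         my $keep = $guard;   # reference the guard so the closure captures it
         return "queued @_";
      };
   };

   my $reply  = $rpc->("job");
   my $before = $shutdown;
   undef $rpc;               # last reference gone, the guard fires
   print "$reply, before: $before, after: $shutdown\n";
   # prints "queued job, before: 0, after: 1"

```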

=back

=head1 CHILD PROCESS USAGE

The following function is not available when you load this module in the
parent. It is only available in the namespace of this module while the
child is running (the backend module defines it), without having to load
any extra modules. It is part of the child-side API of
L<AnyEvent::Fork::RPC>.

=over 4

=item AnyEvent::Fork::RPC::event ...

Send an event to the parent. Events are a bit like RPC calls made by the
child process to the parent, except that there is no notion of return
values. The arguments are passed to the parent's C<on_event> callback.
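
Here is a sketch of a child-side function that reports progress with C<event>
while working through a list. C<MyProgressWorker> is a hypothetical module.
Because C<event> only exists inside the child, the demonstration at the end
installs a stand-in that merely records the events. That stand-in is an
assumption made purely so that the sketch runs outside of
L<AnyEvent::Fork::RPC>:

```perl

   package MyProgressWorker;   # hypothetical worker module

   use strict;
   use warnings;

   sub run {
      my (@items) = @_;

      my $done = 0;
      for my $item (@items) {
         # ... process $item here ...
         ++$done;

         # parentheses are required: event is not defined at compile time
         AnyEvent::Fork::RPC::event (progress => $done, scalar @items);
      }

      return scalar @items;   # the RPC result, delivered after all events
   }

   package main;

   # Stand-in for the child-side event function, for demonstration only;
   # inside a real child the backend provides it.
   my @events;
   sub AnyEvent::Fork::RPC::event { push @events, join " ", @_ }

   my $count = MyProgressWorker::run (qw(a b c));
   print "$_\n" for @events;
   print "result: $count\n";

```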

=back

=head1 SEE ALSO

L<AnyEvent::Fork> (to create the processes in the first place),
L<AnyEvent::Fork::Pool> (to manage whole pools of processes).

=head1 AUTHOR AND CONTACT INFORMATION

 Marc Lehmann <schmorp@schmorp.de>
 http://software.schmorp.de/pkg/AnyEvent-Fork-RPC

=cut

1
