/cvs/AnyEvent-Fork-RPC/RPC.pm
Revision: 1.5
Committed: Wed Apr 17 19:39:54 2013 UTC (11 years, 1 month ago) by root
Branch: MAIN
Changes since 1.4: +1 -0 lines
Log Message:
*** empty log message ***

=head1 NAME

AnyEvent::Fork::RPC - simple RPC extension for AnyEvent::Fork

=head1 SYNOPSIS

   use AnyEvent::Fork;
   use AnyEvent::Fork::RPC;

   my $rpc = AnyEvent::Fork
      ->new
      ->require ("MyModule")
      ->AnyEvent::Fork::RPC::run (
         "MyModule::server",
      );

   my $cv = AE::cv;

   $rpc->(1, 2, 3, sub {
      print "MyModule::server returned @_\n";
      $cv->send;
   });

   $cv->recv;

=head1 DESCRIPTION

This module implements a simple RPC protocol and backend for processes
created via L<AnyEvent::Fork>, allowing you to call a function in the
child process and receive its return values (up to 4GB serialised).

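The 4GB limit follows from how messages are framed: the parent code in
this module prefixes every serialised request with a native 32-bit length
(C<pack "L/a*">) and reads replies back with C<unpack "L">. The following
is a minimal sketch of that framing using only core Perl. The helper names
C<frame> and C<unframe> are made up for this illustration and are not part
of the module.

```perl
use strict;
use warnings;

# Hypothetical helper: prefix an octet string with its length as a
# native 32-bit unsigned integer, like the "L/a*" template used by run.
sub frame {
   my ($payload) = @_;
   pack "L/a*", $payload
}

# Hypothetical helper: remove and return all complete messages from the
# front of the buffer, leaving any partial message in place.
sub unframe {
   my ($bufref) = @_;
   my @msgs;

   while (4 <= length $$bufref) {
      my $len = unpack "L", $$bufref;
      last if 4 + $len > length $$bufref; # message not complete yet

      push @msgs, substr $$bufref, 4, $len;
      substr $$bufref, 0, 4 + $len, "";
   }

   @msgs
}

my $stream = frame ("hello") . frame ("world");
my $buf    = substr $stream, 0, 12; # first message plus part of the second

my @first  = unframe (\$buf);       # only the first message is complete
$buf .= substr $stream, 12;         # the rest arrives later
my @second = unframe (\$buf);

print "@first @second\n";
```

Because the length field is 32 bits wide, a single framed message cannot
exceed 2**32 - 1 octets.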
It implements two different backends: a synchronous one that works like a
normal function call, and an asynchronous one that can run multiple jobs
concurrently in the child, using AnyEvent.

It also implements an asynchronous event mechanism from the child to the
parent, that could be used for progress indications or other information.

=head1 EXAMPLES

=head2 Synchronous Backend

Here is a simple example that implements a backend that executes C<unlink>
and C<rmdir> calls, and reports their status back. It also reports the
number of requests it has processed every three requests, which is clearly
silly, but illustrates the use of events.

First the parent process:

   use AnyEvent;
   use AnyEvent::Fork;
   use AnyEvent::Fork::RPC;

   my $done = AE::cv;

   my $rpc = AnyEvent::Fork
      ->new
      ->require ("MyWorker")
      ->AnyEvent::Fork::RPC::run ("MyWorker::run",
         on_error   => sub { warn "FATAL: $_[0]"; exit 1 },
         on_event   => sub { warn "$_[0] requests handled\n" },
         on_destroy => $done,
      );

   for my $id (1..6) {
      $rpc->(rmdir => "/tmp/somepath/$id", sub {
         $_[0]
            or warn "/tmp/somepath/$id: $_[1]\n";
      });
   }

   undef $rpc;

   $done->recv;

The parent creates the process and queues a few C<rmdir> requests. It then
forgets about the C<$rpc> object, so that the child exits after it has
handled the requests, and then it waits until the requests have been handled.

The child is implemented using a separate module, C<MyWorker>, shown here:

   package MyWorker;

   my $count;

   sub run {
      my ($cmd, $path) = @_;

      AnyEvent::Fork::RPC::event ($count)
         unless ++$count % 3;

      my $status = $cmd eq "rmdir"  ? rmdir  $path
                 : $cmd eq "unlink" ? unlink $path
                 : die "fatal error, illegal command '$cmd'";

      $status or (0, "$!")
   }

   1

The C<run> function first sends a "progress" event every three calls, and
then executes C<rmdir> or C<unlink>, depending on the first parameter (or
dies with a fatal error - obviously, you must never let this happen :).

Eventually it returns the status value true if the command was successful,
or the status value 0 and the stringified error message.

On my system, running the first code fragment with the given
F<MyWorker.pm> in the current directory yields:

   /tmp/somepath/1: No such file or directory
   /tmp/somepath/2: No such file or directory
   3 requests handled
   /tmp/somepath/3: No such file or directory
   /tmp/somepath/4: No such file or directory
   /tmp/somepath/5: No such file or directory
   6 requests handled
   /tmp/somepath/6: No such file or directory

Obviously, none of the directories I am trying to delete even exist. Also,
the events and responses are processed in exactly the same order as
they were created in the child, which is true for both synchronous and
asynchronous backends.

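One way to picture this ordering is that results and events share a
single stream, and the parent hands results to the pending callbacks in
first-in, first-out order. The sketch below imitates the dispatch step of
C<run> with plain Perl data instead of a socket. The message list standing
in for the child's output, and the convention that the last element is
true for events and false for results, are assumptions inferred from the
parent-side code in this file, not a description of the backend modules.

```perl
use strict;
use warnings;

my @log;

# callbacks for three queued requests, oldest first
my @rcb = map {
   my $id = $_;
   sub { push @log, "request $id: @_" }
} 1 .. 3;

my $on_event = sub { push @log, "event: @_" };

# assumed stand-in for the messages read from the child, in arrival order
my @incoming = (
   [0, "No such file or directory", 0], # result of request 1
   [0, "No such file or directory", 0], # result of request 2
   [3, 1],                              # event, sent before result 3
   [1, 0],                              # result of request 3
);

for my $msg (@incoming) {
   my @r = @$msg;

   if (pop @r) {
      $on_event->(@r);        # trailing flag true: an event
   } else {
      (shift @rcb)->(@r);     # trailing flag false: oldest pending request
   }
}

print "$_\n" for @log;
```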
Note that the parentheses in the call to C<AnyEvent::Fork::RPC::event> are
not optional. That is because the function isn't defined when the code is
compiled. You can make sure it is visible by pre-loading the correct
backend module in the call to C<require>:

   ->require ("AnyEvent::Fork::RPC::Sync", "MyWorker")

Since the backend module declares the C<event> function, loading it first
ensures that perl will correctly interpret calls to it.

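The effect can be reproduced without this module. In the sketch below, the
package names C<Demo::Early> and C<Demo::Late> are made up, and string
C<eval> is used only so that the compile-time failure can be caught and
inspected instead of aborting the script.

```perl
use strict;
use warnings;

# known at compile time, like a function from a pre-loaded backend module
sub Demo::Early::event { "got @_" }

# compile two callers while Demo::Late::event does not exist yet
my $with_parens    = eval q{ sub { Demo::Late::event (1, 2) } };
my $without_parens = eval q{ sub { Demo::Late::event 1, 2 } };
my $compile_error  = $@;

# the function only comes into existence afterwards
eval q{ sub Demo::Late::event { "got @_" } 1 } or die $@;

# no parentheses are needed when the function was declared beforehand
my $predeclared = eval q{ Demo::Early::event 1, 2 };

print "with parentheses: ", $with_parens->(), "\n";
print "without parentheses: ",
      ($without_parens ? "compiled" : "did not compile"), "\n";
print "predeclared: $predeclared\n";
```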
And as a final remark, there is a fine module on CPAN that can
asynchronously C<rmdir> and C<unlink> and a lot more, and more efficiently
than this example, namely L<IO::AIO>.

=head1 PARENT PROCESS USAGE

This module exports nothing, and only implements a single function:

=over 4

=cut

package AnyEvent::Fork::RPC;

use common::sense;

use Errno ();
use Guard ();

use AnyEvent;
#use AnyEvent::Fork;

our $VERSION = 0.1;

=item my $rpc = AnyEvent::Fork::RPC::run $fork, $function, [key => value...]

The traditional way to call it. But it is way cooler to call it in the
following way:

=item my $rpc = $fork->AnyEvent::Fork::RPC::run ($function, [key => value...])

This C<run> function/method can be used in place of the
L<AnyEvent::Fork::run> method. Just like that method, it takes over
the L<AnyEvent::Fork> process, but instead of calling the specified
C<$function> directly, it runs a server that accepts RPC calls and handles
responses.

It returns a function reference that can be used to call the function in
the child process, handling serialisation and data transfers.

The following key/value pairs are allowed. It is recommended to have at
least an C<on_error> or C<on_event> handler set.

=over 4

=item on_error => $cb->($msg)

Called on (fatal) errors, with a descriptive (hopefully) message. If
this callback is not provided, but C<on_event> is, then the C<on_event>
callback is called with the first argument being the string C<error>,
followed by the error message.

If neither handler is provided, the default handler C<die>s with the error
message (which normally ends up on STDERR), and things will start failing
badly.

=item on_event => $cb->(...)

Called for every call to the C<AnyEvent::Fork::RPC::event> function in the
child, with the arguments of that function passed to the callback.

Also called on errors when no C<on_error> handler is provided.

=item on_destroy => $cb->()

Called when the C<$rpc> object has been destroyed and all requests have
been successfully handled. This is useful when you queue some requests and
want the child to go away after it has handled them. The problem is that
the parent must not exit either until all requests have been handled, and
this can be accomplished by waiting for this callback.

=item init => $function (default none)

When specified (by name), this function is called in the child as the very
first thing when taking over the process, with all the arguments normally
passed to the C<AnyEvent::Fork::run> function, except the communications
socket.

It can be used to do one-time things in the child such as storing passed
parameters or opening database connections.

It is called very early - before the serialisers are created or the
C<$function> name is resolved into a function reference, so it could be
used to load any modules that provide the serialiser or function. It can
not, however, create events.

=item async => $boolean (default: 0)

The default server used in the child does all I/O blockingly, and only
allows a single RPC call to execute at a time.

Setting C<async> to a true value switches to another implementation that
uses L<AnyEvent> in the child and allows multiple concurrent RPC calls.

The actual API in the child is documented in the section that describes
the calling semantics of the returned C<$rpc> function.

If you want to pre-load the actual back-end modules to enable memory
sharing, then you should load C<AnyEvent::Fork::RPC::Sync> for
synchronous, and C<AnyEvent::Fork::RPC::Async> for asynchronous mode.

If you use a template process and want to fork both sync and async
children, then it is permissible to load both modules.

=item serialiser => $string (default: '(sub { pack "(w/a*)*", @_ }, sub { unpack "(w/a*)*", shift })')

All arguments, result data and event data have to be serialised to be
transferred between the processes. For this, they have to be frozen and
thawed in both parent and child processes.

By default, only octet strings can be passed between the processes, which
is reasonably fast and efficient.

For more complicated use cases, you can provide your own freeze and thaw
functions, by specifying a string with perl source code. It's supposed to
return two code references when evaluated: the first receives a list of
perl values and must return an octet string. The second receives the octet
string and must return the original list of values.

If you need an external module for serialisation, then you can either
pre-load it into your L<AnyEvent::Fork> process, or you can add a C<use>
or C<require> statement into the serialiser string. Or both.

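To make the contract concrete, the sketch below evaluates the default
serialiser string the same way C<run> does and round-trips a list through
it, then does the same with a Storable-based serialiser string. The
Storable variant is an illustration only and is not defined anywhere in
this revision of the module; L<Storable> itself ships with perl.

```perl
use strict;
use warnings;

# the default serialiser string from this module: octet strings only
my $string_serialiser
   = '(sub { pack "(w/a*)*", @_ }, sub { unpack "(w/a*)*", shift })';

my ($f, $t) = eval $string_serialiser; die $@ if $@;

my @plain = $t->($f->("rmdir", "/tmp/somepath/1"));
print "default: @plain\n";

# assumption: a hand-written alternative that can also carry references
my $storable_serialiser
   = 'use Storable (); (sub { Storable::freeze \@_ }, sub { @{ Storable::thaw shift } })';

my ($sf, $st) = eval $storable_serialiser; die $@ if $@;

my @rich = $st->($sf->("stat", { path => "/tmp", depth => 2 }));
print "storable: $rich[0] $rich[1]{path} $rich[1]{depth}\n";
```

Such a string would be passed as the C<serialiser> argument, and since it
contains its own C<use> statement, the child loads the module when it
evaluates the string.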
=back

=cut

our $STRING_SERIALISER = '(sub { pack "(w/a*)*", @_ }, sub { unpack "(w/a*)*", shift })';

sub run {
   my ($self, $function, %arg) = @_;

   my $serialiser = delete $arg{serialiser} || $STRING_SERIALISER;
   my $on_event   = delete $arg{on_event};
   my $on_error   = delete $arg{on_error};
   my $on_destroy = delete $arg{on_destroy};

   # default for on_error is to forward to on_event, if specified
   $on_error ||= $on_event
      ? sub { $on_event->(error => shift) }
      : sub { die "AnyEvent::Fork::RPC: uncaught error: $_[0].\n" };

   # default for on_event is to raise an error
   $on_event ||= sub { $on_error->("event received, but no on_event handler") };

   my ($f, $t) = eval $serialiser; die $@ if $@;

   my (@rcb, $fh, $shutdown, $wbuf, $ww, $rw);
   my ($rlen, $rbuf) = 512 - 16;

   my $wcb = sub {
      my $len = syswrite $fh, $wbuf;

      if (!defined $len) {
         if ($! != Errno::EAGAIN && $! != Errno::EWOULDBLOCK) {
            undef $rw; undef $ww; # it ends here
            $on_error->("$!");
         }
      }

      substr $wbuf, 0, $len, "";

      unless (length $wbuf) {
         undef $ww;
         $shutdown and shutdown $fh, 1;
      }
   };

   my $module = "AnyEvent::Fork::RPC::" . ($arg{async} ? "Async" : "Sync");

   $self->require ($module)
        ->send_arg ($function, $arg{init}, $serialiser)
        ->run ("$module\::run", sub {
      $fh = shift;
      $rw = AE::io $fh, 0, sub {
         $rlen = $rlen * 2 + 16 if $rlen - 128 < length $rbuf;
         my $len = sysread $fh, $rbuf, $rlen - length $rbuf, length $rbuf;

         if ($len) {
            # each message is a 32-bit length followed by the serialised data
            while (5 <= length $rbuf) {
               $len = unpack "L", $rbuf;
               4 + $len <= length $rbuf
                  or last;

               my @r = $t->(substr $rbuf, 4, $len);
               substr $rbuf, 0, $len + 4, "";

               # the last element is a flag: true for events, false for results
               if (pop @r) {
                  $on_event->(@r);
               } elsif (@rcb) {
                  (shift @rcb)->(@r);
               } else {
                  undef $rw; undef $ww;
                  $on_error->("unexpected data from child");
               }
            }
         } elsif (defined $len) {
            # sysread returned 0, i.e. end of file
            undef $rw; undef $ww; # it ends here

            if (@rcb) {
               $on_error->("unexpected eof");
            } else {
               # on_destroy is optional, so only call it when it was given
               $on_destroy->()
                  if $on_destroy;
            }
         } elsif ($! != Errno::EAGAIN && $! != Errno::EWOULDBLOCK) {
            undef $rw; undef $ww; # it ends here
            $on_error->("read: $!");
         }
      };

      $ww ||= AE::io $fh, 1, $wcb;
   });

   my $guard = Guard::guard {
      $shutdown = 1;
      $ww ||= $fh && AE::io $fh, 1, $wcb;
   };

   sub {
      push @rcb, pop;

      $guard; # keep it alive

      $wbuf .= pack "L/a*", &$f;
      $ww ||= $fh && AE::io $fh, 1, $wcb;
   }
}

=item $rpc->(..., $cb->(...))

The RPC object returned by C<AnyEvent::Fork::RPC::run> is actually a code
reference. There are two things you can do with it: call it, and let it go
out of scope (let it get destroyed).

If C<async> was false when C<$rpc> was created (the default), then, if you
call C<$rpc>, the C<$function> is invoked with all arguments passed to
C<$rpc> except the last one (the callback). When the function returns, the
callback will be invoked with all the return values.

If C<async> was true, then the C<$function> receives an additional
initial argument, the result callback. In this case, returning from
C<$function> does nothing - the function only counts as "done" when the
result callback is called, and any arguments passed to it are considered
the return values. This makes it possible to "return" from event handlers
or e.g. Coro threads.

The other thing that can be done with the RPC object is to destroy it. In
this case, the child process will execute all remaining RPC calls, report
their results, and then exit.

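The two calling conventions described above are easiest to compare side by
side. The following sketch uses no child process at all: the two worker
functions and the small C<call_sync> and C<call_async> dispatchers are
hypothetical stand-ins for what the synchronous and asynchronous backends
do with C<$function>, based only on the description in this section.

```perl
use strict;
use warnings;

# synchronous convention: arguments in, return values out
sub add_sync {
   my ($x, $y) = @_;
   $x + $y
}

my @deferred; # stand-in for an event loop

# asynchronous convention: the first argument is the result callback
sub add_async {
   my ($done, $x, $y) = @_;

   # returning does not finish the call, calling $done does
   push @deferred, sub { $done->($x + $y) };
   return;
}

# hypothetical dispatchers mimicking the two backends
sub call_sync {
   my ($function, $cb, @args) = @_;
   $cb->($function->(@args));
}

sub call_async {
   my ($function, $cb, @args) = @_;
   $function->($cb, @args);
}

my @results;

call_sync  (\&add_sync,  sub { push @results, "sync: @_"  }, 1, 2);
call_async (\&add_async, sub { push @results, "async: @_" }, 3, 4);

my $pending = @results;      # only the synchronous result so far
$_->() for splice @deferred; # "run the event loop"

print "$_\n" for @results;
```

The asynchronous worker has already returned by the time its result is
delivered, which is the property that allows several calls to be in
progress at once.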
=back

=head1 CHILD PROCESS USAGE

The following function is not available in this module. It is only
available in the namespace of this module when the child is running,
without having to load any extra modules. It is part of the child-side
API of L<AnyEvent::Fork::RPC>.

=over 4

=item AnyEvent::Fork::RPC::event ...

Send an event to the parent. Events are a bit like RPC calls made by the
child process to the parent, except that there is no notion of return
values.

=back

=head1 SEE ALSO

L<AnyEvent::Fork> (to create the processes in the first place),
L<AnyEvent::Fork::Pool> (to manage whole pools of processes).

=head1 AUTHOR AND CONTACT INFORMATION

 Marc Lehmann <schmorp@schmorp.de>
 http://software.schmorp.de/pkg/AnyEvent-Fork-RPC

=cut

1
