/cvs/AnyEvent-Fork-RPC/RPC.pm
Revision: 1.10
Committed: Wed Apr 17 22:04:49 2013 UTC (11 years, 1 month ago) by root
Branch: MAIN
Changes since 1.9: +40 -1 lines
=head1 NAME

AnyEvent::Fork::RPC - simple RPC extension for AnyEvent::Fork

=head1 SYNOPSIS

   use AnyEvent::Fork::RPC;
   # use AnyEvent::Fork is not needed

   my $rpc = AnyEvent::Fork
      ->new
      ->require ("MyModule")
      ->AnyEvent::Fork::RPC::run (
         "MyModule::server",
      );

   my $cv = AE::cv;

   $rpc->(1, 2, 3, sub {
      print "MyModule::server returned @_\n";
      $cv->send;
   });

   $cv->recv;

=head1 DESCRIPTION

This module implements a simple RPC protocol and backend for processes
created via L<AnyEvent::Fork>, allowing you to call a function in the
child process and receive its return values (up to 4GB serialised).

It implements two different backends: a synchronous one that works like a
normal function call, and an asynchronous one that can run multiple jobs
concurrently in the child, using AnyEvent.

It also implements an asynchronous event mechanism from the child to the
parent, which can be used for progress indications or other information.

Loading this module also always loads L<AnyEvent::Fork>, so you can make a
separate C<use AnyEvent::Fork> if you wish, but you don't have to.

=head1 EXAMPLES

=head2 Example 1: Synchronous Backend

Here is a simple example that implements a backend that executes C<unlink>
and C<rmdir> calls, and reports their status back. It also reports the
number of requests it has processed every three requests, which is clearly
silly, but illustrates the use of events.

First the parent process:

   use AnyEvent;
   use AnyEvent::Fork;
   use AnyEvent::Fork::RPC;

   my $done = AE::cv;

   my $rpc = AnyEvent::Fork
      ->new
      ->require ("MyWorker")
      ->AnyEvent::Fork::RPC::run ("MyWorker::run",
         on_error   => sub { warn "FATAL: $_[0]"; exit 1 },
         on_event   => sub { warn "$_[0] requests handled\n" },
         on_destroy => $done,
      );

   for my $id (1..6) {
      $rpc->(rmdir => "/tmp/somepath/$id", sub {
         $_[0]
            or warn "/tmp/somepath/$id: $_[1]\n";
      });
   }

   undef $rpc;

   $done->recv;

The parent creates the process and queues a few C<rmdir> requests. It then
forgets about the C<$rpc> object, so that the child exits after it has
handled the requests, and then waits until the requests have been handled.

The child is implemented using a separate module, C<MyWorker>, shown here:

   package MyWorker;

   my $count;

   sub run {
      my ($cmd, $path) = @_;

      AnyEvent::Fork::RPC::event ($count)
         unless ++$count % 3;

      my $status = $cmd eq "rmdir"  ? rmdir  $path
                 : $cmd eq "unlink" ? unlink $path
                 : die "fatal error, illegal command '$cmd'";

      $status or (0, "$!")
   }

   1

The C<run> function first sends a "progress" event every three calls, and
then executes C<rmdir> or C<unlink>, depending on the first parameter (or
dies with a fatal error - obviously, you must never let this happen :).

Finally, it returns a true status value if the command was successful,
or the status value 0 together with the stringified error message (C<$!>).

On my system, running the first code fragment with the given
F<MyWorker.pm> in the current directory yields:

   /tmp/somepath/1: No such file or directory
   /tmp/somepath/2: No such file or directory
   3 requests handled
   /tmp/somepath/3: No such file or directory
   /tmp/somepath/4: No such file or directory
   /tmp/somepath/5: No such file or directory
   6 requests handled
   /tmp/somepath/6: No such file or directory

Obviously, none of the directories I am trying to delete even exist. Also,
the events and responses are processed in exactly the same order as
they were created in the child, which is true for both synchronous and
asynchronous backends.

Note that the parentheses in the call to C<AnyEvent::Fork::RPC::event> are
not optional. That is because the function isn't defined when the code is
compiled. You can make sure it is visible by pre-loading the correct
backend module in the call to C<require>:

   ->require ("AnyEvent::Fork::RPC::Sync", "MyWorker")

Since the backend module declares the C<event> function, loading it first
ensures that perl will correctly interpret calls to it.

And as a final remark, there is a fine module on CPAN that can
asynchronously C<rmdir> and C<unlink> and a lot more, and more efficiently
than this example, namely L<IO::AIO>.

=head3 Example 1a: the same with the asynchronous backend

This example only shows what needs to be changed to use the async backend
instead. Doing this is not very useful; the purpose of this example is
to show the minimum amount of change that is required to go from the
synchronous to the asynchronous backend.

To use the async backend in the previous example, you need to add the
C<async> parameter to the C<AnyEvent::Fork::RPC::run> call:

   ->AnyEvent::Fork::RPC::run ("MyWorker::run",
      async => 1,
      ...

And since the function call protocol is now changed, you need to adapt
C<MyWorker::run> to the async API.

First, you need to accept the extra initial C<$done> callback:

   sub run {
      my ($done, $cmd, $path) = @_;

And since a response is now generated when C<$done> is called, as opposed
to when the function returns, we need to call the C<$done> function with
the status:

   $done->($status or (0, "$!"));

Likewise, if you pre-load the backend module as described for the
synchronous case, you need to load C<AnyEvent::Fork::RPC::Async> instead
of C<AnyEvent::Fork::RPC::Sync>.

A few remarks are in order. First, it's quite pointless to use the async
backend for this example - but it I<is> possible. Second, you can call
C<$done> before or after returning from the function. Third, having both
returned from the function and having called the C<$done> callback, the
child process may exit at any time, so you should call C<$done> only when
you really I<are> done.

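Putting these changes together, the complete asynchronous version of
C<MyWorker> could look like the following sketch. It is assembled from the
fragments above (the comments are additions). It still calls C<$done>
before returning, which, as noted, is allowed, but does not take advantage
of the asynchronous backend.

   package MyWorker;

   my $count;

   # async backend: the result callback is passed as the first argument
   sub run {
      my ($done, $cmd, $path) = @_;

      AnyEvent::Fork::RPC::event ($count)
         unless ++$count % 3;

      my $status = $cmd eq "rmdir"  ? rmdir  $path
                 : $cmd eq "unlink" ? unlink $path
                 : die "fatal error, illegal command '$cmd'";

      # the response is sent when $done is called, not when run returns
      $done->($status or (0, "$!"));
   }

   1;
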
=head2 Example 2: Asynchronous Backend

#TODO

=head1 PARENT PROCESS USAGE

This module exports nothing, and only implements a single function:

=over 4

=cut

package AnyEvent::Fork::RPC;

use common::sense;

use Errno ();
use Guard ();

use AnyEvent;
use AnyEvent::Fork; # we don't actually depend on it, this is for convenience

our $VERSION = 0.1;

=item my $rpc = AnyEvent::Fork::RPC::run $fork, $function, [key => value...]

This is the traditional way to call it, as a plain function. It is way
cooler, however, to call it as a method on the L<AnyEvent::Fork> object:

=item my $rpc = $fork->AnyEvent::Fork::RPC::run ($function, [key => value...])

This C<run> function/method can be used in place of the C<run> method of
L<AnyEvent::Fork>. Just like that method, it takes over
the L<AnyEvent::Fork> process, but instead of calling the specified
C<$function> directly, it runs a server that accepts RPC calls and handles
responses.

It returns a function reference that can be used to call the function in
the child process, handling serialisation and data transfers.

The following key/value pairs are allowed. It is recommended to have at
least an C<on_error> or C<on_event> handler set.

=over 4

=item on_error => $cb->($msg)

Called on (fatal) errors, with a descriptive (hopefully) message. If
this callback is not provided, but C<on_event> is, then the C<on_event>
callback is called with the first argument being the string C<error>,
followed by the error message.

If neither handler is provided, the default handler simply calls C<die>
with the message from within an event watcher callback, so things will
start failing badly.

=item on_event => $cb->(...)

Called for every call to the C<AnyEvent::Fork::RPC::event> function in the
child, with the arguments of that function passed to the callback.

Also called on errors when no C<on_error> handler is provided. Conversely,
if the child sends an event but no C<on_event> handler was provided, this
is reported as an error via C<on_error>.

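To illustrate the fallback convention, here is a sketch of a single
C<on_event> handler that deals with both ordinary events and errors, for
the case where no C<on_error> handler is given. The C<@progress> and
C<@errors> arrays are hypothetical; instead of running a child, the sketch
simply calls the handler the way the parent side would.

   use strict;
   use warnings;

   my (@progress, @errors);

   # one handler for both kinds of notification; this only works as long
   # as no child event ever has the string "error" as first argument
   my $on_event = sub {
      if ($_[0] eq "error") {
         push @errors, $_[1];    # error reported by AnyEvent::Fork::RPC
      } else {
         push @progress, @_;     # event sent via AnyEvent::Fork::RPC::event
      }
   };

   # in a real program, it would be passed as
   #    ->AnyEvent::Fork::RPC::run ("MyWorker::run", on_event => $on_event)
   $on_event->(3);                          # like AnyEvent::Fork::RPC::event (3)
   $on_event->(error => "unexpected eof");  # like an error without on_error

Since errors arrive as ordinary arguments, a child event whose first value
happens to be C<error> cannot be told apart from a real error; setting an
explicit C<on_error> handler avoids that ambiguity.
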
=item on_destroy => $cb->()

Called when the C<$rpc> object has been destroyed and all requests have
been successfully handled. This is useful when you queue some requests and
want the child to go away after it has handled them. The problem is that
the parent must not exit either until all requests have been handled, and
this can be accomplished by waiting for this callback.

=item init => $function (default none)

When specified (by name), this function is called in the child as the very
first thing when taking over the process, with all the arguments normally
passed to the C<AnyEvent::Fork::run> function, except the communications
socket.

It can be used to do one-time things in the child such as storing passed
parameters or opening database connections.

It is called very early - before the serialisers are created or the
C<$function> name is resolved into a function reference, so it could be
used to load any modules that provide the serialiser or function. It
cannot, however, create events.

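Here is a sketch of a worker module that uses C<init> to store a parameter
passed by the parent. It assumes the parent adds the value with
C<send_arg> (a method of L<AnyEvent::Fork>) before calling
C<AnyEvent::Fork::RPC::run> with C<< init => "MyWorker::init" >>; the
C<$basedir> variable and the directory value are hypothetical. Only the
first argument is used, so the sketch does not depend on what else might
be passed.

   package MyWorker;

   # Parent side, for reference (not run here):
   #
   #    AnyEvent::Fork
   #       ->new
   #       ->require ("MyWorker")
   #       ->send_arg ("/tmp/somepath")
   #       ->AnyEvent::Fork::RPC::run ("MyWorker::run",
   #            init => "MyWorker::init",
   #            ...

   my $basedir;

   # called once in the child, before any RPC call is handled
   sub init {
      ($basedir) = @_;
   }

   # called for every RPC request (synchronous backend)
   sub run {
      my ($cmd, $name) = @_;

      my $path = "$basedir/$name";

      my $status = $cmd eq "rmdir"  ? rmdir  $path
                 : $cmd eq "unlink" ? unlink $path
                 : die "fatal error, illegal command '$cmd'";

      $status or (0, "$!")
   }

   1;
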
=item async => $boolean (default: 0)

The default server used in the child does all I/O blockingly, and only
allows a single RPC call to execute concurrently.

Setting C<async> to a true value switches to another implementation that
uses L<AnyEvent> in the child and allows multiple concurrent RPC calls.

The actual API in the child is documented in the section that describes
the calling semantics of the returned C<$rpc> function.

If you want to pre-load the actual back-end modules to enable memory
sharing, then you should load C<AnyEvent::Fork::RPC::Sync> for
synchronous, and C<AnyEvent::Fork::RPC::Async> for asynchronous mode.

If you use a template process and want to fork both sync and async
children, then it is permissible to load both modules.

=item serialiser => $string (default: '(sub { pack "(w/a*)*", @_ }, sub { unpack "(w/a*)*", shift })')

All arguments, result data and event data have to be serialised to be
transferred between the processes. For this, they have to be frozen and
thawed in both parent and child processes.

By default, only octet strings can be passed between the processes, which
is reasonably fast and efficient. Anything else is stringified on the way
(so, for example, references do not survive the trip), and character
strings need to be encoded to octets first.

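The default serialiser can be tried out on its own. The following sketch
evaluates the default string the same way C<run> does
(C<< my ($f, $t) = eval $serialiser >>) and round-trips a few values. It
needs nothing but perl itself.

   use strict;
   use warnings;

   my $serialiser = '(sub { pack "(w/a*)*", @_ }, sub { unpack "(w/a*)*", shift })';

   # evaluating the string yields the freeze and thaw code references
   my ($freeze, $thaw) = eval $serialiser;
   die $@ if $@;

   # each value is stored as a BER-compressed length followed by its octets
   my $octets = $freeze->("rmdir", "/tmp/somepath/1", "");
   my @values = $thaw->($octets);    # ("rmdir", "/tmp/somepath/1", "")

   # references are stringified, not preserved
   my ($copy) = $thaw->($freeze->([1, 2, 3]));
   print "not a reference: $copy\n" unless ref $copy;

The array reference comes back as a plain string of the form
C<ARRAY(0x...)>, which is why a different serialiser is needed for
structured data.
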
For more complicated use cases, you can provide your own freeze and thaw
functions, by specifying a string with perl source code. It's supposed to
return two code references when evaluated: the first receives a list of
perl values and must return an octet string. The second receives the octet
string and must return the original list of values.

If you need an external module for serialisation, then you can either
pre-load it into your L<AnyEvent::Fork> process, or you can add a C<use>
or C<require> statement into the serialiser string. Or both.

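As an illustration, here is a sketch of a custom serialiser based on the
core module L<Storable>, which preserves nested data structures and
C<undef>. The serialiser string is an assumption written for this example,
not something this module provides: it passes the argument list to
C<Storable::freeze> as an array reference and dereferences the result of
C<Storable::thaw>. The C<use> inside the string loads Storable wherever
the string is evaluated.

   use strict;
   use warnings;

   my $serialiser = 'use Storable (); (sub { Storable::freeze \@_ }, sub { @{ Storable::thaw shift } })';

   # same evaluation step as for the default serialiser
   my ($freeze, $thaw) = eval $serialiser;
   die $@ if $@;

   # nested structures and undef survive the round trip
   my @values = $thaw->($freeze->({ cmd => "rmdir" }, [1, 2, 3], undef));

It would then be passed as C<< serialiser => $serialiser >>. Note that
Storable refuses to freeze code references by default, so callbacks still
cannot be passed this way.
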
=back

See the examples section earlier in this document for some actual
examples.

=cut

our $STRING_SERIALISER = '(sub { pack "(w/a*)*", @_ }, sub { unpack "(w/a*)*", shift })';

sub run {
   my ($self, $function, %arg) = @_;

   my $serialiser = delete $arg{serialiser} || $STRING_SERIALISER;
   my $on_event   = delete $arg{on_event};
   my $on_error   = delete $arg{on_error};
   my $on_destroy = delete $arg{on_destroy};

   # default for on_error is to on_event, if specified
   $on_error ||= $on_event
               ? sub { $on_event->(error => shift) }
               : sub { die "AnyEvent::Fork::RPC: uncaught error: $_[0].\n" };

   # default for on_event is to raise an error
   $on_event ||= sub { $on_error->("event received, but no on_event handler") };

   my ($f, $t) = eval $serialiser; die $@ if $@;

   my (@rcb, %rcb, $fh, $shutdown, $wbuf, $ww);
   my ($rlen, $rbuf, $rw) = 512 - 16;

   my $wcb = sub {
      my $len = syswrite $fh, $wbuf;

      unless (defined $len) {
         if ($! != Errno::EAGAIN && $! != Errno::EWOULDBLOCK) {
            undef $rw; undef $ww; # it ends here
            $on_error->("$!");
         }

         return; # nothing was written
      }

      substr $wbuf, 0, $len, "";

      unless (length $wbuf) {
         undef $ww;
         $shutdown and shutdown $fh, 1;
      }
   };

   my $module = "AnyEvent::Fork::RPC::" . ($arg{async} ? "Async" : "Sync");

   $self->require ($module)
        ->send_arg ($function, $arg{init}, $serialiser)
        ->run ("$module\::run", sub {
      $fh = shift;

      my ($id, $len);
      $rw = AE::io $fh, 0, sub {
         $rlen = $rlen * 2 + 16 if $rlen - 128 < length $rbuf;
         $len  = sysread $fh, $rbuf, $rlen - length $rbuf, length $rbuf;

         if ($len) {
            while (8 <= length $rbuf) {
               ($id, $len) = unpack "LL", $rbuf;
               8 + $len <= length $rbuf
                  or last;

               my @r = $t->(substr $rbuf, 8, $len);
               substr $rbuf, 0, 8 + $len, "";

               if ($id) {
                  if (@rcb) {
                     (shift @rcb)->(@r);
                  } elsif (my $cb = delete $rcb{$id}) {
                     $cb->(@r);
                  } else {
                     undef $rw; undef $ww;
                     $on_error->("unexpected data from child");
                  }
               } else {
                  $on_event->(@r);
               }
            }
         } elsif (defined $len) {
            undef $rw; undef $ww; # it ends here

            if (@rcb || %rcb) {
               $on_error->("unexpected eof");
            } elsif ($on_destroy) {
               $on_destroy->();
            }
         } elsif ($! != Errno::EAGAIN && $! != Errno::EWOULDBLOCK) {
            undef $rw; undef $ww; # it ends here
            $on_error->("read: $!");
         }
      };

      $ww ||= AE::io $fh, 1, $wcb;
   });

   my $guard = Guard::guard {
      $shutdown = 1;
      $ww ||= $fh && AE::io $fh, 1, $wcb;
   };

   my $id;

   $arg{async}
      ? sub {
           $id = ($id == 0xffffffff ? 0 : $id) + 1;
           $id = ($id == 0xffffffff ? 0 : $id) + 1 while exists $rcb{$id}; # rarely loops

           $rcb{$id} = pop;

           $guard; # keep it alive

           $wbuf .= pack "LL/a*", $id, &$f;
           $ww ||= $fh && AE::io $fh, 1, $wcb;
        }
      : sub {
           push @rcb, pop;

           $guard; # keep it alive

           $wbuf .= pack "L/a*", &$f;
           $ww ||= $fh && AE::io $fh, 1, $wcb;
        }
}

=item $rpc->(..., $cb->(...))

The RPC object returned by C<AnyEvent::Fork::RPC::run> is actually a code
reference. There are two things you can do with it: call it, and let it go
out of scope (let it get destroyed).

If C<async> was false when C<$rpc> was created (the default), then, if you
call C<$rpc>, the C<$function> is invoked with all arguments passed to
C<$rpc> except the last one (the callback). When the function returns, the
callback will be invoked in the parent with all the return values.

If C<async> was true, then the C<$function> receives an additional
initial argument, the result callback. In this case, returning from
C<$function> does nothing: the function only counts as "done" when the
result callback is called, and any arguments passed to it are considered
the return values. This makes it possible to "return" from event handlers
or e.g. Coro threads.

The other thing that can be done with the RPC object is to destroy it. In
this case, the child process will execute all remaining RPC calls, report
their results, and then exit.

See the examples section earlier in this document for some actual
examples.

=back

=head1 CHILD PROCESS USAGE

The following function is not available in this module. It is only
available in the namespace of this module when the child is running,
without having to load any extra modules. It is part of the child-side
API of L<AnyEvent::Fork::RPC>.

=over 4

=item AnyEvent::Fork::RPC::event ...

Send an event to the parent. Events are a bit like RPC calls made by the
child process to the parent, except that there is no notion of return
values. In the parent, each event is delivered to the C<on_event>
callback, with the arguments passed to C<event>.

See the examples section earlier in this document for some actual
examples.

=back

=head1 SEE ALSO

L<AnyEvent::Fork> (to create the processes in the first place),
L<AnyEvent::Fork::Pool> (to manage whole pools of processes).

=head1 AUTHOR AND CONTACT INFORMATION

   Marc Lehmann <schmorp@schmorp.de>
   http://software.schmorp.de/pkg/AnyEvent-Fork-RPC

=cut

1