/cvs/AnyEvent-Fork-RPC/RPC.pm
Revision: 1.9
Committed: Wed Apr 17 21:48:35 2013 UTC (11 years, 1 month ago) by root
Branch: MAIN
Changes since 1.8: +46 -24 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.1 =head1 NAME
2    
3     AnyEvent::Fork::RPC - simple RPC extension for AnyEvent::Fork
4    
5     =head1 SYNOPSIS
6    
7     use AnyEvent::Fork::RPC;
8 root 1.7 # use AnyEvent::Fork is not needed
9 root 1.1
10     my $rpc = AnyEvent::Fork
11     ->new
12     ->require ("MyModule")
13     ->AnyEvent::Fork::RPC::run (
14     "MyModule::server",
15     );
16    
17     my $cv = AE::cv;
18    
19     $rpc->(1, 2, 3, sub {
20     print "MyModule::server returned @_\n";
21     $cv->send;
22     });
23    
24     $cv->recv;
25    
26     =head1 DESCRIPTION
27    
28     This module implements a simple RPC protocol and backend for processes
29     created via L<AnyEvent::Fork>, allowing you to call a function in the
30     child process and receive its return values (up to 4GB serialised).
31    
32     It implements two different backends: a synchronous one that works like a
33     normal function call, and an asynchronous one that can run multiple jobs
34     concurrently in the child, using AnyEvent.
35    
36     It also implements an asynchronous event mechanism from the child to the
37     parent, that could be used for progress indications or other information.
38    
39 root 1.7 Loading this module also always loads L<AnyEvent::Fork>, so you can make a
40     separate C<use AnyEvent::Fork> if you wish, but you don't have to.
41    
42 root 1.4 =head1 EXAMPLES
43    
44     =head2 Synchronous Backend
45    
46     Here is a simple example that implements a backend that executes C<unlink>
47     and C<rmdir> calls, and reports their status back. It also reports the
48     number of requests it has processed every three requests, which is clearly
49     silly, but illustrates the use of events.
50    
51     First the parent process:
52    
53     use AnyEvent;
54     use AnyEvent::Fork;
55     use AnyEvent::Fork::RPC;
56    
57     my $done = AE::cv;
58    
59     my $rpc = AnyEvent::Fork
60     ->new
61     ->require ("MyWorker")
62     ->AnyEvent::Fork::RPC::run ("MyWorker::run",
63 root 1.5 on_error => sub { warn "FATAL: $_[0]"; exit 1 },
64 root 1.4 on_event => sub { warn "$_[0] requests handled\n" },
65     on_destroy => $done,
66     );
67    
68     for my $id (1..6) {
69     $rpc->(rmdir => "/tmp/somepath/$id", sub {
70     $_[0]
71     or warn "/tmp/somepath/$id: $_[1]\n";
72     });
73     }
74    
75     undef $rpc;
76    
77     $done->recv;
78    
79     The parent creates the process and queues a few C<rmdir> requests. It then
80     drops its reference to the C<$rpc> object, so that the child exits after it
81     has handled the requests, and waits until they have all been handled.
82    
83     The child is implemented using a separate module, C<MyWorker>, shown here:
84    
85     package MyWorker;
86    
87     my $count;
88    
89     sub run {
90     my ($cmd, $path) = @_;
91    
92     AnyEvent::Fork::RPC::event ($count)
93     unless ++$count % 3;
94    
95     my $status = $cmd eq "rmdir" ? rmdir $path
96     : $cmd eq "unlink" ? unlink $path
97     : die "fatal error, illegal command '$cmd'";
98    
99     $status or (0, "$!")
100     }
101    
102     1
103    
104     The C<run> function first sends a "progress" event every three calls, and
105     then executes C<rmdir> or C<unlink>, depending on the first parameter (or
106     dies with a fatal error - obviously, you must never let this happen :).
107    
108     Eventually it returns the status value true if the command was successful,
109     or the status value 0 and the stringified error message.
110    
111 root 1.6 On my system, running the first code fragment with the given
112 root 1.4 F<MyWorker.pm> in the current directory yields:
113    
114     /tmp/somepath/1: No such file or directory
115     /tmp/somepath/2: No such file or directory
116     3 requests handled
117     /tmp/somepath/3: No such file or directory
118     /tmp/somepath/4: No such file or directory
119     /tmp/somepath/5: No such file or directory
120     6 requests handled
121     /tmp/somepath/6: No such file or directory
122    
123     Obviously, none of the directories I am trying to delete even exist. Also,
124     the events and responses are processed in exactly the same order as
125     they were created in the child, which is true for both synchronous and
126     asynchronous backends.
127    
128     Note that the parentheses in the call to C<AnyEvent::Fork::RPC::event> are
129     not optional. That is because the function isn't defined when the code is
130     compiled. You can make sure it is visible by pre-loading the correct
131     backend module in the call to C<require>:
132    
133     ->require ("AnyEvent::Fork::RPC::Sync", "MyWorker")
134    
135     Since the backend module declares the C<event> function, loading it first
136     ensures that perl will correctly interpret calls to it.
137    
138     And as a final remark, there is a fine module on CPAN that can
139     asynchronously C<rmdir> and C<unlink> and a lot more, and more efficiently
140     than this example, namely L<IO::AIO>.
141    
142 root 1.1 =head1 PARENT PROCESS USAGE
143    
144     This module exports nothing, and only implements a single function:
145    
146     =over 4
147    
148     =cut
149    
150     package AnyEvent::Fork::RPC;
151    
152     use common::sense;
153    
154     use Errno ();
155     use Guard ();
156    
157     use AnyEvent;
158 root 1.7 use AnyEvent::Fork; # we don't actually depend on it, this is for convenience
159 root 1.1
160     our $VERSION = 0.1;
161    
162     =item my $rpc = AnyEvent::Fork::RPC::run $fork, $function, [key => value...]
163    
164     The traditional way to call it. But it is way cooler to call it in the
165     following way:
166    
167     =item my $rpc = $fork->AnyEvent::Fork::RPC::run ($function, [key => value...])
168    
169     This C<run> function/method can be used in place of the
170     L<AnyEvent::Fork::run> method. Just like that method, it takes over
171     the L<AnyEvent::Fork> process, but instead of calling the specified
172     C<$function> directly, it runs a server that accepts RPC calls and handles
173     responses.
174    
175     It returns a function reference that can be used to call the function in
176     the child process, handling serialisation and data transfers.
177    
178     The following key/value pairs are allowed. It is recommended to have at
179     least an C<on_error> or C<on_event> handler set.
180    
181     =over 4
182    
183     =item on_error => $cb->($msg)
184    
185     Called on (fatal) errors, with a descriptive (hopefully) message. If
186     this callback is not provided, but C<on_event> is, then the C<on_event>
187     callback is called with the first argument being the string C<error>,
188     followed by the error message.
189    
190     If neither handler is provided it prints the error to STDERR and will
191     start failing badly.
192    
193     =item on_event => $cb->(...)
194    
195     Called for every call to the C<AnyEvent::Fork::RPC::event> function in the
196     child, with the arguments of that function passed to the callback.
197    
198     Also called on errors when no C<on_error> handler is provided.
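
The fallback between C<on_error> and C<on_event> can be sketched in isolation. The helper name C<resolve_handlers> is hypothetical and not part of the module; it only mirrors the defaulting logic that C<run> uses in this revision, so treat it as an illustration rather than an API:

```perl
use strict;
use warnings;

# Hypothetical helper, not part of AnyEvent::Fork::RPC: it only mirrors
# how run() fills in missing handlers in this revision.
sub resolve_handlers {
   my (%arg) = @_;

   my $on_event = $arg{on_event};
   my $on_error = $arg{on_error};

   # without on_error, errors are routed to on_event as (error => $msg);
   # without either handler, the error is raised with die
   $on_error ||= $on_event
      ? sub { $on_event->(error => shift) }
      : sub { die "AnyEvent::Fork::RPC: uncaught error: $_[0].\n" };

   # without on_event, receiving an event is itself reported as an error
   $on_event ||= sub { $on_error->("event received, but no on_event handler") };

   ($on_event, $on_error)
}

# only on_event is given, so the error arrives there, tagged with "error"
my @seen;
my (undef, $on_error) = resolve_handlers (on_event => sub { push @seen, [@_] });
$on_error->("unexpected eof");
```

After this runs, C<@seen> holds a single entry with the two values C<error> and C<unexpected eof>, which is the shape an C<on_event> handler has to expect when it doubles as the error handler.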
199    
200 root 1.4 =item on_destroy => $cb->()
201    
202     Called when the C<$rpc> object has been destroyed and all requests have
203     been successfully handled. This is useful when you queue some requests and
204     want the child to go away after it has handled them. The problem is that
205     the parent must not exit either until all requests have been handled, and
206 root 1.6 this can be accomplished by waiting for this callback.
207 root 1.4
208 root 1.1 =item init => $function (default none)
209    
210     When specified (by name), this function is called in the child as the very
211     first thing when taking over the process, with all the arguments normally
212     passed to the C<AnyEvent::Fork::run> function, except the communications
213     socket.
214    
215     It can be used to do one-time things in the child such as storing passed
216     parameters or opening database connections.
217    
218 root 1.4 It is called very early - before the serialisers are created or the
219     C<$function> name is resolved into a function reference, so it could be
220     used to load any modules that provide the serialiser or function. It
221     cannot, however, create events.
222    
223 root 1.1 =item async => $boolean (default: 0)
224    
225     The default server used in the child does all I/O blockingly, and only
226     allows a single RPC call to execute at a time.
227    
228     Setting C<async> to a true value switches to another implementation that
229     uses L<AnyEvent> in the child and allows multiple concurrent RPC calls.
230    
231     The actual API in the child is documented in the section that describes
232     the calling semantics of the returned C<$rpc> function.
233    
234 root 1.2 If you want to pre-load the actual back-end modules to enable memory
235     sharing, then you should load C<AnyEvent::Fork::RPC::Sync> for
236     synchronous, and C<AnyEvent::Fork::RPC::Async> for asynchronous mode.
237    
238 root 1.4 If you use a template process and want to fork both sync and async
239 root 1.6 children, then it is permissible to load both modules.
240 root 1.4
241 root 1.1 =item serialiser => $string (default: '(sub { pack "(w/a*)*", @_ }, sub { unpack "(w/a*)*", shift })')
242    
243     All arguments, result data and event data have to be serialised to be
244     transferred between the processes. For this, they have to be frozen and
245     thawed in both parent and child processes.
246    
247     By default, only octet strings can be passed between the processes, which
248     is reasonably fast and efficient.
249    
250     For more complicated use cases, you can provide your own freeze and thaw
251     functions, by specifying a string with perl source code. It's supposed to
252     return two code references when evaluated: the first receives a list of
253     perl values and must return an octet string. The second receives the octet
254     string and must return the original list of values.
255    
256 root 1.2 If you need an external module for serialisation, then you can either
257     pre-load it into your L<AnyEvent::Fork> process, or you can add a C<use>
258     or C<require> statement into the serialiser string. Or both.
259    
260 root 1.1 =back
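
As a rough illustration of the C<serialiser> option, the following sketch evaluates two serialiser strings the same way C<run> does and round-trips some values through them. The first string is the documented default; the second, based on C<JSON::PP>, is an assumption chosen for the example and not something the module ships:

```perl
use strict;
use warnings;

# the default serialiser string from the documentation: octet strings only
my $string_serialiser =
   '(sub { pack "(w/a*)*", @_ }, sub { unpack "(w/a*)*", shift })';

# an assumed alternative that also handles nested data; the "use" inside
# the string makes sure the child process loads JSON::PP as well
my $json_serialiser = q{
   use JSON::PP ();
   (
      sub { JSON::PP::encode_json ([@_]) },
      sub { @{ JSON::PP::decode_json (shift) } },
   )
};

# each string must evaluate to two code references: freeze and thaw
my ($f, $t) = eval $string_serialiser; die $@ if $@;
my @plain = $t->($f->("rmdir", "/tmp/somepath/1"));

my ($jf, $jt) = eval $json_serialiser; die $@ if $@;
my @nested = $jt->($jf->("stat", { path => "/tmp", follow => 1 }));
```

A string like C<$json_serialiser> would be passed as C<< serialiser => $json_serialiser >> to C<run>. Because the same string is evaluated in both parent and child, it has to be self-contained, which is why the C<use> statement sits inside it.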
261    
262 root 1.9 See the examples section earlier in this document for some actual
263     examples.
264 root 1.8
265 root 1.1 =cut
266    
267 root 1.2 our $STRING_SERIALISER = '(sub { pack "(w/a*)*", @_ }, sub { unpack "(w/a*)*", shift })';
268    
269 root 1.1 sub run {
270     my ($self, $function, %arg) = @_;
271    
272 root 1.2 my $serialiser = delete $arg{serialiser} || $STRING_SERIALISER;
273 root 1.1 my $on_event = delete $arg{on_event};
274     my $on_error = delete $arg{on_error};
275 root 1.4 my $on_destroy = delete $arg{on_destroy};
276 root 1.1
277     # default for on_error is to on_event, if specified
278     $on_error ||= $on_event
279     ? sub { $on_event->(error => shift) }
280     : sub { die "AnyEvent::Fork::RPC: uncaught error: $_[0].\n" };
281    
282     # default for on_event is to raise an error
283     $on_event ||= sub { $on_error->("event received, but no on_event handler") };
284    
285     my ($f, $t) = eval $serialiser; die $@ if $@;
286    
287 root 1.9 my (@rcb, %rcb, $fh, $shutdown, $wbuf, $ww);
288     my ($rlen, $rbuf, $rw) = 512 - 16;
289 root 1.1
290     my $wcb = sub {
291     my $len = syswrite $fh, $wbuf;
292    
293 root 1.9 unless (defined $len) {
294 root 1.1 if ($! != Errno::EAGAIN && $! != Errno::EWOULDBLOCK) {
295     undef $rw; undef $ww; # it ends here
296     $on_error->("$!");
297     }
298     }
299    
300     substr $wbuf, 0, $len, "";
301    
302     unless (length $wbuf) {
303     undef $ww;
304     $shutdown and shutdown $fh, 1;
305     }
306     };
307    
308     my $module = "AnyEvent::Fork::RPC::" . ($arg{async} ? "Async" : "Sync");
309    
310     $self->require ($module)
311     ->send_arg ($function, $arg{init}, $serialiser)
312     ->run ("$module\::run", sub {
313     $fh = shift;
314 root 1.9
315     my ($id, $len);
316 root 1.1 $rw = AE::io $fh, 0, sub {
317 root 1.4 $rlen = $rlen * 2 + 16 if $rlen - 128 < length $rbuf;
318 root 1.9 $len = sysread $fh, $rbuf, $rlen - length $rbuf, length $rbuf;
319 root 1.1
320     if ($len) {
321 root 1.9 while (8 <= length $rbuf) {
322     ($id, $len) = unpack "LL", $rbuf;
323     8 + $len <= length $rbuf
324 root 1.2 or last;
325    
326 root 1.9 my @r = $t->(substr $rbuf, 8, $len);
327     substr $rbuf, 0, 8 + $len, "";
328    
329     if ($id) {
330     if (@rcb) {
331     (shift @rcb)->(@r);
332     } elsif (my $cb = delete $rcb{$id}) {
333     $cb->(@r);
334     } else {
335     undef $rw; undef $ww;
336     $on_error->("unexpected data from child");
337     }
338     } else {
339 root 1.2 $on_event->(@r);
340 root 1.1 }
341     }
342     } elsif (defined $len) {
343     undef $rw; undef $ww; # it ends here
344 root 1.4
345 root 1.9 if (@rcb || %rcb) {
347 root 1.4 $on_error->("unexpected eof");
348     } else {
349     $on_destroy->() if $on_destroy;
350     }
351 root 1.1 } elsif ($! != Errno::EAGAIN && $! != Errno::EWOULDBLOCK) {
352     undef $rw; undef $ww; # it ends here
353     $on_error->("read: $!");
354     }
355     };
356    
357     $ww ||= AE::io $fh, 1, $wcb;
358     });
359    
360     my $guard = Guard::guard {
361     $shutdown = 1;
362     $ww ||= $fh && AE::io $fh, 1, $wcb;
363     };
364    
365 root 1.9 my $id;
366 root 1.1
367 root 1.9 $arg{async}
368     ? sub {
369     $id = ($id == 0xffffffff ? 0 : $id) + 1;
370     $id = ($id == 0xffffffff ? 0 : $id) + 1 while exists $rcb{$id}; # rarely loops
371 root 1.1
372 root 1.9 $rcb{$id} = pop;
373    
374     $guard; # keep it alive
375    
376     $wbuf .= pack "LL/a*", $id, &$f;
377     $ww ||= $fh && AE::io $fh, 1, $wcb;
378     }
379     : sub {
380     push @rcb, pop;
381    
382     $guard; # keep it alive
383    
384     $wbuf .= pack "L/a*", &$f;
385     $ww ||= $fh && AE::io $fh, 1, $wcb;
386     }
387 root 1.1 }
388    
389 root 1.4 =item $rpc->(..., $cb->(...))
390    
391     The RPC object returned by C<AnyEvent::Fork::RPC::run> is actually a code
392     reference. There are two things you can do with it: call it, and let it go
393     out of scope (let it get destroyed).
394    
395     If C<async> was false when C<$rpc> was created (the default), then, if you
396     call C<$rpc>, the C<$function> is invoked with all arguments passed to
397     C<$rpc> except the last one (the callback). When the function returns, the
398     callback will be invoked with all the return values.
399    
400     If C<async> was true, then the C<$function> receives an additional
401     initial argument, the result callback. In this case, returning from
402     C<$function> does nothing - the function only counts as "done" when the
403     result callback is called, and any arguments passed to it are considered
404     the return values. This makes it possible to "return" from event handlers
405     or e.g. Coro threads.
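
The difference in calling convention for async mode can be sketched without any event loop. C<MyAsyncWorker> is a hypothetical module name, and the direct call at the end merely stands in for what the RPC layer would do in the child when the parent was set up with C<< async => 1 >>:

```perl
use strict;
use warnings;

package MyAsyncWorker; # hypothetical module name

# In async mode the first argument is the result callback; the remaining
# arguments are the ones the parent passed to $rpc (minus its callback).
sub run {
   my ($done, @numbers) = @_;

   my $sum = 0;
   $sum += $_ for @numbers;

   # "return" by invoking the callback; it could just as well be called
   # later, for example from a timer or an I/O watcher
   $done->($sum, scalar @numbers);

   return; # the return value of the function itself is ignored
}

package main;

# stand-in for the RPC layer, only to show the calling convention;
# in the parent this would be: $rpc->(1, 2, 3, sub { ... })
my @result;
MyAsyncWorker::run (sub { @result = @_ }, 1, 2, 3);
```

Here C<@result> ends up as C<(6, 3)>, the values the parent's callback would receive. A synchronous worker would instead take only C<@numbers> and return the two values directly.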
406    
407     The other thing that can be done with the RPC object is to destroy it. In
408     this case, the child process will execute all remaining RPC calls, report
409     their results, and then exit.
410    
411 root 1.8 See the examples section earlier in this document for some actual
412     examples.
413    
414 root 1.1 =back
415    
416     =head1 CHILD PROCESS USAGE
417    
418 root 1.4 The following function is not available in this module. It is only
419     available in the namespace of this module when the child is running,
420     without having to load any extra modules. It is part of the child-side
421     API of L<AnyEvent::Fork::RPC>.
422 root 1.1
423     =over 4
424    
425     =item AnyEvent::Fork::RPC::event ...
426    
427     Send an event to the parent. Events are a bit like RPC calls made by the
428     child process to the parent, except that there is no notion of return
429     values.
430    
431 root 1.8 See the examples section earlier in this document for some actual
432     examples.
433    
434 root 1.1 =back
435    
436     =head1 SEE ALSO
437    
438     L<AnyEvent::Fork> (to create the processes in the first place),
439     L<AnyEvent::Fork::Pool> (to manage whole pools of processes).
440    
441     =head1 AUTHOR AND CONTACT INFORMATION
442    
443     Marc Lehmann <schmorp@schmorp.de>
444     http://software.schmorp.de/pkg/AnyEvent-Fork-RPC
445    
446     =cut
447    
448     1
449