/cvs/AnyEvent-Fork-RPC/RPC.pm
Revision: 1.46
Committed: Sun Sep 15 20:18:14 2019 UTC (4 years, 9 months ago) by root
Branch: MAIN
CVS Tags: HEAD
Changes since 1.45: +54 -4 lines
Log Message:
2.0

File Contents

# User Rev Content
1 root 1.1 =head1 NAME
2    
3     AnyEvent::Fork::RPC - simple RPC extension for AnyEvent::Fork
4    
5     =head1 SYNOPSIS
6    
7 root 1.27 use AnyEvent::Fork;
8 root 1.1 use AnyEvent::Fork::RPC;
9    
10     my $rpc = AnyEvent::Fork
11     ->new
12     ->require ("MyModule")
13     ->AnyEvent::Fork::RPC::run (
14     "MyModule::server",
15     );
16    
17 root 1.16 use AnyEvent;
18    
19 root 1.1 my $cv = AE::cv;
20    
21     $rpc->(1, 2, 3, sub {
22     print "MyModule::server returned @_\n";
23     $cv->send;
24     });
25    
26     $cv->recv;
27    
28     =head1 DESCRIPTION
29    
30     This module implements a simple RPC protocol and backend for processes
31 root 1.29 created via L<AnyEvent::Fork> or L<AnyEvent::Fork::Remote>, allowing you
32 root 1.26 to call a function in the child process and receive its return values (up
33     to 4GB serialised).
34 root 1.1
35     It implements two different backends: a synchronous one that works like a
36     normal function call, and an asynchronous one that can run multiple jobs
37     concurrently in the child, using AnyEvent.
38    
39     It also implements an asynchronous event mechanism from the child to the
40     parent, that could be used for progress indications or other information.
41    
42 root 1.4 =head1 EXAMPLES
43    
44 root 1.10 =head2 Example 1: Synchronous Backend
45 root 1.4
46     Here is a simple example that implements a backend that executes C<unlink>
47     and C<rmdir> calls, and reports their status back. It also reports the
48     number of requests it has processed every three requests, which is clearly
49     silly, but illustrates the use of events.
50    
51     First the parent process:
52    
53     use AnyEvent;
54 root 1.27 use AnyEvent::Fork;
55 root 1.4 use AnyEvent::Fork::RPC;
56    
57     my $done = AE::cv;
58    
59     my $rpc = AnyEvent::Fork
60     ->new
61     ->require ("MyWorker")
62     ->AnyEvent::Fork::RPC::run ("MyWorker::run",
63 root 1.29 on_error => sub { warn "ERROR: $_[0]"; exit 1 },
64 root 1.4 on_event => sub { warn "$_[0] requests handled\n" },
65     on_destroy => $done,
66     );
67    
68     for my $id (1..6) {
69     $rpc->(rmdir => "/tmp/somepath/$id", sub {
70     $_[0]
71     or warn "/tmp/somepath/$id: $_[1]\n";
72     });
73     }
74    
75     undef $rpc;
76    
77     $done->recv;
78    
79     The parent creates the process and queues a few C<rmdir> requests. It then forgets
80     about the C<$rpc> object, so that the child exits after it has handled the
81     requests, and then it waits till the requests have been handled.
82    
83     The child is implemented using a separate module, C<MyWorker>, shown here:
84    
85     package MyWorker;
86    
87     my $count;
88    
89     sub run {
90     my ($cmd, $path) = @_;
91    
92     AnyEvent::Fork::RPC::event ($count)
93     unless ++$count % 3;
94    
95     my $status = $cmd eq "rmdir" ? rmdir $path
96     : $cmd eq "unlink" ? unlink $path
97     : die "fatal error, illegal command '$cmd'";
98    
99     $status or (0, "$!")
100     }
101    
102     1
103    
104     The C<run> function first sends a "progress" event every three calls, and
105     then executes C<rmdir> or C<unlink>, depending on the first parameter (or
106     dies with a fatal error - obviously, you must never let this happen :).
107    
108     Eventually it returns the status value true if the command was successful,
109     or the status value 0 and the stringified error message.
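
The return convention described above can be tried without forking at all. The following is a minimal sketch, not part of the module: it assumes a stub for C<AnyEvent::Fork::RPC::event> (the real function only exists inside an RPC child) and a hypothetical path that is assumed not to exist.

```perl
use strict;
use warnings;

# Stub standing in for the real event function, which only exists inside
# an RPC child; it records events instead of sending them (assumption).
my @events;
sub AnyEvent::Fork::RPC::event { push @events, [@_] }

package MyWorker;

my $count;

sub run {
   my ($cmd, $path) = @_;

   AnyEvent::Fork::RPC::event ($count)
      unless ++$count % 3;

   my $status = $cmd eq "rmdir"  ? rmdir $path
              : $cmd eq "unlink" ? unlink $path
              : die "fatal error, illegal command '$cmd'";

   # true status on success, otherwise the list (0, error message)
   $status or (0, "$!")
}

package main;

# Hypothetical paths, assumed not to exist, so every call fails.
my @results;
push @results, [MyWorker::run (rmdir => "/nonexistent-$$/dir$_")] for 1 .. 3;

printf "status=%s error=%s\n", @$_ for @results;
printf "%d event(s), last count %s\n", scalar @events, $events[-1][0];
```

Each failed call yields a two-element list, and the stub records one event, on the third call.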
110    
111 root 1.6 On my system, running the first code fragment with the given
112 root 1.4 F<MyWorker.pm> in the current directory yields:
113    
114     /tmp/somepath/1: No such file or directory
115     /tmp/somepath/2: No such file or directory
116     3 requests handled
117     /tmp/somepath/3: No such file or directory
118     /tmp/somepath/4: No such file or directory
119     /tmp/somepath/5: No such file or directory
120     6 requests handled
121     /tmp/somepath/6: No such file or directory
122    
123     Obviously, none of the directories I am trying to delete even exist. Also,
124     the events and responses are processed in exactly the same order as
125     they were created in the child, which is true for both synchronous and
126     asynchronous backends.
127    
128     Note that the parentheses in the call to C<AnyEvent::Fork::RPC::event> are
129     not optional. That is because the function isn't defined when the code is
130     compiled. You can make sure it is visible by pre-loading the correct
131     backend module in the call to C<require>:
132    
133     ->require ("AnyEvent::Fork::RPC::Sync", "MyWorker")
134    
135     Since the backend module declares the C<event> function, loading it first
136     ensures that perl will correctly interpret calls to it.
137    
138     And as a final remark, there is a fine module on CPAN that can
139     asynchronously C<rmdir> and C<unlink> and a lot more, and more efficiently
140     than this example, namely L<IO::AIO>.
141    
142 root 1.10 =head3 Example 1a: the same with the asynchronous backend
143    
144     This example only shows what needs to be changed to use the async backend
145     instead. Doing this is not very useful; the purpose of this example is
146     to show the minimum amount of change that is required to go from the
147     synchronous to the asynchronous backend.
148    
149     To use the async backend in the previous example, you need to add the
150     C<async> parameter to the C<AnyEvent::Fork::RPC::run> call:
151    
152     ->AnyEvent::Fork::RPC::run ("MyWorker::run",
153     async => 1,
154     ...
155    
156     And since the function call protocol is now changed, you need to adapt
157     C<MyWorker::run> to the async API.
158    
159     First, you need to accept the extra initial C<$done> callback:
160    
161     sub run {
162     my ($done, $cmd, $path) = @_;
163    
164     And since a response is now generated when C<$done> is called, as opposed
165     to when the function returns, we need to call the C<$done> function with
166     the status:
167    
168     $done->($status or (0, "$!"));
169    
170     A few remarks are in order. First, it's quite pointless to use the async
171     backend for this example - but it I<is> possible. Second, you can call
172     C<$done> before or after returning from the function. Third, having both
173     returned from the function and having called the C<$done> callback, the
174     child process may exit at any time, so you should call C<$done> only when
175     you really I<are> done.
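
Assembled from the fragments above, an async-style C<run> might look like the following sketch. It is called directly here, with a plain closure standing in for the callback the async backend would pass (an assumption for illustration). The event reporting from the synchronous version is left out for brevity, and the path is hypothetical.

```perl
use strict;
use warnings;

package MyWorker;

sub run {
   # the async backend passes the result callback first
   my ($done, $cmd, $path) = @_;

   my $status = $cmd eq "rmdir"  ? rmdir $path
              : $cmd eq "unlink" ? unlink $path
              : die "fatal error, illegal command '$cmd'";

   # the response is generated here, not by the return value
   $done->($status or (0, "$!"));
}

package main;

# A closure stands in for the real result callback (assumption); the
# path is hypothetical and assumed not to exist.
my @reply;
MyWorker::run (sub { @reply = @_ }, rmdir => "/nonexistent-$$/dir");

print "status=$reply[0] error=$reply[1]\n";
```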
176    
177     =head2 Example 2: Asynchronous Backend
178    
179 root 1.11 This example implements multiple count-downs in the child, using
180 root 1.34 L<AnyEvent> timers. While this is a bit silly (one could use timers in the
181 root 1.11 parent just as well), it illustrates the ability to use AnyEvent in the
182     child and the fact that responses can arrive in a different order than the
183     requests.
184    
185     It also shows how to embed the actual child code into a C<__DATA__>
186     section, so it doesn't need any external files at all.
187    
188     And when your parent process is often busy, and you have stricter timing
189     requirements, then running timers in a child process suddenly doesn't look
190     so silly anymore.
191    
192     Without further ado, here is the code:
193    
194     use AnyEvent;
195 root 1.27 use AnyEvent::Fork;
196 root 1.11 use AnyEvent::Fork::RPC;
197    
198     my $done = AE::cv;
199    
200     my $rpc = AnyEvent::Fork
201     ->new
202     ->require ("AnyEvent::Fork::RPC::Async")
203     ->eval (do { local $/; <DATA> })
204     ->AnyEvent::Fork::RPC::run ("run",
205     async => 1,
206 root 1.29 on_error => sub { warn "ERROR: $_[0]"; exit 1 },
207 root 1.11 on_event => sub { print $_[0] },
208     on_destroy => $done,
209     );
210    
211     for my $count (3, 2, 1) {
212     $rpc->($count, sub {
213     warn "job $count finished\n";
214     });
215     }
216    
217     undef $rpc;
218    
219     $done->recv;
220    
221     __DATA__
222    
223     # this ends up in main, as we don't use a package declaration
224    
225     use AnyEvent;
226    
227     sub run {
228     my ($done, $count) = @_;
229    
230     my $n;
231    
232     AnyEvent::Fork::RPC::event "starting to count up to $count\n";
233    
234     my $w; $w = AE::timer 1, 1, sub {
235     ++$n;
236    
237     AnyEvent::Fork::RPC::event "count $n of $count\n";
238    
239     if ($n == $count) {
240     undef $w;
241     $done->();
242     }
243     };
244     }
245    
246     The parent part (the one before the C<__DATA__> section) isn't very
247     different from the earlier examples. It sets async mode, preloads
248     the backend module (so the C<AnyEvent::Fork::RPC::event> function is
249     declared), uses a slightly different C<on_event> handler (which we use
250     simply for logging purposes) and then, instead of loading a module with
251     the actual worker code, it C<eval>'s the code from the data section in the
252     child process.
253    
254     It then starts three countdowns, from 3 to 1 seconds downwards, destroys
255     the rpc object so the example finishes eventually, and then just waits for
256     the stuff to trickle in.
257    
258     The worker code uses the event function to log some progress messages, but
259     mostly just creates a recurring one-second timer.
260    
261     The timer callback increments a counter, logs a message, and eventually,
262     when the count has been reached, calls the finish callback.
263    
264     On my system, this results in the following output. Since all timers fire
265     at roughly the same time, the actual order isn't guaranteed, but the order
266     shown is very likely what you would get, too.
267    
268     starting to count up to 3
269     starting to count up to 2
270     starting to count up to 1
271     count 1 of 3
272     count 1 of 2
273     count 1 of 1
274     job 1 finished
275     count 2 of 2
276     job 2 finished
277     count 2 of 3
278     count 3 of 3
279     job 3 finished
280    
281     While the overall ordering isn't guaranteed, the async backend still
282     guarantees that events and responses are delivered to the parent process
283     in the exact same ordering as they were generated in the child process.
284    
285     And unless your system is I<very> busy, it should clearly show that the
286     job started last will finish first, as it has the lowest count.
287    
288     This concludes the async example. Since L<AnyEvent::Fork> does not
289     actually fork, you are free to use about any module in the child, not just
290     L<AnyEvent>, but also L<IO::AIO>, or L<Tk> for example.
291 root 1.10
292 root 1.29 =head2 Example 3: Asynchronous backend with Coro
293    
294     With L<Coro> you can create a nice asynchronous backend implementation by
295     defining an rpc server function that creates a new Coro thread for every
296     request that calls a function "normally", i.e. the parameters from the
297     parent process are passed to it, and any return values are returned to the
298     parent process, e.g.:
299    
300     package My::Arith;
301    
302     sub add {
303     return $_[0] + $_[1];
304     }
305    
306     sub mul {
307     return $_[0] * $_[1];
308     }
309    
310     sub run {
311     my ($done, $func, @arg) = @_;
312    
313     Coro::async_pool {
314     $done->($func->(@arg));
315     };
316     }
317    
318     The C<run> function creates a new thread for every invocation, using the
319     first argument as function name, and calls the C<$done> callback on its
320     return values. This makes it quite natural to define the C<add> and C<mul>
321     functions to add or multiply two numbers and return the result.
322    
323     Since this is the asynchronous backend, it's quite possible to define RPC
324     functions that do I/O or wait for external events - their execution will
325     overlap as needed.
326    
327     The above could be used like this:
328    
329     my $rpc = AnyEvent::Fork
330     ->new
331     ->require ("My::Arith")
332     ->AnyEvent::Fork::RPC::run ("My::Arith::run",
333     async => 1, on_error => ..., on_event => ..., on_destroy => ...,
334     );
335    
336     $rpc->(add => 1, 3, Coro::rouse_cb); say Coro::rouse_wait;
337     $rpc->(mul => 3, 2, Coro::rouse_cb); say Coro::rouse_wait;
338    
339     The C<say>'s will print C<4> and C<6>.
340    
341 root 1.30 =head2 Example 4: Forward AnyEvent::Log messages using C<on_event>
342    
343     This partial example shows how to use the C<event> function to forward
344     L<AnyEvent::Log> messages to the parent.
345    
346     For this, the parent needs to provide a suitable C<on_event>:
347    
348     ->AnyEvent::Fork::RPC::run (
349     on_event => sub {
350     if ($_[0] eq "ae_log") {
351     my (undef, $level, $message) = @_;
352     AE::log $level, $message;
353     } else {
354     # other event types
355     }
356     },
357     )
358    
359     In the child, as early as possible, the following code should reconfigure
360     L<AnyEvent::Log> to log via C<AnyEvent::Fork::RPC::event>:
361    
362     $AnyEvent::Log::LOG->log_cb (sub {
363     my ($timestamp, $orig_ctx, $level, $message) = @{+shift};
364    
365     if (defined &AnyEvent::Fork::RPC::event) {
366     AnyEvent::Fork::RPC::event (ae_log => $level, $message);
367     } else {
368     warn "[$$ before init] $message\n";
369     }
370     });
371    
372     There is an important twist - the C<AnyEvent::Fork::RPC::event> function
373     is only defined when the child is fully initialised. If you redirect the
374     log messages in your C<init> function for example, then the C<event>
375     function might not yet be available. This is why the log callback checks
376 root 1.44 whether the function is there using C<defined>, and only then uses it to
377 root 1.30 log the message.
378    
379 root 1.1 =head1 PARENT PROCESS USAGE
380    
381     This module exports nothing, and only implements a single function:
382    
383     =over 4
384    
385     =cut
386    
387     package AnyEvent::Fork::RPC;
388    
389     use common::sense;
390    
391     use Errno ();
392     use Guard ();
393    
394     use AnyEvent;
395    
396 root 1.46 our $VERSION = '2.0';
397 root 1.1
398     =item my $rpc = AnyEvent::Fork::RPC::run $fork, $function, [key => value...]
399    
400     The traditional way to call it. But it is way cooler to call it in the
401     following way:
402    
403     =item my $rpc = $fork->AnyEvent::Fork::RPC::run ($function, [key => value...])
404    
405     This C<run> function/method can be used in place of the
406     L<AnyEvent::Fork::run> method. Just like that method, it takes over
407     the L<AnyEvent::Fork> process, but instead of calling the specified
408     C<$function> directly, it runs a server that accepts RPC calls and handles
409     responses.
410    
411     It returns a function reference that can be used to call the function in
412     the child process, handling serialisation and data transfers.
413    
414     The following key/value pairs are allowed. It is recommended to have at
415     least an C<on_error> or C<on_event> handler set.
416    
417     =over 4
418    
419     =item on_error => $cb->($msg)
420    
421     Called on (fatal) errors, with a descriptive (hopefully) message. If
422     this callback is not provided, but C<on_event> is, then the C<on_event>
423     callback is called with the first argument being the string C<error>,
424     followed by the error message.
425    
426 root 1.29 If neither handler is provided, then the error is reported with loglevel
427     C<error> via C<AE::log>.
428 root 1.1
429     =item on_event => $cb->(...)
430    
431     Called for every call to the C<AnyEvent::Fork::RPC::event> function in the
432     child, with the arguments of that function passed to the callback.
433    
434     Also called on errors when no C<on_error> handler is provided.
435    
436 root 1.4 =item on_destroy => $cb->()
437    
438     Called when the C<$rpc> object has been destroyed and all requests have
439     been successfully handled. This is useful when you queue some requests and
440     want the child to go away after it has handled them. The problem is that
441     the parent must not exit either until all requests have been handled, and
442 root 1.6 this can be accomplished by waiting for this callback.
443 root 1.4
444 root 1.45 =item init => $function (default: none)
445 root 1.1
446     When specified (by name), this function is called in the child as the very
447     first thing when taking over the process, with all the arguments normally
448     passed to the C<AnyEvent::Fork::run> function, except the communications
449     socket.
450    
451     It can be used to do one-time things in the child such as storing passed
452     parameters or opening database connections.
453    
454 root 1.4 It is called very early - before the serialisers are created or the
455     C<$function> name is resolved into a function reference, so it could be
456     used to load any modules that provide the serialiser or function. It can
457     not, however, create events.
458    
459 root 1.45 =item done => $function (default: C<CORE::exit>)
460 root 1.31
461     The function to call when the asynchronous backend detects an end of file
462     condition when reading from the communications socket I<and> there are no
463 root 1.46 outstanding requests. It is ignored by the synchronous backend.
464 root 1.31
465     By overriding this you can prolong the life of a RPC process after e.g.
466     the parent has exited by running the event loop in the provided function
467     (or simply calling it, for example, when your child process uses L<EV> you
468 root 1.37 could provide L<EV::run> as C<done> function).
469 root 1.31
470     Of course, in that case you are responsible for exiting at the appropriate
471     time and not returning from the C<done> function.
472    
473 root 1.45 =item async => $boolean (default: C<0>)
474 root 1.1
475     The default server used in the child does all I/O blockingly, and only
476     allows a single RPC call to execute at a time.
477    
478     Setting C<async> to a true value switches to another implementation that
479 root 1.15 uses L<AnyEvent> in the child and allows multiple concurrent RPC calls (it
480     does not support recursion in the event loop however, blocking condvar
481     calls will fail).
482 root 1.1
483     The actual API in the child is documented in the section that describes
484     the calling semantics of the returned C<$rpc> function.
485    
486 root 1.2 If you want to pre-load the actual back-end modules to enable memory
487     sharing, then you should load C<AnyEvent::Fork::RPC::Sync> for
488     synchronous, and C<AnyEvent::Fork::RPC::Async> for asynchronous mode.
489    
490 root 1.4 If you use a template process and want to fork both sync and async
491 root 1.6 children, then it is permissible to load both modules.
492 root 1.4
493 root 1.45 =item serialiser => $string (default: C<$AnyEvent::Fork::RPC::STRING_SERIALISER>)
494 root 1.1
495     All arguments, result data and event data have to be serialised to be
496     transferred between the processes. For this, they have to be frozen and
497     thawed in both parent and child processes.
498    
499 root 1.36 By default, only octet strings can be passed between the processes,
500     which is reasonably fast and efficient and requires no extra modules
501     (the C<AnyEvent::Fork::RPC> distribution does not provide these extra
502     serialiser modules).
503 root 1.1
504     For more complicated use cases, you can provide your own freeze and thaw
505     functions, by specifying a string with perl source code. It's supposed to
506     return two code references when evaluated: the first receives a list of
507     perl values and must return an octet string. The second receives the octet
508     string and must return the original list of values.
509    
510 root 1.2 If you need an external module for serialisation, then you can either
511     pre-load it into your L<AnyEvent::Fork> process, or you can add a C<use>
512     or C<require> statement into the serialiser string. Or both.
513    
514 root 1.38 Here are some examples - all of them are also available as global
515 root 1.14 variables that make them easier to use.
516    
517     =over 4
518    
519 root 1.37 =item C<$AnyEvent::Fork::RPC::STRING_SERIALISER> - octet strings only
520 root 1.14
521 root 1.37 This serialiser (currently the default) concatenates length-prefixed octet
522     strings. That means you can only pass (and return) strings containing
523     character codes 0-255.
524    
525     The main advantages of this serialiser are the high speed and that it
526     doesn't need another module. The main disadvantage is that you are very
527     limited in what you can pass - only octet strings.
528 root 1.14
529     Implementation:
530    
531     (
532     sub { pack "(w/a*)*", @_ },
533     sub { unpack "(w/a*)*", shift }
534     )
535    
536 root 1.37 =item C<$AnyEvent::Fork::RPC::CBOR_XS_SERIALISER> - uses L<CBOR::XS>
537 root 1.35
538     This serialiser creates CBOR::XS arrays - you have to make sure the
539     L<CBOR::XS> module is installed for this serialiser to work. It can be
540     beneficial for sharing when you preload the L<CBOR::XS> module in a template
541     process.
542    
543     L<CBOR::XS> is about as fast as the octet string serialiser, but supports
544     complex data structures (similar to JSON) and is faster than any of the
545     other serialisers. If you have the L<CBOR::XS> module available, it's the
546     best choice.
547    
548 root 1.36 The encoder enables C<allow_sharing> (so this serialisation method can
549     encode cyclic and self-referencing data structures).
550 root 1.35
551     Implementation:
552    
553     use CBOR::XS ();
554     (
555 root 1.36 sub { CBOR::XS::encode_cbor_sharing \@_ },
556 root 1.35 sub { @{ CBOR::XS::decode_cbor shift } }
557     )
558    
559 root 1.37 =item C<$AnyEvent::Fork::RPC::JSON_SERIALISER> - uses L<JSON::XS> or L<JSON>
560 root 1.14
561     This serialiser creates JSON arrays - you have to make sure the L<JSON>
562     module is installed for this serialiser to work. It can be beneficial for
563     sharing when you preload the L<JSON> module in a template process.
564    
565     L<JSON> (with L<JSON::XS> installed) is slower than the octet string
566     serialiser, but usually much faster than L<Storable>, unless big chunks of
567     binary data need to be transferred.
568    
569     Implementation:
570    
571     use JSON ();
572     (
573     sub { JSON::encode_json \@_ },
574     sub { @{ JSON::decode_json shift } }
575     )
576    
577 root 1.37 =item C<$AnyEvent::Fork::RPC::STORABLE_SERIALISER> - L<Storable>
578 root 1.14
579     This serialiser uses L<Storable>, which means it has a high chance of
580     serialising just about anything you throw at it, at the cost of having
581 root 1.29 very high overhead per operation. It also comes with perl. It should be
582     used when you need to serialise complex data structures.
583 root 1.14
584     Implementation:
585    
586     use Storable ();
587     (
588     sub { Storable::freeze \@_ },
589     sub { @{ Storable::thaw shift } }
590     )
591    
592 root 1.37 =item C<$AnyEvent::Fork::RPC::NSTORABLE_SERIALISER> - portable Storable
593 root 1.28
594     This serialiser also uses L<Storable>, but uses its "network" format
595 root 1.29 to serialise data, which makes it possible to talk to different
596     perl binaries (for example, when talking to a process created with
597 root 1.28 L<AnyEvent::Fork::Remote>).
598    
599     Implementation:
600    
601     use Storable ();
602     (
603     sub { Storable::nfreeze \@_ },
604     sub { @{ Storable::thaw shift } }
605     )
606    
607 root 1.14 =back
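
A serialiser string can be checked in isolation before handing it to C<run>. The sketch below evaluates a string into its freeze/thaw pair, the same way the C<run> implementation in this file does, and round-trips some values. The C<load_serialiser> helper and the C<JSON::PP>-based string are hypothetical, chosen only because C<JSON::PP> ships with perl; they are not provided by this distribution.

```perl
use strict;
use warnings;

# The default string serialiser, as shown in the documentation above.
my $string_serialiser = '(sub { pack "(w/a*)*", @_ }, sub { unpack "(w/a*)*", shift })';

# Hypothetical custom serialiser using the core JSON::PP module (an
# assumption for illustration, not something this distribution provides).
my $json_pp_serialiser = 'use JSON::PP (); (sub { JSON::PP::encode_json \@_ }, sub { @{ JSON::PP::decode_json shift } })';

# Evaluate a serialiser string into its (freeze, thaw) pair.
sub load_serialiser {
   my ($f, $t) = eval $_[0]; die $@ if $@;
   ($f, $t)
}

# Octet strings survive the round trip, including the empty string.
my ($f, $t) = load_serialiser ($string_serialiser);
my @strings = $t->($f->("rmdir", "/tmp/somepath/1", ""));

# Nested data needs a structured serialiser such as the JSON one.
my ($jf, $jt) = load_serialiser ($json_pp_serialiser);
my @nested = $jt->($jf->("add", [1, 2], { key => "value" }));

print scalar @strings, " strings, ", scalar @nested, " values\n";
```

Whatever string is used, the same string is evaluated in both parent and child, so any module it loads must be available to both.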
608    
609 root 1.45 =item buflen => $bytes (default: C<512 - 16>)
610    
611     The starting size of the read buffer for request and response data.
612    
613     C<AnyEvent::Fork::RPC> ensures that the buffer for reading request and
614     response data is large enough for at least a single request or response, and
615     will dynamically enlarge the buffer if needed.
616    
617     While this ensures that memory is not overly wasted, it typically leads
618     to having to do one syscall per request, which can be inefficient in some
619     cases. In such cases, it can be beneficial to increase the buffer size to
620     hold more than one request.
621    
622     =item buflen_req => $bytes (default: same as C<buflen>)
623    
624     Overrides C<buflen> for request data (as read by the forked process).
625    
626     =item buflen_res => $bytes (default: same as C<buflen>)
627    
628     Overrides C<buflen> for response data (replies read by the parent process).
629    
630 root 1.1 =back
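
To get a feel for how the read buffer described under C<buflen> grows, here is a small sketch of the growth rule. It is taken from the parent-side read handler in this file; whether the child-side backends use exactly the same rule is an assumption. The C<next_rlen> helper is hypothetical.

```perl
use strict;
use warnings;

# Growth rule from the parent-side read handler in this file: enlarge
# the buffer when fewer than 128 bytes of headroom remain.
sub next_rlen {
   my ($rlen, $buffered) = @_;
   $rlen - 128 < $buffered ? $rlen * 2 + 16 : $rlen
}

my $rlen  = 512 - 16;   # documented default for buflen
my @sizes = ($rlen);

# Simulate a buffer that is always full, forcing growth on every read.
for (1 .. 4) {
   $rlen = next_rlen ($rlen, $rlen);
   push @sizes, $rlen;
}

print join (" ", @sizes), "\n";   # 496 1008 2032 4080 8176
```

Each size is a power of two minus 16. If single requests are known to be large, setting C<buflen> to a larger value up front avoids these intermediate growth steps.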
631    
632 root 1.9 See the examples section earlier in this document for some actual
633     examples.
634 root 1.8
635 root 1.1 =cut
636    
637 root 1.28 our $STRING_SERIALISER = '(sub { pack "(w/a*)*", @_ }, sub { unpack "(w/a*)*", shift })';
638 root 1.36 our $CBOR_XS_SERIALISER = 'use CBOR::XS (); (sub { CBOR::XS::encode_cbor_sharing \@_ }, sub { @{ CBOR::XS::decode_cbor shift } })';
639     our $JSON_SERIALISER = 'use JSON (); (sub { JSON::encode_json \@_ }, sub { @{ JSON::decode_json shift } })';
640 root 1.28 our $STORABLE_SERIALISER = 'use Storable (); (sub { Storable::freeze \@_ }, sub { @{ Storable::thaw shift } })';
641     our $NSTORABLE_SERIALISER = 'use Storable (); (sub { Storable::nfreeze \@_ }, sub { @{ Storable::thaw shift } })';
642 root 1.2
643 root 1.1 sub run {
644     my ($self, $function, %arg) = @_;
645    
646 root 1.2 my $serialiser = delete $arg{serialiser} || $STRING_SERIALISER;
647 root 1.1 my $on_event = delete $arg{on_event};
648     my $on_error = delete $arg{on_error};
649 root 1.4 my $on_destroy = delete $arg{on_destroy};
650 root 1.1
651     # default for on_error is to on_event, if specified
652     $on_error ||= $on_event
653     ? sub { $on_event->(error => shift) }
654 root 1.29 : sub { AE::log die => "AnyEvent::Fork::RPC: uncaught error: $_[0]." };
655 root 1.1
656     # default for on_event is to raise an error
657     $on_event ||= sub { $on_error->("event received, but no on_event handler") };
658    
659     my ($f, $t) = eval $serialiser; die $@ if $@;
660    
661 root 1.9 my (@rcb, %rcb, $fh, $shutdown, $wbuf, $ww);
662 root 1.45 my ($rlen, $rbuf, $rw) = $arg{buflen_res} || $arg{buflen} || 512 - 16;
663 root 1.1
664     my $wcb = sub {
665     my $len = syswrite $fh, $wbuf;
666    
667 root 1.9 unless (defined $len) {
668 root 1.1 if ($! != Errno::EAGAIN && $! != Errno::EWOULDBLOCK) {
669     undef $rw; undef $ww; # it ends here
670     $on_error->("$!");
671     }
672     }
673    
674     substr $wbuf, 0, $len, "";
675    
676     unless (length $wbuf) {
677     undef $ww;
678     $shutdown and shutdown $fh, 1;
679     }
680     };
681    
682     my $module = "AnyEvent::Fork::RPC::" . ($arg{async} ? "Async" : "Sync");
683    
684 root 1.45 $self->eval ("use $module 2 ()")
685     ->send_arg (
686     function => $function,
687     init => $arg{init},
688     serialiser => $serialiser,
689     done => $arg{done} || "$module\::do_exit",
690     rlen => $arg{buflen_req} || $arg{buflen} || 512 - 16,
691     -10 # the above are 10 arguments
692     )
693 root 1.1 ->run ("$module\::run", sub {
694 root 1.42 $fh = shift
695     or return $on_error->("connection failed");
696 root 1.9
697     my ($id, $len);
698 root 1.1 $rw = AE::io $fh, 0, sub {
699 root 1.4 $rlen = $rlen * 2 + 16 if $rlen - 128 < length $rbuf;
700 root 1.9 $len = sysread $fh, $rbuf, $rlen - length $rbuf, length $rbuf;
701 root 1.1
702     if ($len) {
703 root 1.9 while (8 <= length $rbuf) {
704 root 1.24 ($id, $len) = unpack "NN", $rbuf;
705 root 1.9 8 + $len <= length $rbuf
706 root 1.2 or last;
707    
708 root 1.9 my @r = $t->(substr $rbuf, 8, $len);
709     substr $rbuf, 0, 8 + $len, "";
710    
711     if ($id) {
712     if (@rcb) {
713     (shift @rcb)->(@r);
714     } elsif (my $cb = delete $rcb{$id}) {
715     $cb->(@r);
716     } else {
717     undef $rw; undef $ww;
718     $on_error->("unexpected data from child");
719     }
720     } else {
721 root 1.2 $on_event->(@r);
722 root 1.1 }
723     }
724     } elsif (defined $len) {
725     undef $rw; undef $ww; # it ends here
726 root 1.4
727 root 1.9 if (@rcb || %rcb) {
728 root 1.4 $on_error->("unexpected eof");
729     } else {
730 root 1.20 $on_destroy->()
731     if $on_destroy;
732 root 1.4 }
733 root 1.1 } elsif ($! != Errno::EAGAIN && $! != Errno::EWOULDBLOCK) {
734     undef $rw; undef $ww; # it ends here
735     $on_error->("read: $!");
736     }
737     };
738    
739     $ww ||= AE::io $fh, 1, $wcb;
740     });
741    
742     my $guard = Guard::guard {
743     $shutdown = 1;
744 root 1.18
745 root 1.19 shutdown $fh, 1 if $fh && !$ww;
746 root 1.1 };
747    
748 root 1.9 my $id;
749 root 1.1
750 root 1.9 $arg{async}
751     ? sub {
752     $id = ($id == 0xffffffff ? 0 : $id) + 1;
753     $id = ($id == 0xffffffff ? 0 : $id) + 1 while exists $rcb{$id}; # rarely loops
754 root 1.1
755 root 1.9 $rcb{$id} = pop;
756    
757 root 1.20 $guard if 0; # keep it alive
758 root 1.9
759 root 1.24 $wbuf .= pack "NN/a*", $id, &$f;
760 root 1.9 $ww ||= $fh && AE::io $fh, 1, $wcb;
761     }
762     : sub {
763     push @rcb, pop;
764    
765     $guard; # keep it alive
766    
767 root 1.24 $wbuf .= pack "N/a*", &$f;
768 root 1.9 $ww ||= $fh && AE::io $fh, 1, $wcb;
769     }
770 root 1.1 }
771    
772 root 1.4 =item $rpc->(..., $cb->(...))
773    
774     The RPC object returned by C<AnyEvent::Fork::RPC::run> is actually a code
775     reference. There are two things you can do with it: call it, and let it go
776     out of scope (let it get destroyed).
777    
778     If C<async> was false when C<$rpc> was created (the default), then, if you
779     call C<$rpc>, the C<$function> is invoked with all arguments passed to
780     C<$rpc> except the last one (the callback). When the function returns, the
781     callback will be invoked with all the return values.
782    
783     If C<async> was true, then the C<$function> receives an additional
784     initial argument, the result callback. In this case, returning from
785     C<$function> does nothing - the function only counts as "done" when the
786     result callback is called, and any arguments passed to it are considered
787     the return values. This makes it possible to "return" from event handlers
788     or e.g. Coro threads.
789    
790     The other thing that can be done with the RPC object is to destroy it. In
791     this case, the child process will execute all remaining RPC calls, report
792     their results, and then exit.
793    
794 root 1.8 See the examples section earlier in this document for some actual
795     examples.
796    
797 root 1.1 =back
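
How async replies find their callbacks can be sketched from the C<run> implementation above. In C<async> mode each message is framed as a 32-bit id, a 32-bit payload length and the serialised payload, and an id of C<0> marks an event. The C<frame> and C<parse_frames> helpers below are hypothetical names for illustration and are not part of the module.

```perl
use strict;
use warnings;

# The default string serialiser, as documented in this file.
my ($f, $t) = (
   sub { pack "(w/a*)*", @_ },
   sub { unpack "(w/a*)*", shift },
);

# Frame a message: 32-bit id, 32-bit payload length, payload.
sub frame {
   my $id = shift;
   pack "NN/a*", $id, $f->(@_)
}

# Extract all complete frames from a buffer, leaving partial data in place.
sub parse_frames {
   my ($bufref) = @_;
   my @msgs;

   while (8 <= length $$bufref) {
      my ($id, $len) = unpack "NN", $$bufref;
      last unless 8 + $len <= length $$bufref;

      push @msgs, [$id, $t->(substr $$bufref, 8, $len)];
      substr $$bufref, 0, 8 + $len, "";
   }

   @msgs
}

# Replies may arrive out of order; id 0 is an event.
my $buf = frame (2, "second", "reply") . frame (0, "an event") . frame (1, "first");
substr $buf, -3, 3, "";   # simulate a short read that cuts the last frame

my @msgs = parse_frames (\$buf);

printf "id %d: %s\n", $_->[0], join ",", @{$_}[1 .. $#{$_}] for @msgs;
```

The incomplete third frame stays in the buffer until more data arrives, which mirrors how the parent's read handler waits for a full message before invoking a callback.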
798    
799     =head1 CHILD PROCESS USAGE
800    
801 root 1.4 The following functions are not available in this module. They are only
802     available in the namespace of this module when the child is running,
803     without having to load any extra modules. They are part of the child-side
804     API of L<AnyEvent::Fork::RPC>.
805 root 1.1
806 root 1.46 Note that these functions are typically not yet declared when code is
807     compiled into the child, because the backend module is only loaded when
808     you call C<run>, which is typically the last method you call on the fork
809     object.
810    
811     Therefore, you either have to explicitly pre-load the right backend module
812     or mark calls to these functions as function calls, e.g.:
813    
   AnyEvent::Fork::RPC::event (0 => "five");
   AnyEvent::Fork::RPC::event->(0 => "five");
   &AnyEvent::Fork::RPC::flush;
817    
818 root 1.1 =over 4
819    
820 root 1.46 =item AnyEvent::Fork::RPC::event (...)
821 root 1.1
822     Send an event to the parent. Events are a bit like RPC calls made by the
823     child process to the parent, except that there is no notion of return
824     values.
825    
826 root 1.8 See the examples section earlier in this document for some actual
827     examples.
828    
Note: the event data, like any data sent to the parent, might not be sent
immediately but queued for later sending, so there is no guarantee that
the event has been sent to the parent when the call returns - when you
e.g. exit directly after calling this function, the parent might never
receive the event. See the next function for a remedy.
834    
835     =item $success = AnyEvent::Fork::RPC::flush ()
836    
837     Synchronously wait and flush the reply data to the parent. Returns true on
838     success and false otherwise (i.e. when the reply data cannot be written at
839     all). Ignoring the success status is a common and healthy behaviour.
840    
841     Only the "async" backend does something on C<flush> - the "sync" backend
842     is not buffering reply data and always returns true from this function.
843    
Normally, reply data might or might not be written to the parent
immediately but is buffered. This can greatly improve performance and
efficiency, but sometimes can get in your way: for example, when you want
to send an error message just before exiting, or when you want to ensure
replies reach the parent in a timely manner before starting a long
blocking operation.
849    
850     In these cases, you can call this function to flush any outstanding reply
851     data to the parent. This is done blockingly, so no requests will be
852     handled and no event callbacks will be called.
853    
For example, you could wrap your request function in an C<eval> block and
report the exception string back to the caller just before exiting:

   sub req {
      ...

      eval {
         ...
      };

      if ($@) {
         # send the error as an event, then make sure it is written out
         AnyEvent::Fork::RPC::event (throw => "$@");
         AnyEvent::Fork::RPC::flush ();
         exit;
      }

      ...
   }
872 root 1.40
873 root 1.1 =back
874    
875 root 1.31 =head2 PROCESS EXIT
876    
877     If and when the child process exits depends on the backend and
878     configuration. Apart from explicit exits (e.g. by calling C<exit>) or
879     runtime conditions (uncaught exceptions, signals etc.), the backends exit
880     under these conditions:
881    
882     =over 4
883    
884     =item Synchronous Backend
885    
886     The synchronous backend is very simple: when the process waits for another
887     request to arrive and the writing side (usually in the parent) is closed,
888     it will exit normally, i.e. as if your main program reached the end of the
889     file.
890    
That means that if your parent process exits, the RPC process will usually
exit as well, either because it is idle anyway, or because it executes a
request. In the latter case, you will likely get an error when the RPC
process tries to send the results to the parent (because arguably, you
shouldn't exit your parent while there are still outstanding requests).
896    
897     The process is usually quiescent when it happens, so it should rarely be a
898     problem, and C<END> handlers can be used to clean up.
899    
900     =item Asynchronous Backend
901    
902     For the asynchronous backend, things are more complicated: Whenever it
903     listens for another request by the parent, it might detect that the socket
was closed (e.g. because the parent exited). It will stop listening for
905     new requests and instead try to write out any remaining data (if any) or
906 root 1.34 simply check whether the socket can be written to. After this, the RPC
907 root 1.31 process is effectively done - no new requests are incoming, no outstanding
908     request data can be written back.
909    
910     Since chances are high that there are event watchers that the RPC server
911     knows nothing about (why else would one use the async backend if not for
912     the ability to register watchers?), the event loop would often happily
913     continue.
914    
915     This is why the asynchronous backend explicitly calls C<CORE::exit> when
916 root 1.32 it is done (under other circumstances, such as when there is an I/O error
917     and there is outstanding data to write, it will log a fatal message via
918     L<AnyEvent::Log>, also causing the program to exit).
919 root 1.31
920     You can override this by specifying a function name to call via the C<done>
921     parameter instead.
922    
923     =back
924    
925 root 1.12 =head1 ADVANCED TOPICS
926    
927     =head2 Choosing a backend
928    
929     So how do you decide which backend to use? Well, that's your problem to
930     solve, but here are some thoughts on the matter:
931    
932     =over 4
933    
934     =item Synchronous
935    
936     The synchronous backend does not rely on any external modules (well,
937     except L<common::sense>, which works around a bug in how perl's warning
system works). This keeps the process very small: for example, on my
939     system, an empty perl interpreter uses 1492kB RSS, which becomes 2020kB
940     after C<use warnings; use strict> (for people who grew up with C64s around
941     them this is probably shocking every single time they see it). The worker
942     process in the first example in this document uses 1792kB.
943    
944     Since the calls are done synchronously, slow jobs will keep newer jobs
945     from executing.
946    
947     The synchronous backend also has no overhead due to running an event loop
948     - reading requests is therefore very efficient, while writing responses is
949     less so, as every response results in a write syscall.
950    
951     If the parent process is busy and a bit slow reading responses, the child
952     waits instead of processing further requests. This also limits the amount
953     of memory needed for buffering, as never more than one response has to be
954     buffered.
955    
956     The API in the child is simple - you just have to define a function that
957     does something and returns something.
958    
959     It's hard to use modules or code that relies on an event loop, as the
960     child cannot execute anything while it waits for more input.
961    
962     =item Asynchronous
963    
964     The asynchronous backend relies on L<AnyEvent>, which tries to be small,
965     but still comes at a price: On my system, the worker from example 1a uses
966     3420kB RSS (for L<AnyEvent>, which loads L<EV>, which needs L<XSLoader>
967     which in turn loads a lot of other modules such as L<warnings>, L<strict>,
968     L<vars>, L<Exporter>...).
969    
970     It batches requests and responses reasonably efficiently, doing only as
971     few reads and writes as needed, but needs to poll for events via the event
972     loop.
973    
974     Responses are queued when the parent process is busy. This means the child
975     can continue to execute any queued requests. It also means that a child
976     might queue a lot of responses in memory when it generates them and the
977     parent process is slow accepting them.
978    
979     The API is not a straightforward RPC pattern - you have to call a
980     "done" callback to pass return values and signal completion. Also, more
981     importantly, the API starts jobs as fast as possible - when 1000 jobs
982     are queued and the jobs are slow, they will all run concurrently. The
983     child must implement some queueing/limiting mechanism if this causes
problems. Alternatively, the parent could limit the number of RPC calls
985     that are outstanding.
986    
987 root 1.37 Blocking use of condvars is not supported (in the main thread, outside of
988     e.g. L<Coro> threads).
989 root 1.15
990 root 1.12 Using event-based modules such as L<IO::AIO>, L<Gtk2>, L<Tk> and so on is
991     easy.
992    
993     =back
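
As an illustration of the parent-side alternative mentioned for the asynchronous backend, the following sketch wraps an RPC code reference so that only a fixed number of calls are outstanding at any time. C<limit_outstanding> and C<_start_queued> are hypothetical helpers, not part of this module; the sketch only assumes the calling convention documented above (arguments first, result callback last).

```perl
use strict;
use warnings;

# Returns a code reference that is called like $rpc, but never has
# more than $max calls outstanding - the rest wait in a queue.
sub limit_outstanding {
   my ($rpc, $max) = @_;

   my $state = { rpc => $rpc, max => $max, active => 0, queue => [] };

   sub {
      my $cb = pop; # the last argument is the result callback
      push @{ $state->{queue} }, [[@_], $cb];
      _start_queued ($state);
   }
}

# Start queued calls while there are free slots.
sub _start_queued {
   my ($state) = @_;

   while ($state->{active} < $state->{max} && @{ $state->{queue} }) {
      my ($args, $cb) = @{ shift @{ $state->{queue} } };

      ++$state->{active};

      $state->{rpc}->(@$args, sub {
         --$state->{active};     # this call is done, a slot is free
         $cb->(@_);              # hand the results to the original callback
         _start_queued ($state); # and start the next queued call, if any
      });
   }
}
```

It might be used as C<< my $limited = limit_outstanding ($rpc, 10) >>, after which C<$limited> is called exactly like C<$rpc>. Note that the wrapper keeps a reference to C<$rpc>, so the wrapper has to go out of scope as well before the RPC object gets destroyed.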
994    
995     =head2 Passing file descriptors
996    
997     Unlike L<AnyEvent::Fork>, this module has no in-built file handle or file
998     descriptor passing abilities.
999    
The reason is that passing file descriptors is extraordinarily tricky
1001     business, and conflicts with efficient batching of messages.
1002    
1003     There still is a method you can use: Create a
1004     C<AnyEvent::Util::portable_socketpair> and C<send_fh> one half of it to
1005     the process before you pass control to C<AnyEvent::Fork::RPC::run>.
1006    
Whenever you want to pass a file descriptor, send an RPC request to the
child process (so it expects the descriptor), then send it over the other
half of the socketpair. The child should fetch the descriptor from the
half it was passed earlier.
1011    
1012     Here is some (untested) pseudocode to that effect:
1013    
   use AnyEvent::Util;
   use AnyEvent::Fork;
   use AnyEvent::Fork::RPC;
   use IO::FDPass;

   my ($s1, $s2) = AnyEvent::Util::portable_socketpair;

   my $rpc = AnyEvent::Fork
      ->new
      ->send_fh ($s2)
      ->require ("MyWorker")
      ->AnyEvent::Fork::RPC::run ("MyWorker::run",
           init => "MyWorker::init",
        );

   undef $s2; # no need to keep it around

   # pass an fd
   $rpc->("i'll send some fd now, please expect it!", my $cv = AE::cv);

   # send the descriptor over our half of the socketpair
   IO::FDPass::send fileno $s1, fileno $handle_to_pass;

   $cv->recv;
1037    
1038     The MyWorker module could look like this:
1039    
   package MyWorker;

   use IO::FDPass;

   my $s2;

   sub init {
      $s2 = $_[0];
   }

   sub run {
      if ($_[0] eq "i'll send some fd now, please expect it!") {
         my $fd = IO::FDPass::recv fileno $s2;
         ...
      }
   }
1056    
1057     Of course, this might be blocking if you pass a lot of file descriptors,
1058     so you might want to look into L<AnyEvent::FDpasser> which can handle the
1059     gory details.
1060    
1061 root 1.21 =head1 EXCEPTIONS
1062    
There are no provisions whatsoever for catching exceptions at this time -
in the child, exceptions might kill the process, causing calls to be lost
and the parent to encounter a fatal error. In the parent, exceptions in
the result callback will not be caught and will cause undefined behaviour.
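
Since nothing catches exceptions for you, one possible workaround for the synchronous backend is to catch them yourself in the child and return a status flag ahead of the real results. This is a minimal sketch, not something the module provides: C<MyWorker::server> and C<MyWorker::do_work> are hypothetical names, and the convention of a leading success flag is an assumption of this example.

```perl
package MyWorker;

use strict;
use warnings;

# hypothetical worker code that might throw
sub do_work {
   my ($n) = @_;

   die "negative input\n" if $n < 0;

   return sqrt $n;
}

# the function that would be named in the run call
sub server {
   my @args = @_;

   my @res;
   my $ok = eval { @res = do_work (@args); 1 };

   # the exception is stringified, so only a plain string is returned
   return $ok ? (1, @res) : (0, "$@");
}
```

The result callback in the parent could then start with C<my ($ok, @results) = @_> and treat C<$results[0]> as the error message when C<$ok> is false.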
1067    
1068 root 1.1 =head1 SEE ALSO
1069    
1070 root 1.16 L<AnyEvent::Fork>, to create the processes in the first place.
1071    
1072 root 1.27 L<AnyEvent::Fork::Remote>, likewise, but helpful for remote processes.
1073 root 1.26
1074 root 1.16 L<AnyEvent::Fork::Pool>, to manage whole pools of processes.
1075 root 1.1
1076     =head1 AUTHOR AND CONTACT INFORMATION
1077    
1078     Marc Lehmann <schmorp@schmorp.de>
1079     http://software.schmorp.de/pkg/AnyEvent-Fork-RPC
1080    
1081     =cut
1082    
1083     1
1084