/cvs/AnyEvent-Fork-RPC/RPC.pm
Revision: 1.16
Committed: Thu Apr 18 14:07:15 2013 UTC (11 years, 1 month ago) by root
Branch: MAIN
Changes since 1.15: +5 -2 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.1 =head1 NAME
2    
3     AnyEvent::Fork::RPC - simple RPC extension for AnyEvent::Fork
4    
5     =head1 SYNOPSIS
6    
7     use AnyEvent::Fork::RPC;
8 root 1.7 # use AnyEvent::Fork is not needed
9 root 1.1
10     my $rpc = AnyEvent::Fork
11     ->new
12     ->require ("MyModule")
13     ->AnyEvent::Fork::RPC::run (
14     "MyModule::server",
15     );
16    
17 root 1.16 use AnyEvent;
18    
19 root 1.1 my $cv = AE::cv;
20    
21     $rpc->(1, 2, 3, sub {
22     print "MyModule::server returned @_\n";
23     $cv->send;
24     });
25    
26     $cv->recv;
27    
28     =head1 DESCRIPTION
29    
30     This module implements a simple RPC protocol and backend for processes
31     created via L<AnyEvent::Fork>, allowing you to call a function in the
32     child process and receive its return values (up to 4GB serialised).
33    
34     It implements two different backends: a synchronous one that works like a
35     normal function call, and an asynchronous one that can run multiple jobs
36     concurrently in the child, using AnyEvent.
37    
38     It also implements an asynchronous event mechanism from the child to the
39     parent, that could be used for progress indications or other information.
40    
41 root 1.7 Loading this module also always loads L<AnyEvent::Fork>, so you can make a
42     separate C<use AnyEvent::Fork> if you wish, but you don't have to.
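
The 4GB limit mentioned above follows from the message framing visible in the parent-side code later in this file: requests are packed with C<LL/a*> (or C<L/a*> in synchronous mode) and replies are parsed with C<unpack "LL">, so every payload length is a 32-bit value. The following is a minimal sketch of that framing using only core perl. The helper names C<frame> and C<unframe> are made up for illustration and are not part of the module.

```perl
use strict;
use warnings;

# Hypothetical helper: frame one message as 32-bit id, 32-bit length, payload.
sub frame {
    my ($id, $payload) = @_;
    pack "LL/a*", $id, $payload
}

# Hypothetical helper: remove all complete messages from the front of $$bufref
# and return them as [id, payload] pairs; a partial message is left in place.
sub unframe {
    my ($bufref) = @_;
    my @msgs;

    while (8 <= length $$bufref) {
        my ($id, $len) = unpack "LL", $$bufref;
        8 + $len <= length $$bufref
            or last;

        push @msgs, [$id, substr $$bufref, 8, $len];
        substr $$bufref, 0, 8 + $len, "";
    }

    @msgs
}

my $buf  = frame (0, "progress") . frame (1, "result");
my $tail = substr $buf, -3, 3, ""; # simulate a short read

my @first = unframe (\$buf);  # only the complete first message
$buf .= $tail;
my @second = unframe (\$buf); # the rest, once the missing bytes arrive
```

In the parent-side reader, an id of 0 marks an event and any other id marks a response, which is why the sketch uses 0 for the "progress" message.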
43    
44 root 1.4 =head1 EXAMPLES
45    
46 root 1.10 =head2 Example 1: Synchronous Backend
47 root 1.4
48     Here is a simple example that implements a backend that executes C<unlink>
49     and C<rmdir> calls, and reports their status back. It also reports the
50     number of requests it has processed every three requests, which is clearly
51     silly, but illustrates the use of events.
52    
53     First the parent process:
54    
55     use AnyEvent;
56     use AnyEvent::Fork::RPC;
57    
58     my $done = AE::cv;
59    
60     my $rpc = AnyEvent::Fork
61     ->new
62     ->require ("MyWorker")
63     ->AnyEvent::Fork::RPC::run ("MyWorker::run",
64 root 1.5 on_error => sub { warn "FATAL: $_[0]"; exit 1 },
65 root 1.4 on_event => sub { warn "$_[0] requests handled\n" },
66     on_destroy => $done,
67     );
68    
69     for my $id (1..6) {
70     $rpc->(rmdir => "/tmp/somepath/$id", sub {
71     $_[0]
72     or warn "/tmp/somepath/$id: $_[1]\n";
73     });
74     }
75    
76     undef $rpc;
77    
78     $done->recv;
79    
80     The parent creates the process and queues a few C<rmdir> requests. It then
81     forgets about the C<$rpc> object, so that the child exits after it has handled
82     the requests, and then it waits until the requests have been handled.
83    
84     The child is implemented using a separate module, C<MyWorker>, shown here:
85    
86     package MyWorker;
87    
88     my $count;
89    
90     sub run {
91     my ($cmd, $path) = @_;
92    
93     AnyEvent::Fork::RPC::event ($count)
94     unless ++$count % 3;
95    
96     my $status = $cmd eq "rmdir" ? rmdir $path
97     : $cmd eq "unlink" ? unlink $path
98     : die "fatal error, illegal command '$cmd'";
99    
100     $status or (0, "$!")
101     }
102    
103     1
104    
105     The C<run> function first sends a "progress" event every three calls, and
106     then executes C<rmdir> or C<unlink>, depending on the first parameter (or
107     dies with a fatal error - obviously, you must never let this happen :).
108    
109     Eventually it returns the status value true if the command was successful,
110     or the status value 0 and the stringified error message.
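
The return convention described above can be tried without any child process. The following sketch copies the body of C<run> (minus the event call) and calls it directly in list context. The temporary directory and the variable names are assumptions made for this illustration only.

```perl
use strict;
use warnings;
use File::Temp ();

# Same logic as MyWorker::run above, without the progress event.
sub run {
    my ($cmd, $path) = @_;

    my $status = $cmd eq "rmdir"  ? rmdir ($path)
               : $cmd eq "unlink" ? unlink ($path)
               : die "fatal error, illegal command '$cmd'";

    # in list context: the true status, or the two-element list (0, message)
    $status or (0, "$!")
}

my $dir = File::Temp::tempdir (CLEANUP => 1);
mkdir "$dir/sub" or die "mkdir: $!";

my @ok   = run (rmdir => "$dir/sub"); # directory exists, so this succeeds
my @fail = run (rmdir => "$dir/sub"); # already gone, so this fails

print "ok: @ok\n";
print "fail: $fail[0], $fail[1]\n";
```

The parent callback in the first fragment relies on exactly this shape: C<$_[0]> is the status and C<$_[1]> is the error message when the status is false.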
111    
112 root 1.6 On my system, running the first code fragment with the given
113 root 1.4 F<MyWorker.pm> in the current directory yields:
114    
115     /tmp/somepath/1: No such file or directory
116     /tmp/somepath/2: No such file or directory
117     3 requests handled
118     /tmp/somepath/3: No such file or directory
119     /tmp/somepath/4: No such file or directory
120     /tmp/somepath/5: No such file or directory
121     6 requests handled
122     /tmp/somepath/6: No such file or directory
123    
124     Obviously, none of the directories I am trying to delete even exist. Also,
125     the events and responses are processed in exactly the same order as
126     they were created in the child, which is true for both synchronous and
127     asynchronous backends.
128    
129     Note that the parentheses in the call to C<AnyEvent::Fork::RPC::event> are
130     not optional. That is because the function isn't defined when the code is
131     compiled. You can make sure it is visible by pre-loading the correct
132     backend module in the call to C<require>:
133    
134     ->require ("AnyEvent::Fork::RPC::Sync", "MyWorker")
135    
136     Since the backend module declares the C<event> function, loading it first
137     ensures that perl will correctly interpret calls to it.
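
The effect of declaring a function before the calling code is compiled can be shown with core perl alone. This is only a sketch: C<Demo::Declared> and C<Demo::Undeclared> are made-up package names standing in for the real backend, and string C<eval> is used so that the compile-time failure can be caught.

```perl
use strict;
use warnings;

# Stand-in for a function that is already declared at compile time,
# such as the event function after the backend module has been loaded.
sub Demo::Declared::event { "event(@_)" }

# Declared: the call without parentheses compiles.
my $with = eval q{ sub { Demo::Declared::event "hello" } };

# Not declared: the same syntax is a compile-time error ...
my $without = eval q{ sub { Demo::Undeclared::event "hello" } };
my $error   = $@;

# ... but with parentheses it compiles (and would only fail when called).
my $parens = eval q{ sub { Demo::Undeclared::event ("hello") } };

print "declared:   ", (defined $with    ? "compiles" : "syntax error"), "\n";
print "undeclared: ", (defined $without ? "compiles" : "syntax error"), "\n";
print "with parens: ", (defined $parens ? "compiles" : "syntax error"), "\n";
```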
138    
139     And as a final remark, there is a fine module on CPAN that can
140     asynchronously C<rmdir> and C<unlink> and a lot more, and more efficiently
141     than this example, namely L<IO::AIO>.
142    
143 root 1.10 =head3 Example 1a: the same with the asynchronous backend
144    
145     This example only shows what needs to be changed to use the async backend
146     instead. Doing this is not very useful; the purpose of this example is
147     to show the minimum amount of change that is required to go from the
148     synchronous to the asynchronous backend.
149    
150     To use the async backend in the previous example, you need to add the
151     C<async> parameter to the C<AnyEvent::Fork::RPC::run> call:
152    
153     ->AnyEvent::Fork::RPC::run ("MyWorker::run",
154     async => 1,
155     ...
156    
157     And since the function call protocol is now changed, you need to adapt
158     C<MyWorker::run> to the async API.
159    
160     First, you need to accept the extra initial C<$done> callback:
161    
162     sub run {
163     my ($done, $cmd, $path) = @_;
164    
165     And since a response is now generated when C<$done> is called, as opposed
166     to when the function returns, we need to call the C<$done> function with
167     the status:
168    
169     $done->($status or (0, "$!"));
170    
171     A few remarks are in order. First, it's quite pointless to use the async
172     backend for this example - but it I<is> possible. Second, you can call
173     C<$done> before or after returning from the function. Third, having both
174     returned from the function and having called the C<$done> callback, the
175     child process may exit at any time, so you should call C<$done> only when
176     you really I<are> done.
177    
178     =head2 Example 2: Asynchronous Backend
179    
180 root 1.11 This example implements multiple count-downs in the child, using
181     L<AnyEvent> timers. While this is a bit silly (one could use timers in the
182     parent just as well), it illustrates the ability to use AnyEvent in the
183     child and the fact that responses can arrive in a different order than the
184     requests.
185    
186     It also shows how to embed the actual child code into a C<__DATA__>
187     section, so it doesn't need any external files at all.
188    
189     And when your parent process is often busy, and you have stricter timing
190     requirements, then running timers in a child process suddenly doesn't look
191     so silly anymore.
192    
193     Without further ado, here is the code:
194    
195     use AnyEvent;
196     use AnyEvent::Fork::RPC;
197    
198     my $done = AE::cv;
199    
200     my $rpc = AnyEvent::Fork
201     ->new
202     ->require ("AnyEvent::Fork::RPC::Async")
203     ->eval (do { local $/; <DATA> })
204     ->AnyEvent::Fork::RPC::run ("run",
205     async => 1,
206     on_error => sub { warn "FATAL: $_[0]"; exit 1 },
207     on_event => sub { print $_[0] },
208     on_destroy => $done,
209     );
210    
211     for my $count (3, 2, 1) {
212     $rpc->($count, sub {
213     warn "job $count finished\n";
214     });
215     }
216    
217     undef $rpc;
218    
219     $done->recv;
220    
221     __DATA__
222    
223     # this ends up in main, as we don't use a package declaration
224    
225     use AnyEvent;
226    
227     sub run {
228     my ($done, $count) = @_;
229    
230     my $n;
231    
232     AnyEvent::Fork::RPC::event "starting to count up to $count\n";
233    
234     my $w; $w = AE::timer 1, 1, sub {
235     ++$n;
236    
237     AnyEvent::Fork::RPC::event "count $n of $count\n";
238    
239     if ($n == $count) {
240     undef $w;
241     $done->();
242     }
243     };
244     }
245    
246     The parent part (the one before the C<__DATA__> section) isn't very
247     different from the earlier examples. It sets async mode, preloads
248     the backend module (so the C<AnyEvent::Fork::RPC::event> function is
249     declared), uses a slightly different C<on_event> handler (which we use
250     simply for logging purposes) and then, instead of loading a module with
251     the actual worker code, it C<eval>'s the code from the data section in the
252     child process.
253    
254     It then starts three countdowns, from 3 to 1 seconds downwards, destroys
255     the rpc object so the example finishes eventually, and then just waits for
256     the stuff to trickle in.
257    
258     The worker code uses the event function to log some progress messages, but
259     mostly just creates a recurring one-second timer.
260    
261     The timer callback increments a counter, logs a message, and eventually,
262     when the count has been reached, calls the finish callback.
263    
264     On my system, this results in the following output. Since all timers fire
265     at roughly the same time, the actual order isn't guaranteed, but the order
266     shown is very likely what you would get, too.
267    
268     starting to count up to 3
269     starting to count up to 2
270     starting to count up to 1
271     count 1 of 3
272     count 1 of 2
273     count 1 of 1
274     job 1 finished
275     count 2 of 2
276     job 2 finished
277     count 2 of 3
278     count 3 of 3
279     job 3 finished
280    
281     While the overall ordering isn't guaranteed, the async backend still
282     guarantees that events and responses are delivered to the parent process
283     in the exact same ordering as they were generated in the child process.
284    
285     And unless your system is I<very> busy, it should clearly show that the
286     job started last will finish first, as it has the lowest count.
287    
288     This concludes the async example. Since L<AnyEvent::Fork> does not
289     actually fork, you are free to use just about any module in the child, not just
290     L<AnyEvent>, but also L<IO::AIO>, or L<Tk> for example.
291 root 1.10
292 root 1.1 =head1 PARENT PROCESS USAGE
293    
294     This module exports nothing, and only implements a single function:
295    
296     =over 4
297    
298     =cut
299    
300     package AnyEvent::Fork::RPC;
301    
302     use common::sense;
303    
304     use Errno ();
305     use Guard ();
306    
307     use AnyEvent;
308 root 1.7 use AnyEvent::Fork; # we don't actually depend on it, this is for convenience
309 root 1.1
310     our $VERSION = 0.1;
311    
312     =item my $rpc = AnyEvent::Fork::RPC::run $fork, $function, [key => value...]
313    
314     The traditional way to call it. But it is way cooler to call it in the
315     following way:
316    
317     =item my $rpc = $fork->AnyEvent::Fork::RPC::run ($function, [key => value...])
318    
319     This C<run> function/method can be used in place of the
320     L<AnyEvent::Fork::run> method. Just like that method, it takes over
321     the L<AnyEvent::Fork> process, but instead of calling the specified
322     C<$function> directly, it runs a server that accepts RPC calls and handles
323     responses.
324    
325     It returns a function reference that can be used to call the function in
326     the child process, handling serialisation and data transfers.
327    
328     The following key/value pairs are allowed. It is recommended to have at
329     least an C<on_error> or C<on_event> handler set.
330    
331     =over 4
332    
333     =item on_error => $cb->($msg)
334    
335     Called on (fatal) errors, with a descriptive (hopefully) message. If
336     this callback is not provided, but C<on_event> is, then the C<on_event>
337     callback is called with the first argument being the string C<error>,
338     followed by the error message.
339    
340     If neither handler is provided it prints the error to STDERR and will
341     start failing badly.
342    
343     =item on_event => $cb->(...)
344    
345     Called for every call to the C<AnyEvent::Fork::RPC::event> function in the
346     child, with the arguments of that function passed to the callback.
347    
348     Also called on errors when no C<on_error> handler is provided.
349    
350 root 1.4 =item on_destroy => $cb->()
351    
352     Called when the C<$rpc> object has been destroyed and all requests have
353     been successfully handled. This is useful when you queue some requests and
354     want the child to go away after it has handled them. The problem is that
355     the parent must not exit either until all requests have been handled, and
356 root 1.6 this can be accomplished by waiting for this callback.
357 root 1.4
358 root 1.1 =item init => $function (default none)
359    
360     When specified (by name), this function is called in the child as the very
361     first thing when taking over the process, with all the arguments normally
362     passed to the C<AnyEvent::Fork::run> function, except the communications
363     socket.
364    
365     It can be used to do one-time things in the child such as storing passed
366     parameters or opening database connections.
367    
368 root 1.4 It is called very early - before the serialisers are created or the
369     C<$function> name is resolved into a function reference, so it could be
370     used to load any modules that provide the serialiser or function. It can
371     not, however, create events.
372    
373 root 1.1 =item async => $boolean (default: 0)
374    
375     The default server used in the child does all I/O blockingly, and only
376     allows a single RPC call to execute at a time.
377    
378     Setting C<async> to a true value switches to another implementation that
379 root 1.15 uses L<AnyEvent> in the child and allows multiple concurrent RPC calls (it
380     does not support recursion in the event loop however, blocking condvar
381     calls will fail).
382 root 1.1
383     The actual API in the child is documented in the section that describes
384     the calling semantics of the returned C<$rpc> function.
385    
386 root 1.2 If you want to pre-load the actual back-end modules to enable memory
387     sharing, then you should load C<AnyEvent::Fork::RPC::Sync> for
388     synchronous, and C<AnyEvent::Fork::RPC::Async> for asynchronous mode.
389    
390 root 1.4 If you use a template process and want to fork both sync and async
391 root 1.6 children, then it is permissible to load both modules.
392 root 1.4
393 root 1.14 =item serialiser => $string (default: $AnyEvent::Fork::RPC::STRING_SERIALISER)
394 root 1.1
395     All arguments, result data and event data have to be serialised to be
396     transferred between the processes. For this, they have to be frozen and
397     thawed in both parent and child processes.
398    
399     By default, only octet strings can be passed between the processes, which
400 root 1.14 is reasonably fast and efficient and requires no extra modules.
401 root 1.1
402     For more complicated use cases, you can provide your own freeze and thaw
403     functions, by specifying a string with perl source code. It's supposed to
404     return two code references when evaluated: the first receives a list of
405     perl values and must return an octet string. The second receives the octet
406     string and must return the original list of values.
407    
408 root 1.2 If you need an external module for serialisation, then you can either
409     pre-load it into your L<AnyEvent::Fork> process, or you can add a C<use>
410     or C<require> statement into the serialiser string. Or both.
411    
412 root 1.14 Here are some examples - some of them are also available as global
413     variables that make them easier to use.
414    
415     =over 4
416    
417     =item octet strings - C<$AnyEvent::Fork::RPC::STRING_SERIALISER>
418    
419     This serialiser concatenates length-prefixed octet strings, and is the
420     default.
421    
422     Implementation:
423    
424     (
425     sub { pack "(w/a*)*", @_ },
426     sub { unpack "(w/a*)*", shift }
427     )
428    
429     =item json - C<$AnyEvent::Fork::RPC::JSON_SERIALISER>
430    
431     This serialiser creates JSON arrays - you have to make sure the L<JSON>
432     module is installed for this serialiser to work. It can be beneficial for
433     sharing when you preload the L<JSON> module in a template process.
434    
435     L<JSON> (with L<JSON::XS> installed) is slower than the octet string
436     serialiser, but usually much faster than L<Storable>, unless big chunks of
437     binary data need to be transferred.
438    
439     Implementation:
440    
441     use JSON ();
442     (
443     sub { JSON::encode_json \@_ },
444     sub { @{ JSON::decode_json shift } }
445     )
446    
447     =item storable - C<$AnyEvent::Fork::RPC::STORABLE_SERIALISER>
448    
449     This serialiser uses L<Storable>, which means it has a high chance of
450     serialising just about anything you throw at it, at the cost of having
451     very high overhead per operation. It also comes with perl.
452    
453     Implementation:
454    
455     use Storable ();
456     (
457     sub { Storable::freeze \@_ },
458     sub { @{ Storable::thaw shift } }
459     )
460    
461     =back
462    
463 root 1.1 =back
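
As a rough illustration of the serialiser contract, the following sketch evaluates two of the serialiser strings shown above into their freeze/thaw pairs, much like C<run> does, and round-trips some values through them. The helper name C<compile_serialiser> and the sample values are assumptions made for this example.

```perl
use strict;
use warnings;

# Serialiser strings as given in the text above.
my $string_serialiser   = '(sub { pack "(w/a*)*", @_ }, sub { unpack "(w/a*)*", shift })';
my $storable_serialiser = 'use Storable (); (sub { Storable::freeze \@_ }, sub { @{ Storable::thaw shift } })';

# Made-up helper: evaluate a serialiser string into its freeze/thaw pair.
sub compile_serialiser {
    my ($source) = @_;
    my ($freeze, $thaw) = eval $source;
    die $@ if $@;
    ($freeze, $thaw)
}

# octet strings only: every argument arrives as a plain string
my ($sf, $st) = compile_serialiser ($string_serialiser);
my @strings = $st->($sf->("rmdir", "/tmp/somepath/1", ""));

# Storable: nested data structures survive the round trip
my ($ff, $ft) = compile_serialiser ($storable_serialiser);
my @nested = $ft->($ff->("job", { retries => 3 }, [1, 2]));

print scalar @strings, " strings, retries = $nested[1]{retries}\n";
```

With the string serialiser, a reference passed as an argument would be flattened to its stringified form, which is one reason to switch to the JSON or Storable variants for structured data.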
464    
465 root 1.9 See the examples section earlier in this document for some actual
466     examples.
467 root 1.8
468 root 1.1 =cut
469    
470 root 1.14 our $STRING_SERIALISER = '(sub { pack "(w/a*)*", @_ }, sub { unpack "(w/a*)*", shift })';
471     our $JSON_SERIALISER = 'use JSON (); (sub { JSON::encode_json \@_ }, sub { @{ JSON::decode_json shift } })';
472     our $STORABLE_SERIALISER = 'use Storable (); (sub { Storable::freeze \@_ }, sub { @{ Storable::thaw shift } })';
473 root 1.2
474 root 1.1 sub run {
475     my ($self, $function, %arg) = @_;
476    
477 root 1.2 my $serialiser = delete $arg{serialiser} || $STRING_SERIALISER;
478 root 1.1 my $on_event = delete $arg{on_event};
479     my $on_error = delete $arg{on_error};
480 root 1.4 my $on_destroy = delete $arg{on_destroy};
481 root 1.1
482     # default for on_error is to on_event, if specified
483     $on_error ||= $on_event
484     ? sub { $on_event->(error => shift) }
485     : sub { die "AnyEvent::Fork::RPC: uncaught error: $_[0].\n" };
486    
487     # default for on_event is to raise an error
488     $on_event ||= sub { $on_error->("event received, but no on_event handler") };
489    
490     my ($f, $t) = eval $serialiser; die $@ if $@;
491    
492 root 1.9 my (@rcb, %rcb, $fh, $shutdown, $wbuf, $ww);
493     my ($rlen, $rbuf, $rw) = 512 - 16;
494 root 1.1
495     my $wcb = sub {
496     my $len = syswrite $fh, $wbuf;
497    
498 root 1.9 unless (defined $len) {
499 root 1.1 if ($! != Errno::EAGAIN && $! != Errno::EWOULDBLOCK) {
500     undef $rw; undef $ww; # it ends here
501     $on_error->("$!");
502     }
503     }
504    
505     substr $wbuf, 0, $len, "";
506    
507     unless (length $wbuf) {
508     undef $ww;
509     $shutdown and shutdown $fh, 1;
510     }
511     };
512    
513     my $module = "AnyEvent::Fork::RPC::" . ($arg{async} ? "Async" : "Sync");
514    
515     $self->require ($module)
516     ->send_arg ($function, $arg{init}, $serialiser)
517     ->run ("$module\::run", sub {
518     $fh = shift;
519 root 1.9
520     my ($id, $len);
521 root 1.1 $rw = AE::io $fh, 0, sub {
522 root 1.4 $rlen = $rlen * 2 + 16 if $rlen - 128 < length $rbuf;
523 root 1.9 $len = sysread $fh, $rbuf, $rlen - length $rbuf, length $rbuf;
524 root 1.1
525     if ($len) {
526 root 1.9 while (8 <= length $rbuf) {
527     ($id, $len) = unpack "LL", $rbuf;
528     8 + $len <= length $rbuf
529 root 1.2 or last;
530    
531 root 1.9 my @r = $t->(substr $rbuf, 8, $len);
532     substr $rbuf, 0, 8 + $len, "";
533    
534     if ($id) {
535     if (@rcb) {
536     (shift @rcb)->(@r);
537     } elsif (my $cb = delete $rcb{$id}) {
538     $cb->(@r);
539     } else {
540     undef $rw; undef $ww;
541     $on_error->("unexpected data from child");
542     }
543     } else {
544 root 1.2 $on_event->(@r);
545 root 1.1 }
546     }
547     } elsif (defined $len) {
548     undef $rw; undef $ww; # it ends here
549 root 1.4
550 root 1.9 if (@rcb || %rcb) {
551 root 1.4 $on_error->("unexpected eof");
552     } else {
553     $on_destroy->();
554     }
555 root 1.1 } elsif ($! != Errno::EAGAIN && $! != Errno::EWOULDBLOCK) {
556     undef $rw; undef $ww; # it ends here
557     $on_error->("read: $!");
558     }
559     };
560    
561     $ww ||= AE::io $fh, 1, $wcb;
562     });
563    
564     my $guard = Guard::guard {
565     $shutdown = 1;
566     $ww ||= $fh && AE::io $fh, 1, $wcb;
567     };
568    
569 root 1.9 my $id;
570 root 1.1
571 root 1.9 $arg{async}
572     ? sub {
573     $id = ($id == 0xffffffff ? 0 : $id) + 1;
574     $id = ($id == 0xffffffff ? 0 : $id) + 1 while exists $rcb{$id}; # rarely loops
575 root 1.1
576 root 1.9 $rcb{$id} = pop;
577    
578     $guard; # keep it alive
579    
580     $wbuf .= pack "LL/a*", $id, &$f;
581     $ww ||= $fh && AE::io $fh, 1, $wcb;
582     }
583     : sub {
584     push @rcb, pop;
585    
586     $guard; # keep it alive
587    
588     $wbuf .= pack "L/a*", &$f;
589     $ww ||= $fh && AE::io $fh, 1, $wcb;
590     }
591 root 1.1 }
592    
593 root 1.4 =item $rpc->(..., $cb->(...))
594    
595     The RPC object returned by C<AnyEvent::Fork::RPC::run> is actually a code
596     reference. There are two things you can do with it: call it, and let it go
597     out of scope (let it get destroyed).
598    
599     If C<async> was false when C<$rpc> was created (the default), then, if you
600     call C<$rpc>, the C<$function> is invoked with all arguments passed to
601     C<$rpc> except the last one (the callback). When the function returns, the
602     callback will be invoked with all the return values.
603    
604     If C<async> was true, then the C<$function> receives an additional
605     initial argument, the result callback. In this case, returning from
606     C<$function> does nothing - the function only counts as "done" when the
607     result callback is called, and any arguments passed to it are considered
608     the return values. This makes it possible to "return" from event handlers
609     or e.g. Coro threads.
610    
611     The other thing that can be done with the RPC object is to destroy it. In
612     this case, the child process will execute all remaining RPC calls, report
613     their results, and then exit.
614    
615 root 1.8 See the examples section earlier in this document for some actual
616     examples.
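
The difference between the two calling conventions can be sketched in a single process. The C<dispatch> function below is a made-up stand-in that invokes a function the way the text describes for each mode. It involves no child process, no serialisation and no event loop, so it only illustrates the argument order and the point at which the result callback fires.

```perl
use strict;
use warnings;

# Made-up in-process dispatcher, for illustration only.
sub dispatch {
    my ($async, $function, @args) = @_;
    my $cb = pop @args;

    if ($async) {
        # async: the result callback comes first, the return value is ignored
        $function->(sub { $cb->(@_) }, @args);
    } else {
        # sync: the return values are the result
        $cb->($function->(@args));
    }
}

sub add_sync { my ($x, $y) = @_; $x + $y }

my @deferred; # stands in for timers or I/O watchers

sub add_async {
    my ($done, $x, $y) = @_;
    push @deferred, sub { $done->($x + $y) };
    "ignored"
}

my (@sync_result, @async_result);
dispatch (0, \&add_sync,  1, 2, sub { @sync_result  = @_ });
dispatch (1, \&add_async, 1, 2, sub { @async_result = @_ });

my $pending = !@async_result; # true: add_async has returned, but is not "done"
$_->() for splice @deferred;  # the deferred "event" fires and calls $done
```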
617    
618 root 1.1 =back
619    
620     =head1 CHILD PROCESS USAGE
621    
622 root 1.4 The following function is not available in this module. It is only
623     available in the namespace of this module when the child is running,
624     without having to load any extra modules. It is part of the child-side
625     API of L<AnyEvent::Fork::RPC>.
626 root 1.1
627     =over 4
628    
629     =item AnyEvent::Fork::RPC::event ...
630    
631     Send an event to the parent. Events are a bit like RPC calls made by the
632     child process to the parent, except that there is no notion of return
633     values.
634    
635 root 1.8 See the examples section earlier in this document for some actual
636     examples.
637    
638 root 1.1 =back
639    
640 root 1.12 =head1 ADVANCED TOPICS
641    
642     =head2 Choosing a backend
643    
644     So how do you decide which backend to use? Well, that's your problem to
645     solve, but here are some thoughts on the matter:
646    
647     =over 4
648    
649     =item Synchronous
650    
651     The synchronous backend does not rely on any external modules (well,
652     except L<common::sense>, which works around a bug in how perl's warning
653     system works). This keeps the process very small, for example, on my
654     system, an empty perl interpreter uses 1492kB RSS, which becomes 2020kB
655     after C<use warnings; use strict> (for people who grew up with C64s around
656     them this is probably shocking every single time they see it). The worker
657     process in the first example in this document uses 1792kB.
658    
659     Since the calls are done synchronously, slow jobs will keep newer jobs
660     from executing.
661    
662     The synchronous backend also has no overhead due to running an event loop
663     - reading requests is therefore very efficient, while writing responses is
664     less so, as every response results in a write syscall.
665    
666     If the parent process is busy and a bit slow reading responses, the child
667     waits instead of processing further requests. This also limits the amount
668     of memory needed for buffering, as never more than one response has to be
669     buffered.
670    
671     The API in the child is simple - you just have to define a function that
672     does something and returns something.
673    
674     It's hard to use modules or code that relies on an event loop, as the
675     child cannot execute anything while it waits for more input.
676    
677     =item Asynchronous
678    
679     The asynchronous backend relies on L<AnyEvent>, which tries to be small,
680     but still comes at a price: On my system, the worker from example 1a uses
681     3420kB RSS (for L<AnyEvent>, which loads L<EV>, which needs L<XSLoader>
682     which in turn loads a lot of other modules such as L<warnings>, L<strict>,
683     L<vars>, L<Exporter>...).
684    
685     It batches requests and responses reasonably efficiently, doing only as
686     few reads and writes as needed, but needs to poll for events via the event
687     loop.
688    
689     Responses are queued when the parent process is busy. This means the child
690     can continue to execute any queued requests. It also means that a child
691     might queue a lot of responses in memory when it generates them and the
692     parent process is slow accepting them.
693    
694     The API is not a straightforward RPC pattern - you have to call a
695     "done" callback to pass return values and signal completion. Also, more
696     importantly, the API starts jobs as fast as possible - when 1000 jobs
697     are queued and the jobs are slow, they will all run concurrently. The
698     child must implement some queueing/limiting mechanism if this causes
699     problems. Alternatively, the parent could limit the number of RPC calls
700     that are outstanding.
701    
702 root 1.15 Blocking use of condvars is not supported.
703    
704 root 1.12 Using event-based modules such as L<IO::AIO>, L<Gtk2>, L<Tk> and so on is
705     easy.
706    
707     =back
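
Limiting the number of outstanding calls in the parent, as suggested for the asynchronous backend, could look roughly like the following. This is a sketch under stated assumptions: C<limit_outstanding> is a made-up wrapper and not part of this module, and C<$fake_rpc> merely records its calls so the example runs without a child process. Any function that takes its callback as the last argument, such as the C<$rpc> object, could be wrapped the same way.

```perl
use strict;
use warnings;

# Made-up wrapper: at most $max calls to $rpc are in flight at any time,
# further calls are queued in the parent until a reply arrives.
sub limit_outstanding {
    my ($rpc, $max) = @_;
    my ($active, @queue) = (0);

    my $start; $start = sub {
        while ($active < $max && @queue) {
            my @args = @{ shift @queue };
            my $cb   = pop @args;

            ++$active;
            $rpc->(@args, sub {
                --$active;
                $cb->(@_);
                $start->(); # a slot became free, start the next queued call
            });
        }
    };

    sub { push @queue, [@_]; $start->() }
}

# Stand-in for $rpc: remembers each call instead of sending it anywhere.
my @pending;
my $fake_rpc = sub { my $cb = pop; push @pending, [$cb, @_] };

my $limited = limit_outstanding ($fake_rpc, 2);

my @results;
$limited->($_, sub { push @results, "@_" }) for 1 .. 5;

my $in_flight_before = @pending;   # two calls started, three still queued
(shift @pending)->[0]->("done 1"); # the first reply arrives
my $in_flight_after = @pending;    # two again: job 3 was started
```

Note that C<$start> refers to itself, so the wrapper is never freed in this sketch; a longer-lived version would probably want to break that cycle.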
708    
709     =head2 Passing file descriptors
710    
711     Unlike L<AnyEvent::Fork>, this module has no in-built file handle or file
712     descriptor passing abilities.
713    
714     The reason is that passing file descriptors is extraordinarily tricky
715     business, and conflicts with efficient batching of messages.
716    
717     There still is a method you can use: Create a
718     C<AnyEvent::Util::portable_socketpair> and C<send_fh> one half of it to
719     the process before you pass control to C<AnyEvent::Fork::RPC::run>.
720    
721     Whenever you want to pass a file descriptor, send an rpc request to the
722     child process (so it expects the descriptor), then send it over the other
723     half of the socketpair. The child should fetch the descriptor from the
724     half it was passed earlier.
725    
726     Here is some (untested) pseudocode to that effect:
727    
728     use AnyEvent::Util;
729     use AnyEvent::Fork::RPC;
730     use IO::FDPass;
731    
732     my ($s1, $s2) = AnyEvent::Util::portable_socketpair;
733    
734     my $rpc = AnyEvent::Fork
735     ->new
736     ->send_fh ($s2)
737     ->require ("MyWorker")
738     ->AnyEvent::Fork::RPC::run ("MyWorker::run",
739     init => "MyWorker::init",
740     );
741    
742     undef $s2; # no need to keep it around
743    
744     # pass an fd
745     $rpc->("i'll send some fd now, please expect it!", my $cv = AE::cv);
746    
747     IO::FDPass::send fileno $s1, fileno $handle_to_pass;
748    
749     $cv->recv;
750    
751     The MyWorker module could look like this:
752    
753     package MyWorker;
754    
755     use IO::FDPass;
756    
757     my $s2;
758    
759     sub init {
760     $s2 = $_[0];
761     }
762    
763     sub run {
764     if ($_[0] eq "i'll send some fd now, please expect it!") {
765     my $fd = IO::FDPass::recv fileno $s2;
766     ...
767     }
768     }
769    
770     Of course, this might be blocking if you pass a lot of file descriptors,
771     so you might want to look into L<AnyEvent::FDpasser> which can handle the
772     gory details.
773    
774 root 1.1 =head1 SEE ALSO
775    
776 root 1.16 L<AnyEvent::Fork>, to create the processes in the first place.
777    
778     L<AnyEvent::Fork::Pool>, to manage whole pools of processes.
779 root 1.1
780     =head1 AUTHOR AND CONTACT INFORMATION
781    
782     Marc Lehmann <schmorp@schmorp.de>
783     http://software.schmorp.de/pkg/AnyEvent-Fork-RPC
784    
785     =cut
786    
787     1
788