ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-Fork-RPC/RPC.pm
(Generate patch)

Comparing AnyEvent-Fork-RPC/RPC.pm (file contents):
Revision 1.2 by root, Wed Apr 17 17:08:16 2013 UTC vs.
Revision 1.25 by root, Sun Apr 28 14:17:59 2013 UTC

1=head1 NAME 1=head1 NAME
2 2
3AnyEvent::Fork::RPC - simple RPC extension for AnyEvent::Fork 3AnyEvent::Fork::RPC - simple RPC extension for AnyEvent::Fork
4 4
5THE API IS NOT FINISHED, CONSIDER THIS A BETA RELEASE
6
5=head1 SYNOPSIS 7=head1 SYNOPSIS
6 8
7 use AnyEvent::Fork;
8 use AnyEvent::Fork::RPC; 9 use AnyEvent::Fork::RPC;
10 # use AnyEvent::Fork is not needed
9 11
10 my $rpc = AnyEvent::Fork 12 my $rpc = AnyEvent::Fork
11 ->new 13 ->new
12 ->require ("MyModule") 14 ->require ("MyModule")
13 ->AnyEvent::Fork::RPC::run ( 15 ->AnyEvent::Fork::RPC::run (
14 "MyModule::server", 16 "MyModule::server",
15 ); 17 );
16 18
19 use AnyEvent;
20
17 my $cv = AE::cv; 21 my $cv = AE::cv;
18 22
19 $rpc->(1, 2, 3, sub { 23 $rpc->(1, 2, 3, sub {
20 print "MyModule::server returned @_\n"; 24 print "MyModule::server returned @_\n";
21 $cv->send; 25 $cv->send;
34concurrently in the child, using AnyEvent. 38concurrently in the child, using AnyEvent.
35 39
36It also implements an asynchronous event mechanism from the child to the 40It also implements an asynchronous event mechanism from the child to the
37parent, that could be used for progress indications or other information. 41parent, that could be used for progress indications or other information.
38 42
43Loading this module also always loads L<AnyEvent::Fork>, so you can make a
44separate C<use AnyEvent::Fork> if you wish, but you don't have to.
45
46=head1 EXAMPLES
47
48=head2 Example 1: Synchronous Backend
49
50Here is a simple example that implements a backend that executes C<unlink>
51and C<rmdir> calls, and reports their status back. It also reports the
52number of requests it has processed every three requests, which is clearly
53silly, but illustrates the use of events.
54
55First the parent process:
56
57 use AnyEvent;
58 use AnyEvent::Fork::RPC;
59
60 my $done = AE::cv;
61
62 my $rpc = AnyEvent::Fork
63 ->new
64 ->require ("MyWorker")
65 ->AnyEvent::Fork::RPC::run ("MyWorker::run",
66 on_error => sub { warn "FATAL: $_[0]"; exit 1 },
67 on_event => sub { warn "$_[0] requests handled\n" },
68 on_destroy => $done,
69 );
70
71 for my $id (1..6) {
72 $rpc->(rmdir => "/tmp/somepath/$id", sub {
73 $_[0]
74 or warn "/tmp/somepath/$id: $_[1]\n";
75 });
76 }
77
78 undef $rpc;
79
80 $done->recv;
81
82The parent creates the process, queues a few rmdir's. It then forgets
83about the C<$rpc> object, so that the child exits after it has handled the
84requests, and then it waits till the requests have been handled.
85
86The child is implemented using a separate module, C<MyWorker>, shown here:
87
88 package MyWorker;
89
90 my $count;
91
92 sub run {
93 my ($cmd, $path) = @_;
94
95 AnyEvent::Fork::RPC::event ($count)
96 unless ++$count % 3;
97
98 my $status = $cmd eq "rmdir" ? rmdir $path
99 : $cmd eq "unlink" ? unlink $path
100 : die "fatal error, illegal command '$cmd'";
101
102 $status or (0, "$!")
103 }
104
105 1
106
107The C<run> function first sends a "progress" event every three calls, and
108then executes C<rmdir> or C<unlink>, depending on the first parameter (or
109dies with a fatal error - obviously, you must never let this happen :).
110
111Eventually it returns the status value true if the command was successful,
112or the status value 0 and the stringified error message.
113
114On my system, running the first code fragment with the given
115F<MyWorker.pm> in the current directory yields:
116
117 /tmp/somepath/1: No such file or directory
118 /tmp/somepath/2: No such file or directory
119 3 requests handled
120 /tmp/somepath/3: No such file or directory
121 /tmp/somepath/4: No such file or directory
122 /tmp/somepath/5: No such file or directory
123 6 requests handled
124 /tmp/somepath/6: No such file or directory
125
126Obviously, none of the directories I am trying to delete even exist. Also,
127the events and responses are processed in exactly the same order as
128they were created in the child, which is true for both synchronous and
129asynchronous backends.
130
131Note that the parentheses in the call to C<AnyEvent::Fork::RPC::event> are
132not optional. That is because the function isn't defined when the code is
133compiled. You can make sure it is visible by pre-loading the correct
134backend module in the call to C<require>:
135
136 ->require ("AnyEvent::Fork::RPC::Sync", "MyWorker")
137
138Since the backend module declares the C<event> function, loading it first
139ensures that perl will correctly interpret calls to it.
140
141And as a final remark, there is a fine module on CPAN that can
142asynchronously C<rmdir> and C<unlink> and a lot more, and more efficiently
143than this example, namely L<IO::AIO>.
144
145=head3 Example 1a: the same with the asynchronous backend
146
147This example only shows what needs to be changed to use the async backend
148instead. Doing this is not very useful, the purpose of this example is
149to show the minimum amount of change that is required to go from the
150synchronous to the asynchronous backend.
151
152To use the async backend in the previous example, you need to add the
153C<async> parameter to the C<AnyEvent::Fork::RPC::run> call:
154
155 ->AnyEvent::Fork::RPC::run ("MyWorker::run",
156 async => 1,
157 ...
158
159And since the function call protocol is now changed, you need to adopt
160C<MyWorker::run> to the async API.
161
162First, you need to accept the extra initial C<$done> callback:
163
164 sub run {
165 my ($done, $cmd, $path) = @_;
166
167And since a response is now generated when C<$done> is called, as opposed
168to when the function returns, we need to call the C<$done> function with
169the status:
170
171 $done->($status or (0, "$!"));
172
173A few remarks are in order. First, it's quite pointless to use the async
174backend for this example - but it I<is> possible. Second, you can call
175C<$done> before or after returning from the function. Third, having both
176returned from the function and having called the C<$done> callback, the
177child process may exit at any time, so you should call C<$done> only when
178you really I<are> done.
179
180=head2 Example 2: Asynchronous Backend
181
182This example implements multiple count-downs in the child, using
183L<AnyEvent> timers. While this is a bit silly (one could use timers in te
184parent just as well), it illustrates the ability to use AnyEvent in the
185child and the fact that responses can arrive in a different order then the
186requests.
187
188It also shows how to embed the actual child code into a C<__DATA__>
189section, so it doesn't need any external files at all.
190
191And when your parent process is often busy, and you have stricter timing
192requirements, then running timers in a child process suddenly doesn't look
193so silly anymore.
194
195Without further ado, here is the code:
196
197 use AnyEvent;
198 use AnyEvent::Fork::RPC;
199
200 my $done = AE::cv;
201
202 my $rpc = AnyEvent::Fork
203 ->new
204 ->require ("AnyEvent::Fork::RPC::Async")
205 ->eval (do { local $/; <DATA> })
206 ->AnyEvent::Fork::RPC::run ("run",
207 async => 1,
208 on_error => sub { warn "FATAL: $_[0]"; exit 1 },
209 on_event => sub { print $_[0] },
210 on_destroy => $done,
211 );
212
213 for my $count (3, 2, 1) {
214 $rpc->($count, sub {
215 warn "job $count finished\n";
216 });
217 }
218
219 undef $rpc;
220
221 $done->recv;
222
223 __DATA__
224
225 # this ends up in main, as we don't use a package declaration
226
227 use AnyEvent;
228
229 sub run {
230 my ($done, $count) = @_;
231
232 my $n;
233
234 AnyEvent::Fork::RPC::event "starting to count up to $count\n";
235
236 my $w; $w = AE::timer 1, 1, sub {
237 ++$n;
238
239 AnyEvent::Fork::RPC::event "count $n of $count\n";
240
241 if ($n == $count) {
242 undef $w;
243 $done->();
244 }
245 };
246 }
247
248The parent part (the one before the C<__DATA__> section) isn't very
249different from the earlier examples. It sets async mode, preloads
250the backend module (so the C<AnyEvent::Fork::RPC::event> function is
251declared), uses a slightly different C<on_event> handler (which we use
252simply for logging purposes) and then, instead of loading a module with
253the actual worker code, it C<eval>'s the code from the data section in the
254child process.
255
256It then starts three countdowns, from 3 to 1 seconds downwards, destroys
257the rpc object so the example finishes eventually, and then just waits for
258the stuff to trickle in.
259
260The worker code uses the event function to log some progress messages, but
261mostly just creates a recurring one-second timer.
262
263The timer callback increments a counter, logs a message, and eventually,
264when the count has been reached, calls the finish callback.
265
266On my system, this results in the following output. Since all timers fire
267at roughly the same time, the actual order isn't guaranteed, but the order
268shown is very likely what you would get, too.
269
270 starting to count up to 3
271 starting to count up to 2
272 starting to count up to 1
273 count 1 of 3
274 count 1 of 2
275 count 1 of 1
276 job 1 finished
277 count 2 of 2
278 job 2 finished
279 count 2 of 3
280 count 3 of 3
281 job 3 finished
282
283While the overall ordering isn't guaranteed, the async backend still
284guarantees that events and responses are delivered to the parent process
285in the exact same ordering as they were generated in the child process.
286
287And unless your system is I<very> busy, it should clearly show that the
288job started last will finish first, as it has the lowest count.
289
290This concludes the async example. Since L<AnyEvent::Fork> does not
291actually fork, you are free to use about any module in the child, not just
292L<AnyEvent>, but also L<IO::AIO>, or L<Tk> for example.
293
39=head1 PARENT PROCESS USAGE 294=head1 PARENT PROCESS USAGE
40 295
41This module exports nothing, and only implements a single function: 296This module exports nothing, and only implements a single function:
42 297
43=over 4 298=over 4
50 305
51use Errno (); 306use Errno ();
52use Guard (); 307use Guard ();
53 308
54use AnyEvent; 309use AnyEvent;
55#use AnyEvent::Fork; 310# explicit version on next line, as some cpan-testers test with the 0.1 version,
311# ignoring dependencies, and this line will at least give a clear indication of that.
312use AnyEvent::Fork 0.6; # we don't actually depend on it, this is for convenience
56 313
57our $VERSION = 0.1; 314our $VERSION = 1.1;
58 315
59=item my $rpc = AnyEvent::Fork::RPC::run $fork, $function, [key => value...] 316=item my $rpc = AnyEvent::Fork::RPC::run $fork, $function, [key => value...]
60 317
61The traditional way to call it. But it is way cooler to call it in the 318The traditional way to call it. But it is way cooler to call it in the
62following way: 319following way:
92Called for every call to the C<AnyEvent::Fork::RPC::event> function in the 349Called for every call to the C<AnyEvent::Fork::RPC::event> function in the
93child, with the arguments of that function passed to the callback. 350child, with the arguments of that function passed to the callback.
94 351
95Also called on errors when no C<on_error> handler is provided. 352Also called on errors when no C<on_error> handler is provided.
96 353
354=item on_destroy => $cb->()
355
356Called when the C<$rpc> object has been destroyed and all requests have
357been successfully handled. This is useful when you queue some requests and
358want the child to go away after it has handled them. The problem is that
359the parent must not exit either until all requests have been handled, and
360this can be accomplished by waiting for this callback.
361
97=item init => $function (default none) 362=item init => $function (default none)
98 363
99When specified (by name), this function is called in the child as the very 364When specified (by name), this function is called in the child as the very
100first thing when taking over the process, with all the arguments normally 365first thing when taking over the process, with all the arguments normally
101passed to the C<AnyEvent::Fork::run> function, except the communications 366passed to the C<AnyEvent::Fork::run> function, except the communications
102socket. 367socket.
103 368
104It can be used to do one-time things in the child such as storing passed 369It can be used to do one-time things in the child such as storing passed
105parameters or opening database connections. 370parameters or opening database connections.
106 371
372It is called very early - before the serialisers are created or the
373C<$function> name is resolved into a function reference, so it could be
374used to load any modules that provide the serialiser or function. It can
375not, however, create events.
376
107=item async => $boolean (default: 0) 377=item async => $boolean (default: 0)
108 378
109The default server used in the child does all I/O blockingly, and only 379The default server used in the child does all I/O blockingly, and only
110allows a single RPC call to execute concurrently. 380allows a single RPC call to execute concurrently.
111 381
112Setting C<async> to a true value switches to another implementation that 382Setting C<async> to a true value switches to another implementation that
113uses L<AnyEvent> in the child and allows multiple concurrent RPC calls. 383uses L<AnyEvent> in the child and allows multiple concurrent RPC calls (it
384does not support recursion in the event loop however, blocking condvar
385calls will fail).
114 386
115The actual API in the child is documented in the section that describes 387The actual API in the child is documented in the section that describes
116the calling semantics of the returned C<$rpc> function. 388the calling semantics of the returned C<$rpc> function.
117 389
118If you want to pre-load the actual back-end modules to enable memory 390If you want to pre-load the actual back-end modules to enable memory
119sharing, then you should load C<AnyEvent::Fork::RPC::Sync> for 391sharing, then you should load C<AnyEvent::Fork::RPC::Sync> for
120synchronous, and C<AnyEvent::Fork::RPC::Async> for asynchronous mode. 392synchronous, and C<AnyEvent::Fork::RPC::Async> for asynchronous mode.
121 393
122=item serialiser => $string (default: '(sub { pack "(w/a*)*", @_ }, sub { unpack "(w/a*)*", shift })') 394If you use a template process and want to fork both sync and async
395children, then it is permissible to load both modules.
396
397=item serialiser => $string (default: $AnyEvent::Fork::RPC::STRING_SERIALISER)
123 398
124All arguments, result data and event data have to be serialised to be 399All arguments, result data and event data have to be serialised to be
125transferred between the processes. For this, they have to be frozen and 400transferred between the processes. For this, they have to be frozen and
126thawed in both parent and child processes. 401thawed in both parent and child processes.
127 402
128By default, only octet strings can be passed between the processes, which 403By default, only octet strings can be passed between the processes, which
129is reasonably fast and efficient. 404is reasonably fast and efficient and requires no extra modules.
130 405
131For more complicated use cases, you can provide your own freeze and thaw 406For more complicated use cases, you can provide your own freeze and thaw
132functions, by specifying a string with perl source code. It's supposed to 407functions, by specifying a string with perl source code. It's supposed to
133return two code references when evaluated: the first receives a list of 408return two code references when evaluated: the first receives a list of
134perl values and must return an octet string. The second receives the octet 409perl values and must return an octet string. The second receives the octet
136 411
137If you need an external module for serialisation, then you can either 412If you need an external module for serialisation, then you can either
138pre-load it into your L<AnyEvent::Fork> process, or you can add a C<use> 413pre-load it into your L<AnyEvent::Fork> process, or you can add a C<use>
139or C<require> statement into the serialiser string. Or both. 414or C<require> statement into the serialiser string. Or both.
140 415
416Here are some examples - some of them are also available as global
417variables that make them easier to use.
418
419=over 4
420
421=item octet strings - C<$AnyEvent::Fork::RPC::STRING_SERIALISER>
422
423This serialiser concatenates length-prefixes octet strings, and is the
424default.
425
426Implementation:
427
428 (
429 sub { pack "(w/a*)*", @_ },
430 sub { unpack "(w/a*)*", shift }
431 )
432
433=item json - C<$AnyEvent::Fork::RPC::JSON_SERIALISER>
434
435This serialiser creates JSON arrays - you have to make sure the L<JSON>
436module is installed for this serialiser to work. It can be beneficial for
437sharing when you preload the L<JSON> module in a template process.
438
439L<JSON> (with L<JSON::XS> installed) is slower than the octet string
440serialiser, but usually much faster than L<Storable>, unless big chunks of
441binary data need to be transferred.
442
443Implementation:
444
445 use JSON ();
446 (
447 sub { JSON::encode_json \@_ },
448 sub { @{ JSON::decode_json shift } }
449 )
450
451=item storable - C<$AnyEvent::Fork::RPC::STORABLE_SERIALISER>
452
453This serialiser uses L<Storable>, which means it has high chance of
454serialising just about anything you throw at it, at the cost of having
455very high overhead per operation. It also comes with perl.
456
457Implementation:
458
459 use Storable ();
460 (
461 sub { Storable::freeze \@_ },
462 sub { @{ Storable::thaw shift } }
463 )
464
141=back 465=back
142 466
467=back
468
469See the examples section earlier in this document for some actual
470examples.
471
143=cut 472=cut
144 473
145our $STRING_SERIALISER = '(sub { pack "(w/a*)*", @_ }, sub { unpack "(w/a*)*", shift })'; 474our $STRING_SERIALISER = '(sub { pack "(w/a*)*", @_ }, sub { unpack "(w/a*)*", shift })';
146 475our $JSON_SERIALISER = 'use JSON (); (sub { JSON::encode_json \@_ }, sub { @{ JSON::decode_json shift } })';
147# ideally, we want (SvLEN - SvCUR) || 1024 or somesuch... 476our $STORABLE_SERIALISER = 'use Storable (); (sub { Storable::freeze \@_ }, sub { @{ Storable::thaw shift } })';
148sub rlen($) { ($_[0] < 384 ? 512 + 16 : 2 << int +(log $_[0] + 512) / log 2) - $_[0] - 16 }
149 477
150sub run { 478sub run {
151 my ($self, $function, %arg) = @_; 479 my ($self, $function, %arg) = @_;
152 480
153 my $serialiser = delete $arg{serialiser} || $STRING_SERIALISER; 481 my $serialiser = delete $arg{serialiser} || $STRING_SERIALISER;
154 my $on_event = delete $arg{on_event}; 482 my $on_event = delete $arg{on_event};
155 my $on_error = delete $arg{on_error}; 483 my $on_error = delete $arg{on_error};
484 my $on_destroy = delete $arg{on_destroy};
156 485
157 # default for on_error is to on_event, if specified 486 # default for on_error is to on_event, if specified
158 $on_error ||= $on_event 487 $on_error ||= $on_event
159 ? sub { $on_event->(error => shift) } 488 ? sub { $on_event->(error => shift) }
160 : sub { die "AnyEvent::Fork::RPC: uncaught error: $_[0].\n" }; 489 : sub { die "AnyEvent::Fork::RPC: uncaught error: $_[0].\n" };
162 # default for on_event is to raise an error 491 # default for on_event is to raise an error
163 $on_event ||= sub { $on_error->("event received, but no on_event handler") }; 492 $on_event ||= sub { $on_error->("event received, but no on_event handler") };
164 493
165 my ($f, $t) = eval $serialiser; die $@ if $@; 494 my ($f, $t) = eval $serialiser; die $@ if $@;
166 495
167 my (@rcb, $fh, $shutdown, $wbuf, $ww, $rbuf, $rw); 496 my (@rcb, %rcb, $fh, $shutdown, $wbuf, $ww);
497 my ($rlen, $rbuf, $rw) = 512 - 16;
168 498
169 my $wcb = sub { 499 my $wcb = sub {
170 my $len = syswrite $fh, $wbuf; 500 my $len = syswrite $fh, $wbuf;
171 501
172 if (!defined $len) { 502 unless (defined $len) {
173 if ($! != Errno::EAGAIN && $! != Errno::EWOULDBLOCK) { 503 if ($! != Errno::EAGAIN && $! != Errno::EWOULDBLOCK) {
174 undef $rw; undef $ww; # it ends here 504 undef $rw; undef $ww; # it ends here
175 $on_error->("$!"); 505 $on_error->("$!");
176 } 506 }
177 } 507 }
188 518
189 $self->require ($module) 519 $self->require ($module)
190 ->send_arg ($function, $arg{init}, $serialiser) 520 ->send_arg ($function, $arg{init}, $serialiser)
191 ->run ("$module\::run", sub { 521 ->run ("$module\::run", sub {
192 $fh = shift; 522 $fh = shift;
523
524 my ($id, $len);
193 $rw = AE::io $fh, 0, sub { 525 $rw = AE::io $fh, 0, sub {
526 $rlen = $rlen * 2 + 16 if $rlen - 128 < length $rbuf;
194 my $len = sysread $fh, $rbuf, rlen length $rbuf, length $rbuf; 527 $len = sysread $fh, $rbuf, $rlen - length $rbuf, length $rbuf;
195 528
196 if ($len) { 529 if ($len) {
197 while (5 <= length $rbuf) { 530 while (8 <= length $rbuf) {
198 $len = unpack "L", $rbuf; 531 ($id, $len) = unpack "NN", $rbuf;
199 4 + $len <= length $rbuf 532 8 + $len <= length $rbuf
200 or last; 533 or last;
201 534
202 my @r = $t->(substr $rbuf, 4, $len); 535 my @r = $t->(substr $rbuf, 8, $len);
203 substr $rbuf, 0, $len + 4, ""; 536 substr $rbuf, 0, 8 + $len, "";
537
538 if ($id) {
539 if (@rcb) {
540 (shift @rcb)->(@r);
541 } elsif (my $cb = delete $rcb{$id}) {
542 $cb->(@r);
543 } else {
544 undef $rw; undef $ww;
545 $on_error->("unexpected data from child");
204 546 }
205 if (pop @r) { 547 } else {
206 $on_event->(@r); 548 $on_event->(@r);
207 } elsif (@rcb) {
208 (shift @rcb)->(@r);
209 } else {
210 undef $rw; undef $ww;
211 $on_error->("unexpected data from child");
212 } 549 }
213 } 550 }
214 } elsif (defined $len) { 551 } elsif (defined $len) {
215 undef $rw; undef $ww; # it ends here 552 undef $rw; undef $ww; # it ends here
553
554 if (@rcb || %rcb) {
216 $on_error->("unexpected eof") 555 $on_error->("unexpected eof");
217 if @rcb; 556 } else {
557 $on_destroy->()
558 if $on_destroy;
559 }
218 } elsif ($! != Errno::EAGAIN && $! != Errno::EWOULDBLOCK) { 560 } elsif ($! != Errno::EAGAIN && $! != Errno::EWOULDBLOCK) {
219 undef $rw; undef $ww; # it ends here 561 undef $rw; undef $ww; # it ends here
220 $on_error->("read: $!"); 562 $on_error->("read: $!");
221 } 563 }
222 }; 564 };
224 $ww ||= AE::io $fh, 1, $wcb; 566 $ww ||= AE::io $fh, 1, $wcb;
225 }); 567 });
226 568
227 my $guard = Guard::guard { 569 my $guard = Guard::guard {
228 $shutdown = 1; 570 $shutdown = 1;
229 $ww ||= $fh && AE::io $fh, 1, $wcb; 571
572 shutdown $fh, 1 if $fh && !$ww;
230 }; 573 };
231 574
575 my $id;
576
577 $arg{async}
232 sub { 578 ? sub {
233 push @rcb, pop; 579 $id = ($id == 0xffffffff ? 0 : $id) + 1;
580 $id = ($id == 0xffffffff ? 0 : $id) + 1 while exists $rcb{$id}; # rarely loops
234 581
582 $rcb{$id} = pop;
583
235 $guard; # keep it alive 584 $guard if 0; # keep it alive
236 585
237 $wbuf .= pack "L/a*", &$f; 586 $wbuf .= pack "NN/a*", $id, &$f;
238 $ww ||= $fh && AE::io $fh, 1, $wcb; 587 $ww ||= $fh && AE::io $fh, 1, $wcb;
239 } 588 }
589 : sub {
590 push @rcb, pop;
591
592 $guard; # keep it alive
593
594 $wbuf .= pack "N/a*", &$f;
595 $ww ||= $fh && AE::io $fh, 1, $wcb;
596 }
240} 597}
241 598
599=item $rpc->(..., $cb->(...))
600
601The RPC object returned by C<AnyEvent::Fork::RPC::run> is actually a code
602reference. There are two things you can do with it: call it, and let it go
603out of scope (let it get destroyed).
604
605If C<async> was false when C<$rpc> was created (the default), then, if you
606call C<$rpc>, the C<$function> is invoked with all arguments passed to
607C<$rpc> except the last one (the callback). When the function returns, the
608callback will be invoked with all the return values.
609
610If C<async> was true, then the C<$function> receives an additional
611initial argument, the result callback. In this case, returning from
612C<$function> does nothing - the function only counts as "done" when the
613result callback is called, and any arguments passed to it are considered
614the return values. This makes it possible to "return" from event handlers
615or e.g. Coro threads.
616
617The other thing that can be done with the RPC object is to destroy it. In
618this case, the child process will execute all remaining RPC calls, report
619their results, and then exit.
620
621See the examples section earlier in this document for some actual
622examples.
623
242=back 624=back
243 625
244=head1 CHILD PROCESS USAGE 626=head1 CHILD PROCESS USAGE
245 627
246These functions are not available in this module. They are only available 628The following function is not available in this module. They are only
247in the namespace of this module when the child is running, without 629available in the namespace of this module when the child is running,
248having to load any extra module. They are part of the child-side API of 630without having to load any extra modules. They are part of the child-side
249L<AnyEvent::Fork::RPC>. 631API of L<AnyEvent::Fork::RPC>.
250 632
251=over 4 633=over 4
252
253=item AnyEvent::Fork::RPC::quit
254
255This function can be called to gracefully stop the child process when it
256is idle.
257
258After this function is called, the process stops handling incoming RPC
259requests, but outstanding events and function return values will be sent
260to the parent. When all data has been sent, the process calls C<exit>.
261
262Since the parent might not expect the child to exit at random points in
263time, it is often better to signal the parent by sending an C<event> and
264letting the parent close down the child process.
265 634
266=item AnyEvent::Fork::RPC::event ... 635=item AnyEvent::Fork::RPC::event ...
267 636
268Send an event to the parent. Events are a bit like RPC calls made by the 637Send an event to the parent. Events are a bit like RPC calls made by the
269child process to the parent, except that there is no notion of return 638child process to the parent, except that there is no notion of return
270values. 639values.
271 640
641See the examples section earlier in this document for some actual
642examples.
643
272=back 644=back
273 645
646=head1 ADVANCED TOPICS
647
648=head2 Choosing a backend
649
650So how do you decide which backend to use? Well, that's your problem to
651solve, but here are some thoughts on the matter:
652
653=over 4
654
655=item Synchronous
656
657The synchronous backend does not rely on any external modules (well,
658except L<common::sense>, which works around a bug in how perl's warning
659system works). This keeps the process very small, for example, on my
660system, an empty perl interpreter uses 1492kB RSS, which becomes 2020kB
661after C<use warnings; use strict> (for people who grew up with C64s around
662them this is probably shocking every single time they see it). The worker
663process in the first example in this document uses 1792kB.
664
665Since the calls are done synchronously, slow jobs will keep newer jobs
666from executing.
667
668The synchronous backend also has no overhead due to running an event loop
669- reading requests is therefore very efficient, while writing responses is
670less so, as every response results in a write syscall.
671
672If the parent process is busy and a bit slow reading responses, the child
673waits instead of processing further requests. This also limits the amount
674of memory needed for buffering, as never more than one response has to be
675buffered.
676
677The API in the child is simple - you just have to define a function that
678does something and returns something.
679
680It's hard to use modules or code that relies on an event loop, as the
681child cannot execute anything while it waits for more input.
682
683=item Asynchronous
684
685The asynchronous backend relies on L<AnyEvent>, which tries to be small,
686but still comes at a price: On my system, the worker from example 1a uses
6873420kB RSS (for L<AnyEvent>, which loads L<EV>, which needs L<XSLoader>
688which in turn loads a lot of other modules such as L<warnings>, L<strict>,
689L<vars>, L<Exporter>...).
690
691It batches requests and responses reasonably efficiently, doing only as
692few reads and writes as needed, but needs to poll for events via the event
693loop.
694
695Responses are queued when the parent process is busy. This means the child
696can continue to execute any queued requests. It also means that a child
697might queue a lot of responses in memory when it generates them and the
698parent process is slow accepting them.
699
700The API is not a straightforward RPC pattern - you have to call a
701"done" callback to pass return values and signal completion. Also, more
702importantly, the API starts jobs as fast as possible - when 1000 jobs
703are queued and the jobs are slow, they will all run concurrently. The
704child must implement some queueing/limiting mechanism if this causes
705problems. Alternatively, the parent could limit the amount of rpc calls
706that are outstanding.
707
708Blocking use of condvars is not supported.
709
710Using event-based modules such as L<IO::AIO>, L<Gtk2>, L<Tk> and so on is
711easy.
712
713=back
714
715=head2 Passing file descriptors
716
717Unlike L<AnyEvent::Fork>, this module has no in-built file handle or file
718descriptor passing abilities.
719
720The reason is that passing file descriptors is extraordinary tricky
721business, and conflicts with efficient batching of messages.
722
723There still is a method you can use: Create a
724C<AnyEvent::Util::portable_socketpair> and C<send_fh> one half of it to
725the process before you pass control to C<AnyEvent::Fork::RPC::run>.
726
727Whenever you want to pass a file descriptor, send an rpc request to the
728child process (so it expects the descriptor), then send it over the other
729half of the socketpair. The child should fetch the descriptor from the
730half it has passed earlier.
731
732Here is some (untested) pseudocode to that effect:
733
734 use AnyEvent::Util;
735 use AnyEvent::Fork::RPC;
736 use IO::FDPass;
737
738 my ($s1, $s2) = AnyEvent::Util::portable_socketpair;
739
740 my $rpc = AnyEvent::Fork
741 ->new
742 ->send_fh ($s2)
743 ->require ("MyWorker")
744 ->AnyEvent::Fork::RPC::run ("MyWorker::run"
745 init => "MyWorker::init",
746 );
747
748 undef $s2; # no need to keep it around
749
750 # pass an fd
751 $rpc->("i'll send some fd now, please expect it!", my $cv = AE::cv);
752
753 IO::FDPass fileno $s1, fileno $handle_to_pass;
754
755 $cv->recv;
756
757The MyWorker module could look like this:
758
759 package MyWorker;
760
761 use IO::FDPass;
762
763 my $s2;
764
765 sub init {
766 $s2 = $_[0];
767 }
768
769 sub run {
770 if ($_[0] eq "i'll send some fd now, please expect it!") {
771 my $fd = IO::FDPass::recv fileno $s2;
772 ...
773 }
774 }
775
776Of course, this might be blocking if you pass a lot of file descriptors,
777so you might want to look into L<AnyEvent::FDpasser> which can handle the
778gory details.
779
780=head1 EXCEPTIONS
781
782There are no provisions whatsoever for catching exceptions at this time -
783in the child, exeptions might kill the process, causing calls to be lost
784and the parent encountering a fatal error. In the parent, exceptions in
785the result callback will not be caught and cause undefined behaviour.
786
274=head1 SEE ALSO 787=head1 SEE ALSO
275 788
276L<AnyEvent::Fork> (to create the processes in the first place), 789L<AnyEvent::Fork>, to create the processes in the first place.
790
277L<AnyEvent::Fork::Pool> (to manage whole pools of processes). 791L<AnyEvent::Fork::Pool>, to manage whole pools of processes.
278 792
279=head1 AUTHOR AND CONTACT INFORMATION 793=head1 AUTHOR AND CONTACT INFORMATION
280 794
281 Marc Lehmann <schmorp@schmorp.de> 795 Marc Lehmann <schmorp@schmorp.de>
282 http://software.schmorp.de/pkg/AnyEvent-Fork-RPC 796 http://software.schmorp.de/pkg/AnyEvent-Fork-RPC

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines