ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-Fork-RPC/RPC.pm
(Generate patch)

Comparing AnyEvent-Fork-RPC/RPC.pm (file contents):
Revision 1.8 by root, Wed Apr 17 20:24:36 2013 UTC vs.
Revision 1.14 by root, Thu Apr 18 13:11:12 2013 UTC

39Loading this module also always loads L<AnyEvent::Fork>, so you can make a 39Loading this module also always loads L<AnyEvent::Fork>, so you can make a
40separate C<use AnyEvent::Fork> if you wish, but you don't have to. 40separate C<use AnyEvent::Fork> if you wish, but you don't have to.
41 41
42=head1 EXAMPLES 42=head1 EXAMPLES
43 43
44=head2 Synchronous Backend 44=head2 Example 1: Synchronous Backend
45 45
46Here is a simple example that implements a backend that executes C<unlink> 46Here is a simple example that implements a backend that executes C<unlink>
47and C<rmdir> calls, and reports their status back. It also reports the 47and C<rmdir> calls, and reports their status back. It also reports the
48number of requests it has processed every three requests, which is clearly 48number of requests it has processed every three requests, which is clearly
49silly, but illustrates the use of events. 49silly, but illustrates the use of events.
50 50
51First the parent process: 51First the parent process:
52 52
53 use AnyEvent; 53 use AnyEvent;
54 use AnyEvent::Fork;
55 use AnyEvent::Fork::RPC; 54 use AnyEvent::Fork::RPC;
56 55
57 my $done = AE::cv; 56 my $done = AE::cv;
58 57
59 my $rpc = AnyEvent::Fork 58 my $rpc = AnyEvent::Fork
137 136
138And as a final remark, there is a fine module on CPAN that can 137And as a final remark, there is a fine module on CPAN that can
139asynchronously C<rmdir> and C<unlink> and a lot more, and more efficiently 138asynchronously C<rmdir> and C<unlink> and a lot more, and more efficiently
140than this example, namely L<IO::AIO>. 139than this example, namely L<IO::AIO>.
141 140
141=head3 Example 1a: the same with the asynchronous backend
142
143This example only shows what needs to be changed to use the async backend
144instead. Doing this is not very useful, the purpose of this example is
145to show the minimum amount of change that is required to go from the
146synchronous to the asynchronous backend.
147
148To use the async backend in the previous example, you need to add the
149C<async> parameter to the C<AnyEvent::Fork::RPC::run> call:
150
151 ->AnyEvent::Fork::RPC::run ("MyWorker::run",
152 async => 1,
153 ...
154
155And since the function call protocol is now changed, you need to adopt
156C<MyWorker::run> to the async API.
157
158First, you need to accept the extra initial C<$done> callback:
159
160 sub run {
161 my ($done, $cmd, $path) = @_;
162
163And since a response is now generated when C<$done> is called, as opposed
164to when the function returns, we need to call the C<$done> function with
165the status:
166
167 $done->($status or (0, "$!"));
168
169A few remarks are in order. First, it's quite pointless to use the async
170backend for this example - but it I<is> possible. Second, you can call
171C<$done> before or after returning from the function. Third, having both
172returned from the function and having called the C<$done> callback, the
173child process may exit at any time, so you should call C<$done> only when
174you really I<are> done.
175
176=head2 Example 2: Asynchronous Backend
177
178This example implements multiple count-downs in the child, using
179L<AnyEvent> timers. While this is a bit silly (one could use timers in te
180parent just as well), it illustrates the ability to use AnyEvent in the
181child and the fact that responses can arrive in a different order then the
182requests.
183
184It also shows how to embed the actual child code into a C<__DATA__>
185section, so it doesn't need any external files at all.
186
187And when your parent process is often busy, and you have stricter timing
188requirements, then running timers in a child process suddenly doesn't look
189so silly anymore.
190
191Without further ado, here is the code:
192
193 use AnyEvent;
194 use AnyEvent::Fork::RPC;
195
196 my $done = AE::cv;
197
198 my $rpc = AnyEvent::Fork
199 ->new
200 ->require ("AnyEvent::Fork::RPC::Async")
201 ->eval (do { local $/; <DATA> })
202 ->AnyEvent::Fork::RPC::run ("run",
203 async => 1,
204 on_error => sub { warn "FATAL: $_[0]"; exit 1 },
205 on_event => sub { print $_[0] },
206 on_destroy => $done,
207 );
208
209 for my $count (3, 2, 1) {
210 $rpc->($count, sub {
211 warn "job $count finished\n";
212 });
213 }
214
215 undef $rpc;
216
217 $done->recv;
218
219 __DATA__
220
221 # this ends up in main, as we don't use a package declaration
222
223 use AnyEvent;
224
225 sub run {
226 my ($done, $count) = @_;
227
228 my $n;
229
230 AnyEvent::Fork::RPC::event "starting to count up to $count\n";
231
232 my $w; $w = AE::timer 1, 1, sub {
233 ++$n;
234
235 AnyEvent::Fork::RPC::event "count $n of $count\n";
236
237 if ($n == $count) {
238 undef $w;
239 $done->();
240 }
241 };
242 }
243
244The parent part (the one before the C<__DATA__> section) isn't very
245different from the earlier examples. It sets async mode, preloads
246the backend module (so the C<AnyEvent::Fork::RPC::event> function is
247declared), uses a slightly different C<on_event> handler (which we use
248simply for logging purposes) and then, instead of loading a module with
249the actual worker code, it C<eval>'s the code from the data section in the
250child process.
251
252It then starts three countdowns, from 3 to 1 seconds downwards, destroys
253the rpc object so the example finishes eventually, and then just waits for
254the stuff to trickle in.
255
256The worker code uses the event function to log some progress messages, but
257mostly just creates a recurring one-second timer.
258
259The timer callback increments a counter, logs a message, and eventually,
260when the count has been reached, calls the finish callback.
261
262On my system, this results in the following output. Since all timers fire
263at roughly the same time, the actual order isn't guaranteed, but the order
264shown is very likely what you would get, too.
265
266 starting to count up to 3
267 starting to count up to 2
268 starting to count up to 1
269 count 1 of 3
270 count 1 of 2
271 count 1 of 1
272 job 1 finished
273 count 2 of 2
274 job 2 finished
275 count 2 of 3
276 count 3 of 3
277 job 3 finished
278
279While the overall ordering isn't guaranteed, the async backend still
280guarantees that events and responses are delivered to the parent process
281in the exact same ordering as they were generated in the child process.
282
283And unless your system is I<very> busy, it should clearly show that the
284job started last will finish first, as it has the lowest count.
285
286This concludes the async example. Since L<AnyEvent::Fork> does not
287actually fork, you are free to use about any module in the child, not just
288L<AnyEvent>, but also L<IO::AIO>, or L<Tk> for example.
289
142=head1 PARENT PROCESS USAGE 290=head1 PARENT PROCESS USAGE
143 291
144This module exports nothing, and only implements a single function: 292This module exports nothing, and only implements a single function:
145 293
146=over 4 294=over 4
236synchronous, and C<AnyEvent::Fork::RPC::Async> for asynchronous mode. 384synchronous, and C<AnyEvent::Fork::RPC::Async> for asynchronous mode.
237 385
238If you use a template process and want to fork both sync and async 386If you use a template process and want to fork both sync and async
239children, then it is permissible to load both modules. 387children, then it is permissible to load both modules.
240 388
241=item serialiser => $string (default: '(sub { pack "(w/a*)*", @_ }, sub { unpack "(w/a*)*", shift })') 389=item serialiser => $string (default: $AnyEvent::Fork::RPC::STRING_SERIALISER)
242 390
243All arguments, result data and event data have to be serialised to be 391All arguments, result data and event data have to be serialised to be
244transferred between the processes. For this, they have to be frozen and 392transferred between the processes. For this, they have to be frozen and
245thawed in both parent and child processes. 393thawed in both parent and child processes.
246 394
247By default, only octet strings can be passed between the processes, which 395By default, only octet strings can be passed between the processes, which
248is reasonably fast and efficient. 396is reasonably fast and efficient and requires no extra modules.
249 397
250For more complicated use cases, you can provide your own freeze and thaw 398For more complicated use cases, you can provide your own freeze and thaw
251functions, by specifying a string with perl source code. It's supposed to 399functions, by specifying a string with perl source code. It's supposed to
252return two code references when evaluated: the first receives a list of 400return two code references when evaluated: the first receives a list of
253perl values and must return an octet string. The second receives the octet 401perl values and must return an octet string. The second receives the octet
255 403
256If you need an external module for serialisation, then you can either 404If you need an external module for serialisation, then you can either
257pre-load it into your L<AnyEvent::Fork> process, or you can add a C<use> 405pre-load it into your L<AnyEvent::Fork> process, or you can add a C<use>
258or C<require> statement into the serialiser string. Or both. 406or C<require> statement into the serialiser string. Or both.
259 407
408Here are some examples - some of them are also available as global
409variables that make them easier to use.
410
411=over 4
412
413=item octet strings - C<$AnyEvent::Fork::RPC::STRING_SERIALISER>
414
415This serialiser concatenates length-prefixes octet strings, and is the
416default.
417
418Implementation:
419
420 (
421 sub { pack "(w/a*)*", @_ },
422 sub { unpack "(w/a*)*", shift }
423 )
424
425=item json - C<$AnyEvent::Fork::RPC::JSON_SERIALISER>
426
427This serialiser creates JSON arrays - you have to make sure the L<JSON>
428module is installed for this serialiser to work. It can be beneficial for
429sharing when you preload the L<JSON> module in a template process.
430
431L<JSON> (with L<JSON::XS> installed) is slower than the octet string
432serialiser, but usually much faster than L<Storable>, unless big chunks of
433binary data need to be transferred.
434
435Implementation:
436
437 use JSON ();
438 (
439 sub { JSON::encode_json \@_ },
440 sub { @{ JSON::decode_json shift } }
441 )
442
443=item storable - C<$AnyEvent::Fork::RPC::STORABLE_SERIALISER>
444
445This serialiser uses L<Storable>, which means it has high chance of
446serialising just about anything you throw at it, at the cost of having
447very high overhead per operation. It also comes with perl.
448
449Implementation:
450
451 use Storable ();
452 (
453 sub { Storable::freeze \@_ },
454 sub { @{ Storable::thaw shift } }
455 )
456
260=back 457=back
261 458
459=back
460
262See the examples section earlier in this document for some actual examples. 461See the examples section earlier in this document for some actual
462examples.
263 463
264=cut 464=cut
265 465
266our $STRING_SERIALISER = '(sub { pack "(w/a*)*", @_ }, sub { unpack "(w/a*)*", shift })'; 466our $STRING_SERIALISER = '(sub { pack "(w/a*)*", @_ }, sub { unpack "(w/a*)*", shift })';
467our $JSON_SERIALISER = 'use JSON (); (sub { JSON::encode_json \@_ }, sub { @{ JSON::decode_json shift } })';
468our $STORABLE_SERIALISER = 'use Storable (); (sub { Storable::freeze \@_ }, sub { @{ Storable::thaw shift } })';
267 469
268sub run { 470sub run {
269 my ($self, $function, %arg) = @_; 471 my ($self, $function, %arg) = @_;
270 472
271 my $serialiser = delete $arg{serialiser} || $STRING_SERIALISER; 473 my $serialiser = delete $arg{serialiser} || $STRING_SERIALISER;
281 # default for on_event is to raise an error 483 # default for on_event is to raise an error
282 $on_event ||= sub { $on_error->("event received, but no on_event handler") }; 484 $on_event ||= sub { $on_error->("event received, but no on_event handler") };
283 485
284 my ($f, $t) = eval $serialiser; die $@ if $@; 486 my ($f, $t) = eval $serialiser; die $@ if $@;
285 487
286 my (@rcb, $fh, $shutdown, $wbuf, $ww, $rw); 488 my (@rcb, %rcb, $fh, $shutdown, $wbuf, $ww);
287 my ($rlen, $rbuf) = 512 - 16; 489 my ($rlen, $rbuf, $rw) = 512 - 16;
288 490
289 my $wcb = sub { 491 my $wcb = sub {
290 my $len = syswrite $fh, $wbuf; 492 my $len = syswrite $fh, $wbuf;
291 493
292 if (!defined $len) { 494 unless (defined $len) {
293 if ($! != Errno::EAGAIN && $! != Errno::EWOULDBLOCK) { 495 if ($! != Errno::EAGAIN && $! != Errno::EWOULDBLOCK) {
294 undef $rw; undef $ww; # it ends here 496 undef $rw; undef $ww; # it ends here
295 $on_error->("$!"); 497 $on_error->("$!");
296 } 498 }
297 } 499 }
308 510
309 $self->require ($module) 511 $self->require ($module)
310 ->send_arg ($function, $arg{init}, $serialiser) 512 ->send_arg ($function, $arg{init}, $serialiser)
311 ->run ("$module\::run", sub { 513 ->run ("$module\::run", sub {
312 $fh = shift; 514 $fh = shift;
515
516 my ($id, $len);
313 $rw = AE::io $fh, 0, sub { 517 $rw = AE::io $fh, 0, sub {
314 $rlen = $rlen * 2 + 16 if $rlen - 128 < length $rbuf; 518 $rlen = $rlen * 2 + 16 if $rlen - 128 < length $rbuf;
315 my $len = sysread $fh, $rbuf, $rlen - length $rbuf, length $rbuf; 519 $len = sysread $fh, $rbuf, $rlen - length $rbuf, length $rbuf;
316 520
317 if ($len) { 521 if ($len) {
318 while (4 <= length $rbuf) { 522 while (8 <= length $rbuf) {
319 $len = unpack "L", $rbuf; 523 ($id, $len) = unpack "LL", $rbuf;
320 4 + $len <= length $rbuf 524 8 + $len <= length $rbuf
321 or last; 525 or last;
322 526
323 my @r = $t->(substr $rbuf, 4, $len); 527 my @r = $t->(substr $rbuf, 8, $len);
324 substr $rbuf, 0, $len + 4, ""; 528 substr $rbuf, 0, 8 + $len, "";
529
530 if ($id) {
531 if (@rcb) {
532 (shift @rcb)->(@r);
533 } elsif (my $cb = delete $rcb{$id}) {
534 $cb->(@r);
535 } else {
536 undef $rw; undef $ww;
537 $on_error->("unexpected data from child");
325 538 }
326 if (pop @r) { 539 } else {
327 $on_event->(@r); 540 $on_event->(@r);
328 } elsif (@rcb) {
329 (shift @rcb)->(@r);
330 } else {
331 undef $rw; undef $ww;
332 $on_error->("unexpected data from child");
333 } 541 }
334 } 542 }
335 } elsif (defined $len) { 543 } elsif (defined $len) {
336 undef $rw; undef $ww; # it ends here 544 undef $rw; undef $ww; # it ends here
337 545
338 if (@rcb) { 546 if (@rcb || %rcb) {
339 $on_error->("unexpected eof"); 547 $on_error->("unexpected eof");
340 } else { 548 } else {
341 $on_destroy->(); 549 $on_destroy->();
342 } 550 }
343 } elsif ($! != Errno::EAGAIN && $! != Errno::EWOULDBLOCK) { 551 } elsif ($! != Errno::EAGAIN && $! != Errno::EWOULDBLOCK) {
352 my $guard = Guard::guard { 560 my $guard = Guard::guard {
353 $shutdown = 1; 561 $shutdown = 1;
354 $ww ||= $fh && AE::io $fh, 1, $wcb; 562 $ww ||= $fh && AE::io $fh, 1, $wcb;
355 }; 563 };
356 564
565 my $id;
566
567 $arg{async}
357 sub { 568 ? sub {
358 push @rcb, pop; 569 $id = ($id == 0xffffffff ? 0 : $id) + 1;
570 $id = ($id == 0xffffffff ? 0 : $id) + 1 while exists $rcb{$id}; # rarely loops
359 571
572 $rcb{$id} = pop;
573
360 $guard; # keep it alive 574 $guard; # keep it alive
361 575
362 $wbuf .= pack "L/a*", &$f; 576 $wbuf .= pack "LL/a*", $id, &$f;
363 $ww ||= $fh && AE::io $fh, 1, $wcb; 577 $ww ||= $fh && AE::io $fh, 1, $wcb;
364 } 578 }
579 : sub {
580 push @rcb, pop;
581
582 $guard; # keep it alive
583
584 $wbuf .= pack "L/a*", &$f;
585 $ww ||= $fh && AE::io $fh, 1, $wcb;
586 }
365} 587}
366 588
367=item $rpc->(..., $cb->(...)) 589=item $rpc->(..., $cb->(...))
368 590
369The RPC object returned by C<AnyEvent::Fork::RPC::run> is actually a code 591The RPC object returned by C<AnyEvent::Fork::RPC::run> is actually a code
409See the examples section earlier in this document for some actual 631See the examples section earlier in this document for some actual
410examples. 632examples.
411 633
412=back 634=back
413 635
636=head1 ADVANCED TOPICS
637
638=head2 Choosing a backend
639
640So how do you decide which backend to use? Well, that's your problem to
641solve, but here are some thoughts on the matter:
642
643=over 4
644
645=item Synchronous
646
647The synchronous backend does not rely on any external modules (well,
648except L<common::sense>, which works around a bug in how perl's warning
649system works). This keeps the process very small, for example, on my
650system, an empty perl interpreter uses 1492kB RSS, which becomes 2020kB
651after C<use warnings; use strict> (for people who grew up with C64s around
652them this is probably shocking every single time they see it). The worker
653process in the first example in this document uses 1792kB.
654
655Since the calls are done synchronously, slow jobs will keep newer jobs
656from executing.
657
658The synchronous backend also has no overhead due to running an event loop
659- reading requests is therefore very efficient, while writing responses is
660less so, as every response results in a write syscall.
661
662If the parent process is busy and a bit slow reading responses, the child
663waits instead of processing further requests. This also limits the amount
664of memory needed for buffering, as never more than one response has to be
665buffered.
666
667The API in the child is simple - you just have to define a function that
668does something and returns something.
669
670It's hard to use modules or code that relies on an event loop, as the
671child cannot execute anything while it waits for more input.
672
673=item Asynchronous
674
675The asynchronous backend relies on L<AnyEvent>, which tries to be small,
676but still comes at a price: On my system, the worker from example 1a uses
6773420kB RSS (for L<AnyEvent>, which loads L<EV>, which needs L<XSLoader>
678which in turn loads a lot of other modules such as L<warnings>, L<strict>,
679L<vars>, L<Exporter>...).
680
681It batches requests and responses reasonably efficiently, doing only as
682few reads and writes as needed, but needs to poll for events via the event
683loop.
684
685Responses are queued when the parent process is busy. This means the child
686can continue to execute any queued requests. It also means that a child
687might queue a lot of responses in memory when it generates them and the
688parent process is slow accepting them.
689
690The API is not a straightforward RPC pattern - you have to call a
691"done" callback to pass return values and signal completion. Also, more
692importantly, the API starts jobs as fast as possible - when 1000 jobs
693are queued and the jobs are slow, they will all run concurrently. The
694child must implement some queueing/limiting mechanism if this causes
695problems. Alternatively, the parent could limit the amount of rpc calls
696that are outstanding.
697
698Using event-based modules such as L<IO::AIO>, L<Gtk2>, L<Tk> and so on is
699easy.
700
701=back
702
703=head2 Passing file descriptors
704
705Unlike L<AnyEvent::Fork>, this module has no in-built file handle or file
706descriptor passing abilities.
707
708The reason is that passing file descriptors is extraordinary tricky
709business, and conflicts with efficient batching of messages.
710
711There still is a method you can use: Create a
712C<AnyEvent::Util::portable_socketpair> and C<send_fh> one half of it to
713the process before you pass control to C<AnyEvent::Fork::RPC::run>.
714
715Whenever you want to pass a file descriptor, send an rpc request to the
716child process (so it expects the descriptor), then send it over the other
717half of the socketpair. The child should fetch the descriptor from the
718half it has passed earlier.
719
720Here is some (untested) pseudocode to that effect:
721
722 use AnyEvent::Util;
723 use AnyEvent::Fork::RPC;
724 use IO::FDPass;
725
726 my ($s1, $s2) = AnyEvent::Util::portable_socketpair;
727
728 my $rpc = AnyEvent::Fork
729 ->new
730 ->send_fh ($s2)
731 ->require ("MyWorker")
732 ->AnyEvent::Fork::RPC::run ("MyWorker::run"
733 init => "MyWorker::init",
734 );
735
736 undef $s2; # no need to keep it around
737
738 # pass an fd
739 $rpc->("i'll send some fd now, please expect it!", my $cv = AE::cv);
740
741 IO::FDPass fileno $s1, fileno $handle_to_pass;
742
743 $cv->recv;
744
745The MyWorker module could look like this:
746
747 package MyWorker;
748
749 use IO::FDPass;
750
751 my $s2;
752
753 sub init {
754 $s2 = $_[0];
755 }
756
757 sub run {
758 if ($_[0] eq "i'll send some fd now, please expect it!") {
759 my $fd = IO::FDPass::recv fileno $s2;
760 ...
761 }
762 }
763
764Of course, this might be blocking if you pass a lot of file descriptors,
765so you might want to look into L<AnyEvent::FDpasser> which can handle the
766gory details.
767
414=head1 SEE ALSO 768=head1 SEE ALSO
415 769
416L<AnyEvent::Fork> (to create the processes in the first place), 770L<AnyEvent::Fork> (to create the processes in the first place),
417L<AnyEvent::Fork::Pool> (to manage whole pools of processes). 771L<AnyEvent::Fork::Pool> (to manage whole pools of processes).
418 772

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines