ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-Fork-RPC/RPC.pm
(Generate patch)

Comparing AnyEvent-Fork-RPC/RPC.pm (file contents):
Revision 1.4 by root, Wed Apr 17 19:38:25 2013 UTC vs.
Revision 1.10 by root, Wed Apr 17 22:04:49 2013 UTC

2 2
3AnyEvent::Fork::RPC - simple RPC extension for AnyEvent::Fork 3AnyEvent::Fork::RPC - simple RPC extension for AnyEvent::Fork
4 4
5=head1 SYNOPSIS 5=head1 SYNOPSIS
6 6
7 use AnyEvent::Fork;
8 use AnyEvent::Fork::RPC; 7 use AnyEvent::Fork::RPC;
8 # use AnyEvent::Fork is not needed
9 9
10 my $rpc = AnyEvent::Fork 10 my $rpc = AnyEvent::Fork
11 ->new 11 ->new
12 ->require ("MyModule") 12 ->require ("MyModule")
13 ->AnyEvent::Fork::RPC::run ( 13 ->AnyEvent::Fork::RPC::run (
34concurrently in the child, using AnyEvent. 34concurrently in the child, using AnyEvent.
35 35
36It also implements an asynchronous event mechanism from the child to the 36It also implements an asynchronous event mechanism from the child to the
37parent, that could be used for progress indications or other information. 37parent, that could be used for progress indications or other information.
38 38
39Loading this module also always loads L<AnyEvent::Fork>, so you can make a
40separate C<use AnyEvent::Fork> if you wish, but you don't have to.
41
39=head1 EXAMPLES 42=head1 EXAMPLES
40 43
41=head2 Synchronous Backend 44=head2 Example 1: Synchronous Backend
42 45
43Here is a simple example that implements a backend that executes C<unlink> 46Here is a simple example that implements a backend that executes C<unlink>
44and C<rmdir> calls, and reports their status back. It also reports the 47and C<rmdir> calls, and reports their status back. It also reports the
45number of requests it has processed every three requests, which is clearly 48number of requests it has processed every three requests, which is clearly
46silly, but illustrates the use of events. 49silly, but illustrates the use of events.
55 58
56 my $rpc = AnyEvent::Fork 59 my $rpc = AnyEvent::Fork
57 ->new 60 ->new
58 ->require ("MyWorker") 61 ->require ("MyWorker")
59 ->AnyEvent::Fork::RPC::run ("MyWorker::run", 62 ->AnyEvent::Fork::RPC::run ("MyWorker::run",
63 on_error => sub { warn "FATAL: $_[0]"; exit 1 },
60 on_event => sub { warn "$_[0] requests handled\n" }, 64 on_event => sub { warn "$_[0] requests handled\n" },
61 on_destroy => $done, 65 on_destroy => $done,
62 ); 66 );
63 67
64 for my $id (1..6) { 68 for my $id (1..6) {
102dies with a fatal error - obviously, you must never let this happen :). 106dies with a fatal error - obviously, you must never let this happen :).
103 107
104Eventually it returns the status value true if the command was successful, 108Eventually it returns the status value true if the command was successful,
105or the status value 0 and the stringified error message. 109or the status value 0 and the stringified error message.
106 110
107On my system, running the first cdoe fragment with the given 111On my system, running the first code fragment with the given
108F<MyWorker.pm> in the current directory yields: 112F<MyWorker.pm> in the current directory yields:
109 113
110 /tmp/somepath/1: No such file or directory 114 /tmp/somepath/1: No such file or directory
111 /tmp/somepath/2: No such file or directory 115 /tmp/somepath/2: No such file or directory
112 3 requests handled 116 3 requests handled
133 137
134And as a final remark, there is a fine module on CPAN that can 138And as a final remark, there is a fine module on CPAN that can
135asynchronously C<rmdir> and C<unlink> and a lot more, and more efficiently 139asynchronously C<rmdir> and C<unlink> and a lot more, and more efficiently
136than this example, namely L<IO::AIO>. 140than this example, namely L<IO::AIO>.
137 141
142=head3 Example 1a: the same with the asynchronous backend
143
144This example only shows what needs to be changed to use the async backend
145instead. Doing this is not very useful, the purpose of this example is
146to show the minimum amount of change that is required to go from the
147synchronous to the asynchronous backend.
148
149To use the async backend in the previous example, you need to add the
150C<async> parameter to the C<AnyEvent::Fork::RPC::run> call:
151
152 ->AnyEvent::Fork::RPC::run ("MyWorker::run",
153 async => 1,
154 ...
155
156And since the function call protocol is now changed, you need to adopt
157C<MyWorker::run> to the async API.
158
159First, you need to accept the extra initial C<$done> callback:
160
161 sub run {
162 my ($done, $cmd, $path) = @_;
163
164And since a response is now generated when C<$done> is called, as opposed
165to when the function returns, we need to call the C<$done> function with
166the status:
167
168 $done->($status or (0, "$!"));
169
170A few remarks are in order. First, it's quite pointless to use the async
171backend for this example - but it I<is> possible. Second, you can call
172C<$done> before or after returning from the function. Third, having both
173returned from the function and having called the C<$done> callback, the
174child process may exit at any time, so you should call C<$done> only when
175you really I<are> done.
176
177=head2 Example 2: Asynchronous Backend
178
179#TODO
180
138=head1 PARENT PROCESS USAGE 181=head1 PARENT PROCESS USAGE
139 182
140This module exports nothing, and only implements a single function: 183This module exports nothing, and only implements a single function:
141 184
142=over 4 185=over 4
149 192
150use Errno (); 193use Errno ();
151use Guard (); 194use Guard ();
152 195
153use AnyEvent; 196use AnyEvent;
154#use AnyEvent::Fork; 197use AnyEvent::Fork; # we don't actually depend on it, this is for convenience
155 198
156our $VERSION = 0.1; 199our $VERSION = 0.1;
157 200
158=item my $rpc = AnyEvent::Fork::RPC::run $fork, $function, [key => value...] 201=item my $rpc = AnyEvent::Fork::RPC::run $fork, $function, [key => value...]
159 202
197 240
198Called when the C<$rpc> object has been destroyed and all requests have 241Called when the C<$rpc> object has been destroyed and all requests have
199been successfully handled. This is useful when you queue some requests and 242been successfully handled. This is useful when you queue some requests and
200want the child to go away after it has handled them. The problem is that 243want the child to go away after it has handled them. The problem is that
201the parent must not exit either until all requests have been handled, and 244the parent must not exit either until all requests have been handled, and
202this cna be accomplished by waiting for this callback. 245this can be accomplished by waiting for this callback.
203 246
204=item init => $function (default none) 247=item init => $function (default none)
205 248
206When specified (by name), this function is called in the child as the very 249When specified (by name), this function is called in the child as the very
207first thing when taking over the process, with all the arguments normally 250first thing when taking over the process, with all the arguments normally
230If you want to pre-load the actual back-end modules to enable memory 273If you want to pre-load the actual back-end modules to enable memory
231sharing, then you should load C<AnyEvent::Fork::RPC::Sync> for 274sharing, then you should load C<AnyEvent::Fork::RPC::Sync> for
232synchronous, and C<AnyEvent::Fork::RPC::Async> for asynchronous mode. 275synchronous, and C<AnyEvent::Fork::RPC::Async> for asynchronous mode.
233 276
234If you use a template process and want to fork both sync and async 277If you use a template process and want to fork both sync and async
235children, then it is permissible to laod both modules. 278children, then it is permissible to load both modules.
236 279
237=item serialiser => $string (default: '(sub { pack "(w/a*)*", @_ }, sub { unpack "(w/a*)*", shift })') 280=item serialiser => $string (default: '(sub { pack "(w/a*)*", @_ }, sub { unpack "(w/a*)*", shift })')
238 281
239All arguments, result data and event data have to be serialised to be 282All arguments, result data and event data have to be serialised to be
240transferred between the processes. For this, they have to be frozen and 283transferred between the processes. For this, they have to be frozen and
252If you need an external module for serialisation, then you can either 295If you need an external module for serialisation, then you can either
253pre-load it into your L<AnyEvent::Fork> process, or you can add a C<use> 296pre-load it into your L<AnyEvent::Fork> process, or you can add a C<use>
254or C<require> statement into the serialiser string. Or both. 297or C<require> statement into the serialiser string. Or both.
255 298
256=back 299=back
300
301See the examples section earlier in this document for some actual
302examples.
257 303
258=cut 304=cut
259 305
260our $STRING_SERIALISER = '(sub { pack "(w/a*)*", @_ }, sub { unpack "(w/a*)*", shift })'; 306our $STRING_SERIALISER = '(sub { pack "(w/a*)*", @_ }, sub { unpack "(w/a*)*", shift })';
261 307
275 # default for on_event is to raise an error 321 # default for on_event is to raise an error
276 $on_event ||= sub { $on_error->("event received, but no on_event handler") }; 322 $on_event ||= sub { $on_error->("event received, but no on_event handler") };
277 323
278 my ($f, $t) = eval $serialiser; die $@ if $@; 324 my ($f, $t) = eval $serialiser; die $@ if $@;
279 325
280 my (@rcb, $fh, $shutdown, $wbuf, $ww, $rw); 326 my (@rcb, %rcb, $fh, $shutdown, $wbuf, $ww);
281 my ($rlen, $rbuf) = 512 - 16; 327 my ($rlen, $rbuf, $rw) = 512 - 16;
282 328
283 my $wcb = sub { 329 my $wcb = sub {
284 my $len = syswrite $fh, $wbuf; 330 my $len = syswrite $fh, $wbuf;
285 331
286 if (!defined $len) { 332 unless (defined $len) {
287 if ($! != Errno::EAGAIN && $! != Errno::EWOULDBLOCK) { 333 if ($! != Errno::EAGAIN && $! != Errno::EWOULDBLOCK) {
288 undef $rw; undef $ww; # it ends here 334 undef $rw; undef $ww; # it ends here
289 $on_error->("$!"); 335 $on_error->("$!");
290 } 336 }
291 } 337 }
302 348
303 $self->require ($module) 349 $self->require ($module)
304 ->send_arg ($function, $arg{init}, $serialiser) 350 ->send_arg ($function, $arg{init}, $serialiser)
305 ->run ("$module\::run", sub { 351 ->run ("$module\::run", sub {
306 $fh = shift; 352 $fh = shift;
353
354 my ($id, $len);
307 $rw = AE::io $fh, 0, sub { 355 $rw = AE::io $fh, 0, sub {
308 $rlen = $rlen * 2 + 16 if $rlen - 128 < length $rbuf; 356 $rlen = $rlen * 2 + 16 if $rlen - 128 < length $rbuf;
309 my $len = sysread $fh, $rbuf, $rlen - length $rbuf, length $rbuf; 357 $len = sysread $fh, $rbuf, $rlen - length $rbuf, length $rbuf;
310 358
311 if ($len) { 359 if ($len) {
312 while (5 <= length $rbuf) { 360 while (8 <= length $rbuf) {
313 $len = unpack "L", $rbuf; 361 ($id, $len) = unpack "LL", $rbuf;
314 4 + $len <= length $rbuf 362 8 + $len <= length $rbuf
315 or last; 363 or last;
316 364
317 my @r = $t->(substr $rbuf, 4, $len); 365 my @r = $t->(substr $rbuf, 8, $len);
318 substr $rbuf, 0, $len + 4, ""; 366 substr $rbuf, 0, 8 + $len, "";
367
368 if ($id) {
369 if (@rcb) {
370 (shift @rcb)->(@r);
371 } elsif (my $cb = delete $rcb{$id}) {
372 $cb->(@r);
373 } else {
374 undef $rw; undef $ww;
375 $on_error->("unexpected data from child");
319 376 }
320 if (pop @r) { 377 } else {
321 $on_event->(@r); 378 $on_event->(@r);
322 } elsif (@rcb) {
323 (shift @rcb)->(@r);
324 } else {
325 undef $rw; undef $ww;
326 $on_error->("unexpected data from child");
327 } 379 }
328 } 380 }
329 } elsif (defined $len) { 381 } elsif (defined $len) {
330 undef $rw; undef $ww; # it ends here 382 undef $rw; undef $ww; # it ends here
331 383
332 if (@rcb) { 384 if (@rcb || %rcb) {
385 use Data::Dump;ddx[\@rcb,\%rcb];#d#
333 $on_error->("unexpected eof"); 386 $on_error->("unexpected eof");
334 } else { 387 } else {
335 $on_destroy->(); 388 $on_destroy->();
336 } 389 }
337 } elsif ($! != Errno::EAGAIN && $! != Errno::EWOULDBLOCK) { 390 } elsif ($! != Errno::EAGAIN && $! != Errno::EWOULDBLOCK) {
346 my $guard = Guard::guard { 399 my $guard = Guard::guard {
347 $shutdown = 1; 400 $shutdown = 1;
348 $ww ||= $fh && AE::io $fh, 1, $wcb; 401 $ww ||= $fh && AE::io $fh, 1, $wcb;
349 }; 402 };
350 403
404 my $id;
405
406 $arg{async}
351 sub { 407 ? sub {
352 push @rcb, pop; 408 $id = ($id == 0xffffffff ? 0 : $id) + 1;
409 $id = ($id == 0xffffffff ? 0 : $id) + 1 while exists $rcb{$id}; # rarely loops
353 410
411 $rcb{$id} = pop;
412
354 $guard; # keep it alive 413 $guard; # keep it alive
355 414
356 $wbuf .= pack "L/a*", &$f; 415 $wbuf .= pack "LL/a*", $id, &$f;
357 $ww ||= $fh && AE::io $fh, 1, $wcb; 416 $ww ||= $fh && AE::io $fh, 1, $wcb;
358 } 417 }
418 : sub {
419 push @rcb, pop;
420
421 $guard; # keep it alive
422
423 $wbuf .= pack "L/a*", &$f;
424 $ww ||= $fh && AE::io $fh, 1, $wcb;
425 }
359} 426}
360 427
361=item $rpc->(..., $cb->(...)) 428=item $rpc->(..., $cb->(...))
362 429
363The RPC object returned by C<AnyEvent::Fork::RPC::run> is actually a code 430The RPC object returned by C<AnyEvent::Fork::RPC::run> is actually a code
378 445
379The other thing that can be done with the RPC object is to destroy it. In 446The other thing that can be done with the RPC object is to destroy it. In
380this case, the child process will execute all remaining RPC calls, report 447this case, the child process will execute all remaining RPC calls, report
381their results, and then exit. 448their results, and then exit.
382 449
450See the examples section earlier in this document for some actual
451examples.
452
383=back 453=back
384 454
385=head1 CHILD PROCESS USAGE 455=head1 CHILD PROCESS USAGE
386 456
387The following function is not available in this module. They are only 457The following function is not available in this module. They are only
395 465
396Send an event to the parent. Events are a bit like RPC calls made by the 466Send an event to the parent. Events are a bit like RPC calls made by the
397child process to the parent, except that there is no notion of return 467child process to the parent, except that there is no notion of return
398values. 468values.
399 469
470See the examples section earlier in this document for some actual
471examples.
472
400=back 473=back
401 474
402=head1 SEE ALSO 475=head1 SEE ALSO
403 476
404L<AnyEvent::Fork> (to create the processes in the first place), 477L<AnyEvent::Fork> (to create the processes in the first place),

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines