ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-Fork-RPC/RPC.pm
(Generate patch)

Comparing AnyEvent-Fork-RPC/RPC.pm (file contents):
Revision 1.5 by root, Wed Apr 17 19:39:54 2013 UTC vs.
Revision 1.10 by root, Wed Apr 17 22:04:49 2013 UTC

2 2
3AnyEvent::Fork::RPC - simple RPC extension for AnyEvent::Fork 3AnyEvent::Fork::RPC - simple RPC extension for AnyEvent::Fork
4 4
5=head1 SYNOPSIS 5=head1 SYNOPSIS
6 6
7 use AnyEvent::Fork;
8 use AnyEvent::Fork::RPC; 7 use AnyEvent::Fork::RPC;
8 # use AnyEvent::Fork is not needed
9 9
10 my $rpc = AnyEvent::Fork 10 my $rpc = AnyEvent::Fork
11 ->new 11 ->new
12 ->require ("MyModule") 12 ->require ("MyModule")
13 ->AnyEvent::Fork::RPC::run ( 13 ->AnyEvent::Fork::RPC::run (
34concurrently in the child, using AnyEvent. 34concurrently in the child, using AnyEvent.
35 35
36It also implements an asynchronous event mechanism from the child to the 36It also implements an asynchronous event mechanism from the child to the
37parent, that could be used for progress indications or other information. 37parent, that could be used for progress indications or other information.
38 38
39Loading this module also always loads L<AnyEvent::Fork>, so you can make a
40separate C<use AnyEvent::Fork> if you wish, but you don't have to.
41
39=head1 EXAMPLES 42=head1 EXAMPLES
40 43
41=head2 Synchronous Backend 44=head2 Example 1: Synchronous Backend
42 45
43Here is a simple example that implements a backend that executes C<unlink> 46Here is a simple example that implements a backend that executes C<unlink>
44and C<rmdir> calls, and reports their status back. It also reports the 47and C<rmdir> calls, and reports their status back. It also reports the
45number of requests it has processed every three requests, which is clearly 48number of requests it has processed every three requests, which is clearly
46silly, but illustrates the use of events. 49silly, but illustrates the use of events.
103dies with a fatal error - obviously, you must never let this happen :). 106dies with a fatal error - obviously, you must never let this happen :).
104 107
105Eventually it returns the status value true if the command was successful, 108Eventually it returns the status value true if the command was successful,
106or the status value 0 and the stringified error message. 109or the status value 0 and the stringified error message.
107 110
108On my system, running the first cdoe fragment with the given 111On my system, running the first code fragment with the given
109F<MyWorker.pm> in the current directory yields: 112F<MyWorker.pm> in the current directory yields:
110 113
111 /tmp/somepath/1: No such file or directory 114 /tmp/somepath/1: No such file or directory
112 /tmp/somepath/2: No such file or directory 115 /tmp/somepath/2: No such file or directory
113 3 requests handled 116 3 requests handled
134 137
135And as a final remark, there is a fine module on CPAN that can 138And as a final remark, there is a fine module on CPAN that can
136asynchronously C<rmdir> and C<unlink> and a lot more, and more efficiently 139asynchronously C<rmdir> and C<unlink> and a lot more, and more efficiently
137than this example, namely L<IO::AIO>. 140than this example, namely L<IO::AIO>.
138 141
142=head3 Example 1a: the same with the asynchronous backend
143
144This example only shows what needs to be changed to use the async backend
145instead. Doing this is not very useful, the purpose of this example is
146to show the minimum amount of change that is required to go from the
147synchronous to the asynchronous backend.
148
149To use the async backend in the previous example, you need to add the
150C<async> parameter to the C<AnyEvent::Fork::RPC::run> call:
151
152 ->AnyEvent::Fork::RPC::run ("MyWorker::run",
153 async => 1,
154 ...
155
156And since the function call protocol is now changed, you need to adopt
157C<MyWorker::run> to the async API.
158
159First, you need to accept the extra initial C<$done> callback:
160
161 sub run {
162 my ($done, $cmd, $path) = @_;
163
164And since a response is now generated when C<$done> is called, as opposed
165to when the function returns, we need to call the C<$done> function with
166the status:
167
168 $done->($status or (0, "$!"));
169
170A few remarks are in order. First, it's quite pointless to use the async
171backend for this example - but it I<is> possible. Second, you can call
172C<$done> before or after returning from the function. Third, having both
173returned from the function and having called the C<$done> callback, the
174child process may exit at any time, so you should call C<$done> only when
175you really I<are> done.
176
177=head2 Example 2: Asynchronous Backend
178
179#TODO
180
139=head1 PARENT PROCESS USAGE 181=head1 PARENT PROCESS USAGE
140 182
141This module exports nothing, and only implements a single function: 183This module exports nothing, and only implements a single function:
142 184
143=over 4 185=over 4
150 192
151use Errno (); 193use Errno ();
152use Guard (); 194use Guard ();
153 195
154use AnyEvent; 196use AnyEvent;
155#use AnyEvent::Fork; 197use AnyEvent::Fork; # we don't actually depend on it, this is for convenience
156 198
157our $VERSION = 0.1; 199our $VERSION = 0.1;
158 200
159=item my $rpc = AnyEvent::Fork::RPC::run $fork, $function, [key => value...] 201=item my $rpc = AnyEvent::Fork::RPC::run $fork, $function, [key => value...]
160 202
198 240
199Called when the C<$rpc> object has been destroyed and all requests have 241Called when the C<$rpc> object has been destroyed and all requests have
200been successfully handled. This is useful when you queue some requests and 242been successfully handled. This is useful when you queue some requests and
201want the child to go away after it has handled them. The problem is that 243want the child to go away after it has handled them. The problem is that
202the parent must not exit either until all requests have been handled, and 244the parent must not exit either until all requests have been handled, and
203this cna be accomplished by waiting for this callback. 245this can be accomplished by waiting for this callback.
204 246
205=item init => $function (default none) 247=item init => $function (default none)
206 248
207When specified (by name), this function is called in the child as the very 249When specified (by name), this function is called in the child as the very
208first thing when taking over the process, with all the arguments normally 250first thing when taking over the process, with all the arguments normally
231If you want to pre-load the actual back-end modules to enable memory 273If you want to pre-load the actual back-end modules to enable memory
232sharing, then you should load C<AnyEvent::Fork::RPC::Sync> for 274sharing, then you should load C<AnyEvent::Fork::RPC::Sync> for
233synchronous, and C<AnyEvent::Fork::RPC::Async> for asynchronous mode. 275synchronous, and C<AnyEvent::Fork::RPC::Async> for asynchronous mode.
234 276
235If you use a template process and want to fork both sync and async 277If you use a template process and want to fork both sync and async
236children, then it is permissible to laod both modules. 278children, then it is permissible to load both modules.
237 279
238=item serialiser => $string (default: '(sub { pack "(w/a*)*", @_ }, sub { unpack "(w/a*)*", shift })') 280=item serialiser => $string (default: '(sub { pack "(w/a*)*", @_ }, sub { unpack "(w/a*)*", shift })')
239 281
240All arguments, result data and event data have to be serialised to be 282All arguments, result data and event data have to be serialised to be
241transferred between the processes. For this, they have to be frozen and 283transferred between the processes. For this, they have to be frozen and
253If you need an external module for serialisation, then you can either 295If you need an external module for serialisation, then you can either
254pre-load it into your L<AnyEvent::Fork> process, or you can add a C<use> 296pre-load it into your L<AnyEvent::Fork> process, or you can add a C<use>
255or C<require> statement into the serialiser string. Or both. 297or C<require> statement into the serialiser string. Or both.
256 298
257=back 299=back
300
301See the examples section earlier in this document for some actual
302examples.
258 303
259=cut 304=cut
260 305
261our $STRING_SERIALISER = '(sub { pack "(w/a*)*", @_ }, sub { unpack "(w/a*)*", shift })'; 306our $STRING_SERIALISER = '(sub { pack "(w/a*)*", @_ }, sub { unpack "(w/a*)*", shift })';
262 307
276 # default for on_event is to raise an error 321 # default for on_event is to raise an error
277 $on_event ||= sub { $on_error->("event received, but no on_event handler") }; 322 $on_event ||= sub { $on_error->("event received, but no on_event handler") };
278 323
279 my ($f, $t) = eval $serialiser; die $@ if $@; 324 my ($f, $t) = eval $serialiser; die $@ if $@;
280 325
281 my (@rcb, $fh, $shutdown, $wbuf, $ww, $rw); 326 my (@rcb, %rcb, $fh, $shutdown, $wbuf, $ww);
282 my ($rlen, $rbuf) = 512 - 16; 327 my ($rlen, $rbuf, $rw) = 512 - 16;
283 328
284 my $wcb = sub { 329 my $wcb = sub {
285 my $len = syswrite $fh, $wbuf; 330 my $len = syswrite $fh, $wbuf;
286 331
287 if (!defined $len) { 332 unless (defined $len) {
288 if ($! != Errno::EAGAIN && $! != Errno::EWOULDBLOCK) { 333 if ($! != Errno::EAGAIN && $! != Errno::EWOULDBLOCK) {
289 undef $rw; undef $ww; # it ends here 334 undef $rw; undef $ww; # it ends here
290 $on_error->("$!"); 335 $on_error->("$!");
291 } 336 }
292 } 337 }
303 348
304 $self->require ($module) 349 $self->require ($module)
305 ->send_arg ($function, $arg{init}, $serialiser) 350 ->send_arg ($function, $arg{init}, $serialiser)
306 ->run ("$module\::run", sub { 351 ->run ("$module\::run", sub {
307 $fh = shift; 352 $fh = shift;
353
354 my ($id, $len);
308 $rw = AE::io $fh, 0, sub { 355 $rw = AE::io $fh, 0, sub {
309 $rlen = $rlen * 2 + 16 if $rlen - 128 < length $rbuf; 356 $rlen = $rlen * 2 + 16 if $rlen - 128 < length $rbuf;
310 my $len = sysread $fh, $rbuf, $rlen - length $rbuf, length $rbuf; 357 $len = sysread $fh, $rbuf, $rlen - length $rbuf, length $rbuf;
311 358
312 if ($len) { 359 if ($len) {
313 while (5 <= length $rbuf) { 360 while (8 <= length $rbuf) {
314 $len = unpack "L", $rbuf; 361 ($id, $len) = unpack "LL", $rbuf;
315 4 + $len <= length $rbuf 362 8 + $len <= length $rbuf
316 or last; 363 or last;
317 364
318 my @r = $t->(substr $rbuf, 4, $len); 365 my @r = $t->(substr $rbuf, 8, $len);
319 substr $rbuf, 0, $len + 4, ""; 366 substr $rbuf, 0, 8 + $len, "";
367
368 if ($id) {
369 if (@rcb) {
370 (shift @rcb)->(@r);
371 } elsif (my $cb = delete $rcb{$id}) {
372 $cb->(@r);
373 } else {
374 undef $rw; undef $ww;
375 $on_error->("unexpected data from child");
320 376 }
321 if (pop @r) { 377 } else {
322 $on_event->(@r); 378 $on_event->(@r);
323 } elsif (@rcb) {
324 (shift @rcb)->(@r);
325 } else {
326 undef $rw; undef $ww;
327 $on_error->("unexpected data from child");
328 } 379 }
329 } 380 }
330 } elsif (defined $len) { 381 } elsif (defined $len) {
331 undef $rw; undef $ww; # it ends here 382 undef $rw; undef $ww; # it ends here
332 383
333 if (@rcb) { 384 if (@rcb || %rcb) {
385 use Data::Dump;ddx[\@rcb,\%rcb];#d#
334 $on_error->("unexpected eof"); 386 $on_error->("unexpected eof");
335 } else { 387 } else {
336 $on_destroy->(); 388 $on_destroy->();
337 } 389 }
338 } elsif ($! != Errno::EAGAIN && $! != Errno::EWOULDBLOCK) { 390 } elsif ($! != Errno::EAGAIN && $! != Errno::EWOULDBLOCK) {
347 my $guard = Guard::guard { 399 my $guard = Guard::guard {
348 $shutdown = 1; 400 $shutdown = 1;
349 $ww ||= $fh && AE::io $fh, 1, $wcb; 401 $ww ||= $fh && AE::io $fh, 1, $wcb;
350 }; 402 };
351 403
404 my $id;
405
406 $arg{async}
352 sub { 407 ? sub {
353 push @rcb, pop; 408 $id = ($id == 0xffffffff ? 0 : $id) + 1;
409 $id = ($id == 0xffffffff ? 0 : $id) + 1 while exists $rcb{$id}; # rarely loops
354 410
411 $rcb{$id} = pop;
412
355 $guard; # keep it alive 413 $guard; # keep it alive
356 414
357 $wbuf .= pack "L/a*", &$f; 415 $wbuf .= pack "LL/a*", $id, &$f;
358 $ww ||= $fh && AE::io $fh, 1, $wcb; 416 $ww ||= $fh && AE::io $fh, 1, $wcb;
359 } 417 }
418 : sub {
419 push @rcb, pop;
420
421 $guard; # keep it alive
422
423 $wbuf .= pack "L/a*", &$f;
424 $ww ||= $fh && AE::io $fh, 1, $wcb;
425 }
360} 426}
361 427
362=item $rpc->(..., $cb->(...)) 428=item $rpc->(..., $cb->(...))
363 429
364The RPC object returned by C<AnyEvent::Fork::RPC::run> is actually a code 430The RPC object returned by C<AnyEvent::Fork::RPC::run> is actually a code
379 445
380The other thing that can be done with the RPC object is to destroy it. In 446The other thing that can be done with the RPC object is to destroy it. In
381this case, the child process will execute all remaining RPC calls, report 447this case, the child process will execute all remaining RPC calls, report
382their results, and then exit. 448their results, and then exit.
383 449
450See the examples section earlier in this document for some actual
451examples.
452
384=back 453=back
385 454
386=head1 CHILD PROCESS USAGE 455=head1 CHILD PROCESS USAGE
387 456
388The following function is not available in this module. They are only 457The following function is not available in this module. They are only
396 465
397Send an event to the parent. Events are a bit like RPC calls made by the 466Send an event to the parent. Events are a bit like RPC calls made by the
398child process to the parent, except that there is no notion of return 467child process to the parent, except that there is no notion of return
399values. 468values.
400 469
470See the examples section earlier in this document for some actual
471examples.
472
401=back 473=back
402 474
403=head1 SEE ALSO 475=head1 SEE ALSO
404 476
405L<AnyEvent::Fork> (to create the processes in the first place), 477L<AnyEvent::Fork> (to create the processes in the first place),

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines