ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-Fork-RPC/RPC.pm
(Generate patch)

Comparing AnyEvent-Fork-RPC/RPC.pm (file contents):
Revision 1.5 by root, Wed Apr 17 19:39:54 2013 UTC vs.
Revision 1.9 by root, Wed Apr 17 21:48:35 2013 UTC

2 2
3AnyEvent::Fork::RPC - simple RPC extension for AnyEvent::Fork 3AnyEvent::Fork::RPC - simple RPC extension for AnyEvent::Fork
4 4
5=head1 SYNOPSIS 5=head1 SYNOPSIS
6 6
7 use AnyEvent::Fork;
8 use AnyEvent::Fork::RPC; 7 use AnyEvent::Fork::RPC;
8 # use AnyEvent::Fork is not needed
9 9
10 my $rpc = AnyEvent::Fork 10 my $rpc = AnyEvent::Fork
11 ->new 11 ->new
12 ->require ("MyModule") 12 ->require ("MyModule")
13 ->AnyEvent::Fork::RPC::run ( 13 ->AnyEvent::Fork::RPC::run (
33normal function call, and an asynchronous one that can run multiple jobs 33normal function call, and an asynchronous one that can run multiple jobs
34concurrently in the child, using AnyEvent. 34concurrently in the child, using AnyEvent.
35 35
36It also implements an asynchronous event mechanism from the child to the 36It also implements an asynchronous event mechanism from the child to the
37parent, that could be used for progress indications or other information. 37parent, that could be used for progress indications or other information.
38
39Loading this module also always loads L<AnyEvent::Fork>, so you can make a
40separate C<use AnyEvent::Fork> if you wish, but you don't have to.
38 41
39=head1 EXAMPLES 42=head1 EXAMPLES
40 43
41=head2 Synchronous Backend 44=head2 Synchronous Backend
42 45
103dies with a fatal error - obviously, you must never let this happen :). 106dies with a fatal error - obviously, you must never let this happen :).
104 107
105Eventually it returns the status value true if the command was successful, 108Eventually it returns the status value true if the command was successful,
106or the status value 0 and the stringified error message. 109or the status value 0 and the stringified error message.
107 110
108On my system, running the first cdoe fragment with the given 111On my system, running the first code fragment with the given
109F<MyWorker.pm> in the current directory yields: 112F<MyWorker.pm> in the current directory yields:
110 113
111 /tmp/somepath/1: No such file or directory 114 /tmp/somepath/1: No such file or directory
112 /tmp/somepath/2: No such file or directory 115 /tmp/somepath/2: No such file or directory
113 3 requests handled 116 3 requests handled
150 153
151use Errno (); 154use Errno ();
152use Guard (); 155use Guard ();
153 156
154use AnyEvent; 157use AnyEvent;
155#use AnyEvent::Fork; 158use AnyEvent::Fork; # we don't actually depend on it, this is for convenience
156 159
157our $VERSION = 0.1; 160our $VERSION = 0.1;
158 161
159=item my $rpc = AnyEvent::Fork::RPC::run $fork, $function, [key => value...] 162=item my $rpc = AnyEvent::Fork::RPC::run $fork, $function, [key => value...]
160 163
198 201
199Called when the C<$rpc> object has been destroyed and all requests have 202Called when the C<$rpc> object has been destroyed and all requests have
200been successfully handled. This is useful when you queue some requests and 203been successfully handled. This is useful when you queue some requests and
201want the child to go away after it has handled them. The problem is that 204want the child to go away after it has handled them. The problem is that
202the parent must not exit either until all requests have been handled, and 205the parent must not exit either until all requests have been handled, and
203this cna be accomplished by waiting for this callback. 206this can be accomplished by waiting for this callback.
204 207
205=item init => $function (default none) 208=item init => $function (default none)
206 209
207When specified (by name), this function is called in the child as the very 210When specified (by name), this function is called in the child as the very
208first thing when taking over the process, with all the arguments normally 211first thing when taking over the process, with all the arguments normally
231If you want to pre-load the actual back-end modules to enable memory 234If you want to pre-load the actual back-end modules to enable memory
232sharing, then you should load C<AnyEvent::Fork::RPC::Sync> for 235sharing, then you should load C<AnyEvent::Fork::RPC::Sync> for
233synchronous, and C<AnyEvent::Fork::RPC::Async> for asynchronous mode. 236synchronous, and C<AnyEvent::Fork::RPC::Async> for asynchronous mode.
234 237
235If you use a template process and want to fork both sync and async 238If you use a template process and want to fork both sync and async
236children, then it is permissible to laod both modules. 239children, then it is permissible to load both modules.
237 240
238=item serialiser => $string (default: '(sub { pack "(w/a*)*", @_ }, sub { unpack "(w/a*)*", shift })') 241=item serialiser => $string (default: '(sub { pack "(w/a*)*", @_ }, sub { unpack "(w/a*)*", shift })')
239 242
240All arguments, result data and event data have to be serialised to be 243All arguments, result data and event data have to be serialised to be
241transferred between the processes. For this, they have to be frozen and 244transferred between the processes. For this, they have to be frozen and
253If you need an external module for serialisation, then you can either 256If you need an external module for serialisation, then you can either
254pre-load it into your L<AnyEvent::Fork> process, or you can add a C<use> 257pre-load it into your L<AnyEvent::Fork> process, or you can add a C<use>
255or C<require> statement into the serialiser string. Or both. 258or C<require> statement into the serialiser string. Or both.
256 259
257=back 260=back
261
262See the examples section earlier in this document for some actual
263examples.
258 264
259=cut 265=cut
260 266
261our $STRING_SERIALISER = '(sub { pack "(w/a*)*", @_ }, sub { unpack "(w/a*)*", shift })'; 267our $STRING_SERIALISER = '(sub { pack "(w/a*)*", @_ }, sub { unpack "(w/a*)*", shift })';
262 268
276 # default for on_event is to raise an error 282 # default for on_event is to raise an error
277 $on_event ||= sub { $on_error->("event received, but no on_event handler") }; 283 $on_event ||= sub { $on_error->("event received, but no on_event handler") };
278 284
279 my ($f, $t) = eval $serialiser; die $@ if $@; 285 my ($f, $t) = eval $serialiser; die $@ if $@;
280 286
281 my (@rcb, $fh, $shutdown, $wbuf, $ww, $rw); 287 my (@rcb, %rcb, $fh, $shutdown, $wbuf, $ww);
282 my ($rlen, $rbuf) = 512 - 16; 288 my ($rlen, $rbuf, $rw) = 512 - 16;
283 289
284 my $wcb = sub { 290 my $wcb = sub {
285 my $len = syswrite $fh, $wbuf; 291 my $len = syswrite $fh, $wbuf;
286 292
287 if (!defined $len) { 293 unless (defined $len) {
288 if ($! != Errno::EAGAIN && $! != Errno::EWOULDBLOCK) { 294 if ($! != Errno::EAGAIN && $! != Errno::EWOULDBLOCK) {
289 undef $rw; undef $ww; # it ends here 295 undef $rw; undef $ww; # it ends here
290 $on_error->("$!"); 296 $on_error->("$!");
291 } 297 }
292 } 298 }
303 309
304 $self->require ($module) 310 $self->require ($module)
305 ->send_arg ($function, $arg{init}, $serialiser) 311 ->send_arg ($function, $arg{init}, $serialiser)
306 ->run ("$module\::run", sub { 312 ->run ("$module\::run", sub {
307 $fh = shift; 313 $fh = shift;
314
315 my ($id, $len);
308 $rw = AE::io $fh, 0, sub { 316 $rw = AE::io $fh, 0, sub {
309 $rlen = $rlen * 2 + 16 if $rlen - 128 < length $rbuf; 317 $rlen = $rlen * 2 + 16 if $rlen - 128 < length $rbuf;
310 my $len = sysread $fh, $rbuf, $rlen - length $rbuf, length $rbuf; 318 $len = sysread $fh, $rbuf, $rlen - length $rbuf, length $rbuf;
311 319
312 if ($len) { 320 if ($len) {
313 while (5 <= length $rbuf) { 321 while (8 <= length $rbuf) {
314 $len = unpack "L", $rbuf; 322 ($id, $len) = unpack "LL", $rbuf;
315 4 + $len <= length $rbuf 323 8 + $len <= length $rbuf
316 or last; 324 or last;
317 325
318 my @r = $t->(substr $rbuf, 4, $len); 326 my @r = $t->(substr $rbuf, 8, $len);
319 substr $rbuf, 0, $len + 4, ""; 327 substr $rbuf, 0, 8 + $len, "";
328
329 if ($id) {
330 if (@rcb) {
331 (shift @rcb)->(@r);
332 } elsif (my $cb = delete $rcb{$id}) {
333 $cb->(@r);
334 } else {
335 undef $rw; undef $ww;
336 $on_error->("unexpected data from child");
320 337 }
321 if (pop @r) { 338 } else {
322 $on_event->(@r); 339 $on_event->(@r);
323 } elsif (@rcb) {
324 (shift @rcb)->(@r);
325 } else {
326 undef $rw; undef $ww;
327 $on_error->("unexpected data from child");
328 } 340 }
329 } 341 }
330 } elsif (defined $len) { 342 } elsif (defined $len) {
331 undef $rw; undef $ww; # it ends here 343 undef $rw; undef $ww; # it ends here
332 344
333 if (@rcb) { 345 if (@rcb || %rcb) {
346 use Data::Dump;ddx[\@rcb,\%rcb];#d#
334 $on_error->("unexpected eof"); 347 $on_error->("unexpected eof");
335 } else { 348 } else {
336 $on_destroy->(); 349 $on_destroy->();
337 } 350 }
338 } elsif ($! != Errno::EAGAIN && $! != Errno::EWOULDBLOCK) { 351 } elsif ($! != Errno::EAGAIN && $! != Errno::EWOULDBLOCK) {
347 my $guard = Guard::guard { 360 my $guard = Guard::guard {
348 $shutdown = 1; 361 $shutdown = 1;
349 $ww ||= $fh && AE::io $fh, 1, $wcb; 362 $ww ||= $fh && AE::io $fh, 1, $wcb;
350 }; 363 };
351 364
365 my $id;
366
367 $arg{async}
352 sub { 368 ? sub {
353 push @rcb, pop; 369 $id = ($id == 0xffffffff ? 0 : $id) + 1;
370 $id = ($id == 0xffffffff ? 0 : $id) + 1 while exists $rcb{$id}; # rarely loops
354 371
372 $rcb{$id} = pop;
373
355 $guard; # keep it alive 374 $guard; # keep it alive
356 375
357 $wbuf .= pack "L/a*", &$f; 376 $wbuf .= pack "LL/a*", $id, &$f;
358 $ww ||= $fh && AE::io $fh, 1, $wcb; 377 $ww ||= $fh && AE::io $fh, 1, $wcb;
359 } 378 }
379 : sub {
380 push @rcb, pop;
381
382 $guard; # keep it alive
383
384 $wbuf .= pack "L/a*", &$f;
385 $ww ||= $fh && AE::io $fh, 1, $wcb;
386 }
360} 387}
361 388
362=item $rpc->(..., $cb->(...)) 389=item $rpc->(..., $cb->(...))
363 390
364The RPC object returned by C<AnyEvent::Fork::RPC::run> is actually a code 391The RPC object returned by C<AnyEvent::Fork::RPC::run> is actually a code
379 406
380The other thing that can be done with the RPC object is to destroy it. In 407The other thing that can be done with the RPC object is to destroy it. In
381this case, the child process will execute all remaining RPC calls, report 408this case, the child process will execute all remaining RPC calls, report
382their results, and then exit. 409their results, and then exit.
383 410
411See the examples section earlier in this document for some actual
412examples.
413
384=back 414=back
385 415
386=head1 CHILD PROCESS USAGE 416=head1 CHILD PROCESS USAGE
387 417
388The following function is not available in this module. They are only 418The following function is not available in this module. They are only
396 426
397Send an event to the parent. Events are a bit like RPC calls made by the 427Send an event to the parent. Events are a bit like RPC calls made by the
398child process to the parent, except that there is no notion of return 428child process to the parent, except that there is no notion of return
399values. 429values.
400 430
431See the examples section earlier in this document for some actual
432examples.
433
401=back 434=back
402 435
403=head1 SEE ALSO 436=head1 SEE ALSO
404 437
405L<AnyEvent::Fork> (to create the processes in the first place), 438L<AnyEvent::Fork> (to create the processes in the first place),

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines