ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-Fork-RPC/RPC.pm
(Generate patch)

Comparing AnyEvent-Fork-RPC/RPC.pm (file contents):
Revision 1.2 by root, Wed Apr 17 17:08:16 2013 UTC vs.
Revision 1.10 by root, Wed Apr 17 22:04:49 2013 UTC

2 2
3AnyEvent::Fork::RPC - simple RPC extension for AnyEvent::Fork 3AnyEvent::Fork::RPC - simple RPC extension for AnyEvent::Fork
4 4
5=head1 SYNOPSIS 5=head1 SYNOPSIS
6 6
7 use AnyEvent::Fork;
8 use AnyEvent::Fork::RPC; 7 use AnyEvent::Fork::RPC;
8 # use AnyEvent::Fork is not needed
9 9
10 my $rpc = AnyEvent::Fork 10 my $rpc = AnyEvent::Fork
11 ->new 11 ->new
12 ->require ("MyModule") 12 ->require ("MyModule")
13 ->AnyEvent::Fork::RPC::run ( 13 ->AnyEvent::Fork::RPC::run (
34concurrently in the child, using AnyEvent. 34concurrently in the child, using AnyEvent.
35 35
36It also implements an asynchronous event mechanism from the child to the 36It also implements an asynchronous event mechanism from the child to the
37parent, that could be used for progress indications or other information. 37parent, that could be used for progress indications or other information.
38 38
39Loading this module also always loads L<AnyEvent::Fork>, so you can make a
40separate C<use AnyEvent::Fork> if you wish, but you don't have to.
41
42=head1 EXAMPLES
43
44=head2 Example 1: Synchronous Backend
45
46Here is a simple example that implements a backend that executes C<unlink>
47and C<rmdir> calls, and reports their status back. It also reports the
48number of requests it has processed every three requests, which is clearly
49silly, but illustrates the use of events.
50
51First the parent process:
52
53 use AnyEvent;
54 use AnyEvent::Fork;
55 use AnyEvent::Fork::RPC;
56
57 my $done = AE::cv;
58
59 my $rpc = AnyEvent::Fork
60 ->new
61 ->require ("MyWorker")
62 ->AnyEvent::Fork::RPC::run ("MyWorker::run",
63 on_error => sub { warn "FATAL: $_[0]"; exit 1 },
64 on_event => sub { warn "$_[0] requests handled\n" },
65 on_destroy => $done,
66 );
67
68 for my $id (1..6) {
69 $rpc->(rmdir => "/tmp/somepath/$id", sub {
70 $_[0]
71 or warn "/tmp/somepath/$id: $_[1]\n";
72 });
73 }
74
75 undef $rpc;
76
77 $done->recv;
78
79The parent creates the process, queues a few rmdir's. It then forgets
80about the C<$rpc> object, so that the child exits after it has handled the
81requests, and then it waits till the requests have been handled.
82
83The child is implemented using a separate module, C<MyWorker>, shown here:
84
85 package MyWorker;
86
87 my $count;
88
89 sub run {
90 my ($cmd, $path) = @_;
91
92 AnyEvent::Fork::RPC::event ($count)
93 unless ++$count % 3;
94
95 my $status = $cmd eq "rmdir" ? rmdir $path
96 : $cmd eq "unlink" ? unlink $path
97 : die "fatal error, illegal command '$cmd'";
98
99 $status or (0, "$!")
100 }
101
102 1
103
104The C<run> function first sends a "progress" event every three calls, and
105then executes C<rmdir> or C<unlink>, depending on the first parameter (or
106dies with a fatal error - obviously, you must never let this happen :).
107
108Eventually it returns the status value true if the command was successful,
109or the status value 0 and the stringified error message.
110
111On my system, running the first code fragment with the given
112F<MyWorker.pm> in the current directory yields:
113
114 /tmp/somepath/1: No such file or directory
115 /tmp/somepath/2: No such file or directory
116 3 requests handled
117 /tmp/somepath/3: No such file or directory
118 /tmp/somepath/4: No such file or directory
119 /tmp/somepath/5: No such file or directory
120 6 requests handled
121 /tmp/somepath/6: No such file or directory
122
123Obviously, none of the directories I am trying to delete even exist. Also,
124the events and responses are processed in exactly the same order as
125they were created in the child, which is true for both synchronous and
126asynchronous backends.
127
128Note that the parentheses in the call to C<AnyEvent::Fork::RPC::event> are
129not optional. That is because the function isn't defined when the code is
130compiled. You can make sure it is visible by pre-loading the correct
131backend module in the call to C<require>:
132
133 ->require ("AnyEvent::Fork::RPC::Sync", "MyWorker")
134
135Since the backend module declares the C<event> function, loading it first
136ensures that perl will correctly interpret calls to it.
137
138And as a final remark, there is a fine module on CPAN that can
139asynchronously C<rmdir> and C<unlink> and a lot more, and more efficiently
140than this example, namely L<IO::AIO>.
141
142=head3 Example 1a: the same with the asynchronous backend
143
144This example only shows what needs to be changed to use the async backend
145instead. Doing this is not very useful, the purpose of this example is
146to show the minimum amount of change that is required to go from the
147synchronous to the asynchronous backend.
148
149To use the async backend in the previous example, you need to add the
150C<async> parameter to the C<AnyEvent::Fork::RPC::run> call:
151
152 ->AnyEvent::Fork::RPC::run ("MyWorker::run",
153 async => 1,
154 ...
155
156And since the function call protocol is now changed, you need to adopt
157C<MyWorker::run> to the async API.
158
159First, you need to accept the extra initial C<$done> callback:
160
161 sub run {
162 my ($done, $cmd, $path) = @_;
163
164And since a response is now generated when C<$done> is called, as opposed
165to when the function returns, we need to call the C<$done> function with
166the status:
167
168 $done->($status or (0, "$!"));
169
170A few remarks are in order. First, it's quite pointless to use the async
171backend for this example - but it I<is> possible. Second, you can call
172C<$done> before or after returning from the function. Third, having both
173returned from the function and having called the C<$done> callback, the
174child process may exit at any time, so you should call C<$done> only when
175you really I<are> done.
176
177=head2 Example 2: Asynchronous Backend
178
179#TODO
180
39=head1 PARENT PROCESS USAGE 181=head1 PARENT PROCESS USAGE
40 182
41This module exports nothing, and only implements a single function: 183This module exports nothing, and only implements a single function:
42 184
43=over 4 185=over 4
50 192
51use Errno (); 193use Errno ();
52use Guard (); 194use Guard ();
53 195
54use AnyEvent; 196use AnyEvent;
55#use AnyEvent::Fork; 197use AnyEvent::Fork; # we don't actually depend on it, this is for convenience
56 198
57our $VERSION = 0.1; 199our $VERSION = 0.1;
58 200
59=item my $rpc = AnyEvent::Fork::RPC::run $fork, $function, [key => value...] 201=item my $rpc = AnyEvent::Fork::RPC::run $fork, $function, [key => value...]
60 202
92Called for every call to the C<AnyEvent::Fork::RPC::event> function in the 234Called for every call to the C<AnyEvent::Fork::RPC::event> function in the
93child, with the arguments of that function passed to the callback. 235child, with the arguments of that function passed to the callback.
94 236
95Also called on errors when no C<on_error> handler is provided. 237Also called on errors when no C<on_error> handler is provided.
96 238
239=item on_destroy => $cb->()
240
241Called when the C<$rpc> object has been destroyed and all requests have
242been successfully handled. This is useful when you queue some requests and
243want the child to go away after it has handled them. The problem is that
244the parent must not exit either until all requests have been handled, and
245this can be accomplished by waiting for this callback.
246
97=item init => $function (default none) 247=item init => $function (default none)
98 248
99When specified (by name), this function is called in the child as the very 249When specified (by name), this function is called in the child as the very
100first thing when taking over the process, with all the arguments normally 250first thing when taking over the process, with all the arguments normally
101passed to the C<AnyEvent::Fork::run> function, except the communications 251passed to the C<AnyEvent::Fork::run> function, except the communications
102socket. 252socket.
103 253
104It can be used to do one-time things in the child such as storing passed 254It can be used to do one-time things in the child such as storing passed
105parameters or opening database connections. 255parameters or opening database connections.
106 256
257It is called very early - before the serialisers are created or the
258C<$function> name is resolved into a function reference, so it could be
259used to load any modules that provide the serialiser or function. It can
260not, however, create events.
261
107=item async => $boolean (default: 0) 262=item async => $boolean (default: 0)
108 263
109The default server used in the child does all I/O blockingly, and only 264The default server used in the child does all I/O blockingly, and only
110allows a single RPC call to execute concurrently. 265allows a single RPC call to execute concurrently.
111 266
116the calling semantics of the returned C<$rpc> function. 271the calling semantics of the returned C<$rpc> function.
117 272
118If you want to pre-load the actual back-end modules to enable memory 273If you want to pre-load the actual back-end modules to enable memory
119sharing, then you should load C<AnyEvent::Fork::RPC::Sync> for 274sharing, then you should load C<AnyEvent::Fork::RPC::Sync> for
120synchronous, and C<AnyEvent::Fork::RPC::Async> for asynchronous mode. 275synchronous, and C<AnyEvent::Fork::RPC::Async> for asynchronous mode.
276
277If you use a template process and want to fork both sync and async
278children, then it is permissible to load both modules.
121 279
122=item serialiser => $string (default: '(sub { pack "(w/a*)*", @_ }, sub { unpack "(w/a*)*", shift })') 280=item serialiser => $string (default: '(sub { pack "(w/a*)*", @_ }, sub { unpack "(w/a*)*", shift })')
123 281
124All arguments, result data and event data have to be serialised to be 282All arguments, result data and event data have to be serialised to be
125transferred between the processes. For this, they have to be frozen and 283transferred between the processes. For this, they have to be frozen and
138pre-load it into your L<AnyEvent::Fork> process, or you can add a C<use> 296pre-load it into your L<AnyEvent::Fork> process, or you can add a C<use>
139or C<require> statement into the serialiser string. Or both. 297or C<require> statement into the serialiser string. Or both.
140 298
141=back 299=back
142 300
301See the examples section earlier in this document for some actual
302examples.
303
143=cut 304=cut
144 305
145our $STRING_SERIALISER = '(sub { pack "(w/a*)*", @_ }, sub { unpack "(w/a*)*", shift })'; 306our $STRING_SERIALISER = '(sub { pack "(w/a*)*", @_ }, sub { unpack "(w/a*)*", shift })';
146
147# ideally, we want (SvLEN - SvCUR) || 1024 or somesuch...
148sub rlen($) { ($_[0] < 384 ? 512 + 16 : 2 << int +(log $_[0] + 512) / log 2) - $_[0] - 16 }
149 307
150sub run { 308sub run {
151 my ($self, $function, %arg) = @_; 309 my ($self, $function, %arg) = @_;
152 310
153 my $serialiser = delete $arg{serialiser} || $STRING_SERIALISER; 311 my $serialiser = delete $arg{serialiser} || $STRING_SERIALISER;
154 my $on_event = delete $arg{on_event}; 312 my $on_event = delete $arg{on_event};
155 my $on_error = delete $arg{on_error}; 313 my $on_error = delete $arg{on_error};
314 my $on_destroy = delete $arg{on_destroy};
156 315
157 # default for on_error is to on_event, if specified 316 # default for on_error is to on_event, if specified
158 $on_error ||= $on_event 317 $on_error ||= $on_event
159 ? sub { $on_event->(error => shift) } 318 ? sub { $on_event->(error => shift) }
160 : sub { die "AnyEvent::Fork::RPC: uncaught error: $_[0].\n" }; 319 : sub { die "AnyEvent::Fork::RPC: uncaught error: $_[0].\n" };
162 # default for on_event is to raise an error 321 # default for on_event is to raise an error
163 $on_event ||= sub { $on_error->("event received, but no on_event handler") }; 322 $on_event ||= sub { $on_error->("event received, but no on_event handler") };
164 323
165 my ($f, $t) = eval $serialiser; die $@ if $@; 324 my ($f, $t) = eval $serialiser; die $@ if $@;
166 325
167 my (@rcb, $fh, $shutdown, $wbuf, $ww, $rbuf, $rw); 326 my (@rcb, %rcb, $fh, $shutdown, $wbuf, $ww);
327 my ($rlen, $rbuf, $rw) = 512 - 16;
168 328
169 my $wcb = sub { 329 my $wcb = sub {
170 my $len = syswrite $fh, $wbuf; 330 my $len = syswrite $fh, $wbuf;
171 331
172 if (!defined $len) { 332 unless (defined $len) {
173 if ($! != Errno::EAGAIN && $! != Errno::EWOULDBLOCK) { 333 if ($! != Errno::EAGAIN && $! != Errno::EWOULDBLOCK) {
174 undef $rw; undef $ww; # it ends here 334 undef $rw; undef $ww; # it ends here
175 $on_error->("$!"); 335 $on_error->("$!");
176 } 336 }
177 } 337 }
188 348
189 $self->require ($module) 349 $self->require ($module)
190 ->send_arg ($function, $arg{init}, $serialiser) 350 ->send_arg ($function, $arg{init}, $serialiser)
191 ->run ("$module\::run", sub { 351 ->run ("$module\::run", sub {
192 $fh = shift; 352 $fh = shift;
353
354 my ($id, $len);
193 $rw = AE::io $fh, 0, sub { 355 $rw = AE::io $fh, 0, sub {
356 $rlen = $rlen * 2 + 16 if $rlen - 128 < length $rbuf;
194 my $len = sysread $fh, $rbuf, rlen length $rbuf, length $rbuf; 357 $len = sysread $fh, $rbuf, $rlen - length $rbuf, length $rbuf;
195 358
196 if ($len) { 359 if ($len) {
197 while (5 <= length $rbuf) { 360 while (8 <= length $rbuf) {
198 $len = unpack "L", $rbuf; 361 ($id, $len) = unpack "LL", $rbuf;
199 4 + $len <= length $rbuf 362 8 + $len <= length $rbuf
200 or last; 363 or last;
201 364
202 my @r = $t->(substr $rbuf, 4, $len); 365 my @r = $t->(substr $rbuf, 8, $len);
203 substr $rbuf, 0, $len + 4, ""; 366 substr $rbuf, 0, 8 + $len, "";
367
368 if ($id) {
369 if (@rcb) {
370 (shift @rcb)->(@r);
371 } elsif (my $cb = delete $rcb{$id}) {
372 $cb->(@r);
373 } else {
374 undef $rw; undef $ww;
375 $on_error->("unexpected data from child");
204 376 }
205 if (pop @r) { 377 } else {
206 $on_event->(@r); 378 $on_event->(@r);
207 } elsif (@rcb) {
208 (shift @rcb)->(@r);
209 } else {
210 undef $rw; undef $ww;
211 $on_error->("unexpected data from child");
212 } 379 }
213 } 380 }
214 } elsif (defined $len) { 381 } elsif (defined $len) {
215 undef $rw; undef $ww; # it ends here 382 undef $rw; undef $ww; # it ends here
383
384 if (@rcb || %rcb) {
385 use Data::Dump;ddx[\@rcb,\%rcb];#d#
216 $on_error->("unexpected eof") 386 $on_error->("unexpected eof");
217 if @rcb; 387 } else {
388 $on_destroy->();
389 }
218 } elsif ($! != Errno::EAGAIN && $! != Errno::EWOULDBLOCK) { 390 } elsif ($! != Errno::EAGAIN && $! != Errno::EWOULDBLOCK) {
219 undef $rw; undef $ww; # it ends here 391 undef $rw; undef $ww; # it ends here
220 $on_error->("read: $!"); 392 $on_error->("read: $!");
221 } 393 }
222 }; 394 };
227 my $guard = Guard::guard { 399 my $guard = Guard::guard {
228 $shutdown = 1; 400 $shutdown = 1;
229 $ww ||= $fh && AE::io $fh, 1, $wcb; 401 $ww ||= $fh && AE::io $fh, 1, $wcb;
230 }; 402 };
231 403
404 my $id;
405
406 $arg{async}
232 sub { 407 ? sub {
233 push @rcb, pop; 408 $id = ($id == 0xffffffff ? 0 : $id) + 1;
409 $id = ($id == 0xffffffff ? 0 : $id) + 1 while exists $rcb{$id}; # rarely loops
234 410
411 $rcb{$id} = pop;
412
235 $guard; # keep it alive 413 $guard; # keep it alive
236 414
237 $wbuf .= pack "L/a*", &$f; 415 $wbuf .= pack "LL/a*", $id, &$f;
238 $ww ||= $fh && AE::io $fh, 1, $wcb; 416 $ww ||= $fh && AE::io $fh, 1, $wcb;
239 } 417 }
418 : sub {
419 push @rcb, pop;
420
421 $guard; # keep it alive
422
423 $wbuf .= pack "L/a*", &$f;
424 $ww ||= $fh && AE::io $fh, 1, $wcb;
425 }
240} 426}
241 427
428=item $rpc->(..., $cb->(...))
429
430The RPC object returned by C<AnyEvent::Fork::RPC::run> is actually a code
431reference. There are two things you can do with it: call it, and let it go
432out of scope (let it get destroyed).
433
434If C<async> was false when C<$rpc> was created (the default), then, if you
435call C<$rpc>, the C<$function> is invoked with all arguments passed to
436C<$rpc> except the last one (the callback). When the function returns, the
437callback will be invoked with all the return values.
438
439If C<async> was true, then the C<$function> receives an additional
440initial argument, the result callback. In this case, returning from
441C<$function> does nothing - the function only counts as "done" when the
442result callback is called, and any arguments passed to it are considered
443the return values. This makes it possible to "return" from event handlers
444or e.g. Coro threads.
445
446The other thing that can be done with the RPC object is to destroy it. In
447this case, the child process will execute all remaining RPC calls, report
448their results, and then exit.
449
450See the examples section earlier in this document for some actual
451examples.
452
242=back 453=back
243 454
244=head1 CHILD PROCESS USAGE 455=head1 CHILD PROCESS USAGE
245 456
246These functions are not available in this module. They are only available 457The following function is not available in this module. They are only
247in the namespace of this module when the child is running, without 458available in the namespace of this module when the child is running,
248having to load any extra module. They are part of the child-side API of 459without having to load any extra modules. They are part of the child-side
249L<AnyEvent::Fork::RPC>. 460API of L<AnyEvent::Fork::RPC>.
250 461
251=over 4 462=over 4
252
253=item AnyEvent::Fork::RPC::quit
254
255This function can be called to gracefully stop the child process when it
256is idle.
257
258After this function is called, the process stops handling incoming RPC
259requests, but outstanding events and function return values will be sent
260to the parent. When all data has been sent, the process calls C<exit>.
261
262Since the parent might not expect the child to exit at random points in
263time, it is often better to signal the parent by sending an C<event> and
264letting the parent close down the child process.
265 463
266=item AnyEvent::Fork::RPC::event ... 464=item AnyEvent::Fork::RPC::event ...
267 465
268Send an event to the parent. Events are a bit like RPC calls made by the 466Send an event to the parent. Events are a bit like RPC calls made by the
269child process to the parent, except that there is no notion of return 467child process to the parent, except that there is no notion of return
270values. 468values.
271 469
470See the examples section earlier in this document for some actual
471examples.
472
272=back 473=back
273 474
274=head1 SEE ALSO 475=head1 SEE ALSO
275 476
276L<AnyEvent::Fork> (to create the processes in the first place), 477L<AnyEvent::Fork> (to create the processes in the first place),

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines