/cvs/AnyEvent-Fork-RPC/RPC.pm
Revision: 1.10
Committed: Wed Apr 17 22:04:49 2013 UTC (11 years, 1 month ago) by root
Branch: MAIN
Changes since 1.9: +40 -1 lines
Log Message:
*** empty log message ***

File Contents

# Content
1 =head1 NAME
2
3 AnyEvent::Fork::RPC - simple RPC extension for AnyEvent::Fork
4
5 =head1 SYNOPSIS
6
7 use AnyEvent::Fork::RPC;
8 # use AnyEvent::Fork is not needed
9
10 my $rpc = AnyEvent::Fork
11 ->new
12 ->require ("MyModule")
13 ->AnyEvent::Fork::RPC::run (
14 "MyModule::server",
15 );
16
17 my $cv = AE::cv;
18
19 $rpc->(1, 2, 3, sub {
20 print "MyModule::server returned @_\n";
21 $cv->send;
22 });
23
24 $cv->recv;
25
26 =head1 DESCRIPTION
27
28 This module implements a simple RPC protocol and backend for processes
29 created via L<AnyEvent::Fork>, allowing you to call a function in the
30 child process and receive its return values (up to 4GB serialised).
31
32 It implements two different backends: a synchronous one that works like a
33 normal function call, and an asynchronous one that can run multiple jobs
34 concurrently in the child, using AnyEvent.
35
36 It also implements an asynchronous event mechanism from the child to the
37 parent, that could be used for progress indications or other information.
38
39 Loading this module also always loads L<AnyEvent::Fork>, so you can make a
40 separate C<use AnyEvent::Fork> if you wish, but you don't have to.
41
42 =head1 EXAMPLES
43
44 =head2 Example 1: Synchronous Backend
45
46 Here is a simple example that implements a backend that executes C<unlink>
47 and C<rmdir> calls, and reports their status back. It also reports the
48 number of requests it has processed every three requests, which is clearly
49 silly, but illustrates the use of events.
50
51 First the parent process:
52
53 use AnyEvent;
54 use AnyEvent::Fork;
55 use AnyEvent::Fork::RPC;
56
57 my $done = AE::cv;
58
59 my $rpc = AnyEvent::Fork
60 ->new
61 ->require ("MyWorker")
62 ->AnyEvent::Fork::RPC::run ("MyWorker::run",
63 on_error => sub { warn "FATAL: $_[0]"; exit 1 },
64 on_event => sub { warn "$_[0] requests handled\n" },
65 on_destroy => $done,
66 );
67
68 for my $id (1..6) {
69 $rpc->(rmdir => "/tmp/somepath/$id", sub {
70 $_[0]
71 or warn "/tmp/somepath/$id: $_[1]\n";
72 });
73 }
74
75 undef $rpc;
76
77 $done->recv;
78
79 The parent creates the process and queues a few C<rmdir> requests. It then
80 forgets about the C<$rpc> object, so that the child exits after it has handled
81 the requests, and finally waits until all requests have been handled.
82
83 The child is implemented using a separate module, C<MyWorker>, shown here:
84
85 package MyWorker;
86
87 my $count;
88
89 sub run {
90 my ($cmd, $path) = @_;
91
92 AnyEvent::Fork::RPC::event ($count)
93 unless ++$count % 3;
94
95 my $status = $cmd eq "rmdir" ? rmdir $path
96 : $cmd eq "unlink" ? unlink $path
97 : die "fatal error, illegal command '$cmd'";
98
99 $status or (0, "$!")
100 }
101
102 1
103
104 The C<run> function first sends a "progress" event every three calls, and
105 then executes C<rmdir> or C<unlink>, depending on the first parameter (or
106 dies with a fatal error - obviously, you must never let this happen :).
107
108 Eventually it returns the status value true if the command was successful,
109 or the status value 0 and the stringified error message.
110
111 On my system, running the first code fragment with the given
112 F<MyWorker.pm> in the current directory yields:
113
114 /tmp/somepath/1: No such file or directory
115 /tmp/somepath/2: No such file or directory
116 3 requests handled
117 /tmp/somepath/3: No such file or directory
118 /tmp/somepath/4: No such file or directory
119 /tmp/somepath/5: No such file or directory
120 6 requests handled
121 /tmp/somepath/6: No such file or directory
122
123 Obviously, none of the directories I am trying to delete even exist. Also,
124 the events and responses are processed in exactly the same order as
125 they were created in the child, which is true for both synchronous and
126 asynchronous backends.
127
128 Note that the parentheses in the call to C<AnyEvent::Fork::RPC::event> are
129 not optional. That is because the function isn't defined when the code is
130 compiled. You can make sure it is visible by pre-loading the correct
131 backend module in the call to C<require>:
132
133 ->require ("AnyEvent::Fork::RPC::Sync", "MyWorker")
134
135 Since the backend module declares the C<event> function, loading it first
136 ensures that perl will correctly interpret calls to it.
137
138 And as a final remark, there is a fine module on CPAN that can
139 asynchronously C<rmdir> and C<unlink> and a lot more, and more efficiently
140 than this example, namely L<IO::AIO>.
141
142 =head3 Example 1a: the same with the asynchronous backend
143
144 This example only shows what needs to be changed to use the async backend
145 instead. Doing this is not very useful; the purpose of this example is
146 to show the minimum amount of change that is required to go from the
147 synchronous to the asynchronous backend.
148
149 To use the async backend in the previous example, you need to add the
150 C<async> parameter to the C<AnyEvent::Fork::RPC::run> call:
151
152 ->AnyEvent::Fork::RPC::run ("MyWorker::run",
153 async => 1,
154 ...
155
156 And since the function call protocol is now changed, you need to adapt
157 C<MyWorker::run> to the async API.
158
159 First, you need to accept the extra initial C<$done> callback:
160
161 sub run {
162 my ($done, $cmd, $path) = @_;
163
164 And since a response is now generated when C<$done> is called, as opposed
165 to when the function returns, we need to call the C<$done> function with
166 the status:
167
168 $done->($status or (0, "$!"));
169
170 A few remarks are in order. First, it's quite pointless to use the async
171 backend for this example - but it I<is> possible. Second, you can call
172 C<$done> before or after returning from the function. Third, once the
173 function has returned and the C<$done> callback has been called, the
174 child process may exit at any time, so you should call C<$done> only when
175 you really I<are> done.
176
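Putting the fragments above together, an async version of F<MyWorker.pm> might look like the following. This is a minimal sketch rather than the author's file: the C<BEGIN> block is an assumption added here so that the module also compiles outside a child process, where C<AnyEvent::Fork::RPC::event> does not exist.

```perl
package MyWorker;

use strict;
use warnings;

# Stand-in (an assumption of this sketch): outside a real child process the
# event function is not defined, so install a harmless replacement.
BEGIN {
   *AnyEvent::Fork::RPC::event = sub { warn "event: @_\n" }
      unless defined &AnyEvent::Fork::RPC::event;
}

my $count = 0;

sub run {
   # async API: the result callback comes first, then the RPC arguments
   my ($done, $cmd, $path) = @_;

   AnyEvent::Fork::RPC::event ($count)
      unless ++$count % 3;

   my $status = $cmd eq "rmdir"  ? rmdir ($path)
              : $cmd eq "unlink" ? unlink ($path)
              : die "fatal error, illegal command '$cmd'";

   # the response is generated by this call, not by the return value of run
   $done->($status or (0, "$!"));
}

1;
```

Because the function is plain Perl, it can be exercised directly by passing any code reference as C<$done>, without forking a process.
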
177 =head2 Example 2: Asynchronous Backend
178
179 #TODO
180
181 =head1 PARENT PROCESS USAGE
182
183 This module exports nothing, and only implements a single function:
184
185 =over 4
186
187 =cut
188
189 package AnyEvent::Fork::RPC;
190
191 use common::sense;
192
193 use Errno ();
194 use Guard ();
195
196 use AnyEvent;
197 use AnyEvent::Fork; # we don't actually depend on it, this is for convenience
198
199 our $VERSION = 0.1;
200
201 =item my $rpc = AnyEvent::Fork::RPC::run $fork, $function, [key => value...]
202
203 The traditional way to call it. But it is way cooler to call it in the
204 following way:
205
206 =item my $rpc = $fork->AnyEvent::Fork::RPC::run ($function, [key => value...])
207
208 This C<run> function/method can be used in place of the
209 L<AnyEvent::Fork::run> method. Just like that method, it takes over
210 the L<AnyEvent::Fork> process, but instead of calling the specified
211 C<$function> directly, it runs a server that accepts RPC calls and handles
212 responses.
213
214 It returns a function reference that can be used to call the function in
215 the child process, handling serialisation and data transfers.
216
217 The following key/value pairs are allowed. It is recommended to have at
218 least an C<on_error> or C<on_event> handler set.
219
220 =over 4
221
222 =item on_error => $cb->($msg)
223
224 Called on (fatal) errors, with a descriptive (hopefully) message. If
225 this callback is not provided, but C<on_event> is, then the C<on_event>
226 callback is called with the first argument being the string C<error>,
227 followed by the error message.
228
229 If neither handler is provided, it prints the error to STDERR and will
230 start failing badly.
231
232 =item on_event => $cb->(...)
233
234 Called for every call to the C<AnyEvent::Fork::RPC::event> function in the
235 child, with the arguments of that function passed to the callback.
236
237 Also called on errors when no C<on_error> handler is provided.
238
239 =item on_destroy => $cb->()
240
241 Called when the C<$rpc> object has been destroyed and all requests have
242 been successfully handled. This is useful when you queue some requests and
243 want the child to go away after it has handled them. The problem is that
244 the parent must not exit either until all requests have been handled, and
245 this can be accomplished by waiting for this callback.
246
247 =item init => $function (default none)
248
249 When specified (by name), this function is called in the child as the very
250 first thing when taking over the process, with all the arguments normally
251 passed to the C<AnyEvent::Fork::run> function, except the communications
252 socket.
253
254 It can be used to do one-time things in the child such as storing passed
255 parameters or opening database connections.
256
257 It is called very early - before the serialisers are created or the
258 C<$function> name is resolved into a function reference, so it could be
259 used to load any modules that provide the serialiser or function. It
260 cannot, however, create events.
261
262 =item async => $boolean (default: 0)
263
264 The default server used in the child does all I/O in a blocking fashion, and
265 only allows a single RPC call to execute at a time.
266
267 Setting C<async> to a true value switches to another implementation that
268 uses L<AnyEvent> in the child and allows multiple concurrent RPC calls.
269
270 The actual API in the child is documented in the section that describes
271 the calling semantics of the returned C<$rpc> function.
272
273 If you want to pre-load the actual back-end modules to enable memory
274 sharing, then you should load C<AnyEvent::Fork::RPC::Sync> for
275 synchronous, and C<AnyEvent::Fork::RPC::Async> for asynchronous mode.
276
277 If you use a template process and want to fork both sync and async
278 children, then it is permissible to load both modules.
279
280 =item serialiser => $string (default: '(sub { pack "(w/a*)*", @_ }, sub { unpack "(w/a*)*", shift })')
281
282 All arguments, result data and event data have to be serialised to be
283 transferred between the processes. For this, they have to be frozen and
284 thawed in both parent and child processes.
285
286 By default, only octet strings can be passed between the processes, which
287 is reasonably fast and efficient.
288
289 For more complicated use cases, you can provide your own freeze and thaw
290 functions, by specifying a string with perl source code. It's supposed to
291 return two code references when evaluated: the first receives a list of
292 perl values and must return an octet string. The second receives the octet
293 string and must return the original list of values.
294
295 If you need an external module for serialisation, then you can either
296 pre-load it into your L<AnyEvent::Fork> process, or you can add a C<use>
297 or C<require> statement into the serialiser string. Or both.
298
299 =back
300
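To illustrate what the default C<serialiser> does, the following sketch evaluates the default string outside the module and runs a round trip. It uses only core Perl. The variable names are chosen for this example and are not part of the module's API.

```perl
use strict;
use warnings;

# The default serialiser string, copied from the serialiser item above.
my $serialiser = '(sub { pack "(w/a*)*", @_ }, sub { unpack "(w/a*)*", shift })';

# Evaluating the string yields the freeze and thaw code references.
my ($freeze, $thaw) = eval $serialiser;
die $@ if $@;

# Plain octet strings survive the round trip unchanged.
my @in   = ("rmdir", "/tmp/somepath/1", "");
my $wire = $freeze->(@in);
my @out  = $thaw->($wire);
print join ("|", @out), "\n";

# A reference is flattened to its string form, so the structure is lost.
my ($flat) = $thaw->($freeze->([1, 2, 3]));
print "$flat\n";
```

The second half shows why the default is only suitable for strings: anything else is stringified before it is sent.
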
301 See the examples section earlier in this document for some actual
302 examples.
303
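As an illustration of a custom C<serialiser>, here is a sketch of a string that freezes and thaws nested data as JSON. The choice of L<JSON::PP> and the variable names are assumptions made for this example and are not something this module prescribes. The string would be passed as C<< serialiser => $serialiser >> to C<AnyEvent::Fork::RPC::run>.

```perl
use strict;
use warnings;

# Hypothetical serialiser string; using JSON::PP is an assumption of this sketch.
my $serialiser = <<'EOS';
use JSON::PP ();
my $json = JSON::PP->new->utf8;
(
   sub { $json->encode (\@_) },        # freeze: list of values -> octet string
   sub { @{ $json->decode (shift) } }, # thaw: octet string -> list of values
)
EOS

# The string must evaluate to two code references.
my ($freeze, $thaw) = eval $serialiser;
die $@ if $@;

my $wire = $freeze->({ path => "/tmp/somepath/1" }, [1, 2, 3], "plain");
my ($hash, $array, $string) = $thaw->($wire);
printf "%s %d %s\n", $hash->{path}, scalar @$array, $string;
```

The C<use> statement sits inside the string, so the module is loaded wherever the string is evaluated.
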
304 =cut
305
306 our $STRING_SERIALISER = '(sub { pack "(w/a*)*", @_ }, sub { unpack "(w/a*)*", shift })';
307
308 sub run {
309 my ($self, $function, %arg) = @_;
310
311 my $serialiser = delete $arg{serialiser} || $STRING_SERIALISER;
312 my $on_event = delete $arg{on_event};
313 my $on_error = delete $arg{on_error};
314 my $on_destroy = delete $arg{on_destroy};
315
316 # default for on_error is to on_event, if specified
317 $on_error ||= $on_event
318 ? sub { $on_event->(error => shift) }
319 : sub { die "AnyEvent::Fork::RPC: uncaught error: $_[0].\n" };
320
321 # default for on_event is to raise an error
322 $on_event ||= sub { $on_error->("event received, but no on_event handler") };
323
324 my ($f, $t) = eval $serialiser; die $@ if $@;
325
326 my (@rcb, %rcb, $fh, $shutdown, $wbuf, $ww);
327 my ($rlen, $rbuf, $rw) = 512 - 16;
328
329 my $wcb = sub {
330 my $len = syswrite $fh, $wbuf;
331
332 unless (defined $len) {
333 if ($! != Errno::EAGAIN && $! != Errno::EWOULDBLOCK) {
334 undef $rw; undef $ww; # it ends here
335 $on_error->("$!");
336 }
337 }
338
339 substr $wbuf, 0, $len, "";
340
341 unless (length $wbuf) {
342 undef $ww;
343 $shutdown and shutdown $fh, 1;
344 }
345 };
346
347 my $module = "AnyEvent::Fork::RPC::" . ($arg{async} ? "Async" : "Sync");
348
349 $self->require ($module)
350 ->send_arg ($function, $arg{init}, $serialiser)
351 ->run ("$module\::run", sub {
352 $fh = shift;
353
354 my ($id, $len);
355 $rw = AE::io $fh, 0, sub {
356 $rlen = $rlen * 2 + 16 if $rlen - 128 < length $rbuf;
357 $len = sysread $fh, $rbuf, $rlen - length $rbuf, length $rbuf;
358
359 if ($len) {
360 while (8 <= length $rbuf) {
361 ($id, $len) = unpack "LL", $rbuf;
362 8 + $len <= length $rbuf
363 or last;
364
365 my @r = $t->(substr $rbuf, 8, $len);
366 substr $rbuf, 0, 8 + $len, "";
367
368 if ($id) {
369 if (@rcb) {
370 (shift @rcb)->(@r);
371 } elsif (my $cb = delete $rcb{$id}) {
372 $cb->(@r);
373 } else {
374 undef $rw; undef $ww;
375 $on_error->("unexpected data from child");
376 }
377 } else {
378 $on_event->(@r);
379 }
380 }
381 } elsif (defined $len) {
382 undef $rw; undef $ww; # it ends here
383
384 if (@rcb || %rcb) {
386 $on_error->("unexpected eof");
387 } else {
388 $on_destroy->() if $on_destroy; # on_destroy is optional
389 }
390 } elsif ($! != Errno::EAGAIN && $! != Errno::EWOULDBLOCK) {
391 undef $rw; undef $ww; # it ends here
392 $on_error->("read: $!");
393 }
394 };
395
396 $ww ||= AE::io $fh, 1, $wcb;
397 });
398
399 my $guard = Guard::guard {
400 $shutdown = 1;
401 $ww ||= $fh && AE::io $fh, 1, $wcb;
402 };
403
404 my $id;
405
406 $arg{async}
407 ? sub {
408 $id = ($id == 0xffffffff ? 0 : $id) + 1;
409 $id = ($id == 0xffffffff ? 0 : $id) + 1 while exists $rcb{$id}; # rarely loops
410
411 $rcb{$id} = pop;
412
413 $guard; # keep it alive
414
415 $wbuf .= pack "LL/a*", $id, &$f;
416 $ww ||= $fh && AE::io $fh, 1, $wcb;
417 }
418 : sub {
419 push @rcb, pop;
420
421 $guard; # keep it alive
422
423 $wbuf .= pack "L/a*", &$f;
424 $ww ||= $fh && AE::io $fh, 1, $wcb;
425 }
426 }
427
428 =item $rpc->(..., $cb->(...))
429
430 The RPC object returned by C<AnyEvent::Fork::RPC::run> is actually a code
431 reference. There are two things you can do with it: call it, and let it go
432 out of scope (let it get destroyed).
433
434 If C<async> was false when C<$rpc> was created (the default), then, if you
435 call C<$rpc>, the C<$function> is invoked with all arguments passed to
436 C<$rpc> except the last one (the callback). When the function returns, the
437 callback will be invoked with all the return values.
438
439 If C<async> was true, then the C<$function> receives an additional
440 initial argument, the result callback. In this case, returning from
441 C<$function> does nothing - the function only counts as "done" when the
442 result callback is called, and any arguments passed to it are considered
443 the return values. This makes it possible to "return" from event handlers
444 or e.g. Coro threads.
445
446 The other thing that can be done with the RPC object is to destroy it. In
447 this case, the child process will execute all remaining RPC calls, report
448 their results, and then exit.
449
450 See the examples section earlier in this document for some actual
451 examples.
452
453 =back
454
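The difference between the two calling conventions can be sketched without any child process. Everything below is a stand-in written for this illustration: C<call_sync> and C<call_async> are hypothetical helpers that only imitate how arguments and results are shuffled, with no fork and no serialisation.

```perl
use strict;
use warnings;

# Sync convention: the return values become the result.
sub sync_add {
   my ($x, $y) = @_;
   $x + $y
}

# Async convention: the first argument is the result callback. Completion is
# deferred by queueing a closure, standing in for an event that fires later.
my @pending;

sub async_add {
   my ($done, $x, $y) = @_;
   push @pending, sub { $done->($x + $y) };
   return; # returning does not produce a result
}

# Hypothetical in-process stand-ins for the two backends.
sub call_sync  { my $cb = pop; $cb->(sync_add (@_)) }
sub call_async { my $cb = pop; async_add ($cb, @_) }

my @log;
call_sync  (1, 2, sub { push @log, "sync: @_" });
call_async (3, 4, sub { push @log, "async: @_" });
push @log, "async_add has returned";

$_->() for splice @pending; # the deferred "event" fires now
print "$_\n" for @log;
```

In the async case the result callback runs only when the queued closure is invoked, after the function itself has already returned.
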
455 =head1 CHILD PROCESS USAGE
456
457 The following function is not available in this module itself. It is only
458 available in the namespace of this module when the child is running,
459 without having to load any extra modules. It is part of the child-side
460 API of L<AnyEvent::Fork::RPC>.
461
462 =over 4
463
464 =item AnyEvent::Fork::RPC::event ...
465
466 Send an event to the parent. Events are a bit like RPC calls made by the
467 child process to the parent, except that there is no notion of return
468 values.
469
470 See the examples section earlier in this document for some actual
471 examples.
472
473 =back
474
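Judging from the parent-side read loop earlier in this file, events and responses appear to travel in the same stream as length-prefixed frames, with an id of zero marking an event. The sketch below imitates that demultiplexing in isolation. The helper names C<frame> and C<parse_frames> are made up for this example, and the layout is inferred from this revision's code rather than from a documented protocol.

```perl
use strict;
use warnings;

# One frame: 32-bit id, 32-bit payload length, payload octets (native byte order).
sub frame {
   my ($id, $payload) = @_;
   pack "LL/a*", $id, $payload
}

# Consume every complete frame from the buffer; leave partial data in place.
sub parse_frames {
   my ($buf, $on_event, $on_response) = @_; # $buf is a scalar reference

   while (8 <= length $$buf) {
      my ($id, $len) = unpack "LL", $$buf;
      8 + $len <= length $$buf
         or last;

      my $payload = substr $$buf, 8, $len;
      substr $$buf, 0, 8 + $len, "";

      $id ? $on_response->($id, $payload)
          : $on_event->($payload);
   }
}

my @log;
my @handlers = (
   sub { push @log, "event: $_[0]" },
   sub { push @log, "response $_[0]: $_[1]" },
);

my $late = frame (8, "later");
my $buf  = frame (0, "3 requests handled") . frame (7, "ok") . substr $late, 0, 5;

parse_frames (\$buf, @handlers); # the truncated third frame stays buffered
$buf .= substr $late, 5;
parse_frames (\$buf, @handlers);

print "$_\n" for @log;
```

Since both kinds of message share one ordered stream, the parent sees them in the order the child produced them, which matches the output shown in Example 1.
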
475 =head1 SEE ALSO
476
477 L<AnyEvent::Fork> (to create the processes in the first place),
478 L<AnyEvent::Fork::Pool> (to manage whole pools of processes).
479
480 =head1 AUTHOR AND CONTACT INFORMATION
481
482 Marc Lehmann <schmorp@schmorp.de>
483 http://software.schmorp.de/pkg/AnyEvent-Fork-RPC
484
485 =cut
486
487 1
488