/cvs/AnyEvent-Fork-RPC/RPC.pm
Revision: 1.9
Committed: Wed Apr 17 21:48:35 2013 UTC (11 years, 2 months ago) by root
Branch: MAIN
Changes since 1.8: +46 -24 lines

=head1 NAME

AnyEvent::Fork::RPC - simple RPC extension for AnyEvent::Fork

=head1 SYNOPSIS

   use AnyEvent::Fork::RPC;
   # use AnyEvent::Fork is not needed

   my $rpc = AnyEvent::Fork
      ->new
      ->require ("MyModule")
      ->AnyEvent::Fork::RPC::run (
         "MyModule::server",
      );

   my $cv = AE::cv;

   $rpc->(1, 2, 3, sub {
      print "MyModule::server returned @_\n";
      $cv->send;
   });

   $cv->recv;

=head1 DESCRIPTION

This module implements a simple RPC protocol and backend for processes
created via L<AnyEvent::Fork>, allowing you to call a function in the
child process and receive its return values (up to 4GB serialised).

It implements two different backends: a synchronous one that works like a
normal function call, and an asynchronous one that can run multiple jobs
concurrently in the child, using AnyEvent.

It also implements an asynchronous event mechanism from the child to the
parent, that could be used for progress indications or other information.

Loading this module also always loads L<AnyEvent::Fork>, so you can make a
separate C<use AnyEvent::Fork> if you wish, but you don't have to.
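
The 4GB limit mentioned above seems to follow from the framing used by the
parent-side code in this file: each message carries a 32-bit length field
(C<pack> format C<L>) in front of the serialised data. The following is a
minimal sketch of one asynchronous request frame, built and parsed with
core C<pack>/C<unpack> only. The id C<7> and the C<rmdir> arguments are
made up for illustration, and no child process is involved.

```perl
use strict;
use warnings;

# Payload as the default string serialiser would produce it.
my $payload = pack "(w/a*)*", "rmdir", "/tmp/somepath/1";

# Asynchronous request frame: 32-bit id, 32-bit length, then the payload.
# The id 7 is an arbitrary value chosen for this illustration.
my $frame = pack "LL/a*", 7, $payload;

# Parsing mirrors what the parent does with responses: read the 8 byte
# header first, then cut out exactly $len bytes of payload.
my ($id, $len) = unpack "LL", $frame;
my @args = unpack "(w/a*)*", substr ($frame, 8, $len);

printf "id=%d len=%d args=%s\n", $id, $len, join ",", @args;
```

Synchronous requests omit the id and use C<L/a*> instead, as the C<run>
function further down shows.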

=head1 EXAMPLES

=head2 Synchronous Backend

Here is a simple example that implements a backend that executes C<unlink>
and C<rmdir> calls, and reports their status back. It also reports the
number of requests it has processed every three requests, which is clearly
silly, but illustrates the use of events.

First the parent process:

   use AnyEvent;
   use AnyEvent::Fork;
   use AnyEvent::Fork::RPC;

   my $done = AE::cv;

   my $rpc = AnyEvent::Fork
      ->new
      ->require ("MyWorker")
      ->AnyEvent::Fork::RPC::run ("MyWorker::run",
         on_error   => sub { warn "FATAL: $_[0]"; exit 1 },
         on_event   => sub { warn "$_[0] requests handled\n" },
         on_destroy => $done,
      );

   for my $id (1..6) {
      $rpc->(rmdir => "/tmp/somepath/$id", sub {
         $_[0]
            or warn "/tmp/somepath/$id: $_[1]\n";
      });
   }

   undef $rpc;

   $done->recv;

The parent creates the process and queues a few C<rmdir> requests. It then
forgets about the C<$rpc> object, so that the child exits after it has
handled the requests, and then it waits until the requests have been handled.

The child is implemented using a separate module, C<MyWorker>, shown here:

   package MyWorker;

   my $count;

   sub run {
      my ($cmd, $path) = @_;

      AnyEvent::Fork::RPC::event ($count)
         unless ++$count % 3;

      my $status = $cmd eq "rmdir"  ? rmdir  $path
                 : $cmd eq "unlink" ? unlink $path
                 : die "fatal error, illegal command '$cmd'";

      $status or (0, "$!")
   }

   1

The C<run> function first sends a "progress" event every three calls, and
then executes C<rmdir> or C<unlink>, depending on the first parameter (or
dies with a fatal error - obviously, you must never let this happen :).

Eventually it returns the status value true if the command was successful,
or the status value 0 and the stringified error message.

On my system, running the first code fragment with the given
F<MyWorker.pm> in the current directory yields:

   /tmp/somepath/1: No such file or directory
   /tmp/somepath/2: No such file or directory
   3 requests handled
   /tmp/somepath/3: No such file or directory
   /tmp/somepath/4: No such file or directory
   /tmp/somepath/5: No such file or directory
   6 requests handled
   /tmp/somepath/6: No such file or directory

Obviously, none of the directories I am trying to delete even exist. Also,
the events and responses are processed in exactly the same order as
they were created in the child, which is true for both synchronous and
asynchronous backends.

Note that the parentheses in the call to C<AnyEvent::Fork::RPC::event> are
not optional. That is because the function isn't defined when the code is
compiled. You can make sure it is visible by pre-loading the correct
backend module in the call to C<require>:

   ->require ("AnyEvent::Fork::RPC::Sync", "MyWorker")

Since the backend module declares the C<event> function, loading it first
ensures that perl will correctly interpret calls to it.
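
The effect of such a declaration can be tried without any child process.
In the following sketch, C<My::Backend::event> is a hypothetical stand-in
for C<AnyEvent::Fork::RPC::event>. The forward declaration plays the role
of pre-loading the backend module, after which perl accepts the call
without parentheses.

```perl
use strict;
use warnings;

my @sent;

# Forward declaration: tells perl at compile time that this name is a
# function (My::Backend is a made-up package for this illustration).
sub My::Backend::event;

# Because of the declaration above, the parentheses are optional here.
My::Backend::event "progress", 3;

# The actual definition may follow later in the file.
sub My::Backend::event { push @sent, [@_] }

print "event args: @{ $sent[0] }\n";
```

Without the forward declaration, the call would have to be written as
C<My::Backend::event ("progress", 3)>, which is the form used in the
C<MyWorker> example.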

And as a final remark, there is a fine module on CPAN that can
asynchronously C<rmdir> and C<unlink> and a lot more, and more efficiently
than this example, namely L<IO::AIO>.

=head1 PARENT PROCESS USAGE

This module exports nothing, and only implements a single function:

=over 4

=cut

package AnyEvent::Fork::RPC;

use common::sense;

use Errno ();
use Guard ();

use AnyEvent;
use AnyEvent::Fork; # we don't actually depend on it, this is for convenience

our $VERSION = 0.1;

=item my $rpc = AnyEvent::Fork::RPC::run $fork, $function, [key => value...]

The traditional way to call it. But it is way cooler to call it in the
following way:

=item my $rpc = $fork->AnyEvent::Fork::RPC::run ($function, [key => value...])

This C<run> function/method can be used in place of the
L<AnyEvent::Fork::run> method. Just like that method, it takes over
the L<AnyEvent::Fork> process, but instead of calling the specified
C<$function> directly, it runs a server that accepts RPC calls and handles
responses.

It returns a function reference that can be used to call the function in
the child process, handling serialisation and data transfers.

The following key/value pairs are allowed. It is recommended to have at
least an C<on_error> or C<on_event> handler set.

=over 4

=item on_error => $cb->($msg)

Called on (fatal) errors, with a descriptive (hopefully) message. If
this callback is not provided, but C<on_event> is, then the C<on_event>
callback is called with the first argument being the string C<error>,
followed by the error message.

If neither handler is provided it prints the error to STDERR and will
start failing badly.

=item on_event => $cb->(...)

Called for every call to the C<AnyEvent::Fork::RPC::event> function in the
child, with the arguments of that function passed to the callback.

Also called on errors when no C<on_error> handler is provided.

=item on_destroy => $cb->()

Called when the C<$rpc> object has been destroyed and all requests have
been successfully handled. This is useful when you queue some requests and
want the child to go away after it has handled them. The problem is that
the parent must not exit either until all requests have been handled, and
this can be accomplished by waiting for this callback.

=item init => $function (default none)

When specified (by name), this function is called in the child as the very
first thing when taking over the process, with all the arguments normally
passed to the C<AnyEvent::Fork::run> function, except the communications
socket.

It can be used to do one-time things in the child such as storing passed
parameters or opening database connections.

It is called very early - before the serialisers are created or the
C<$function> name is resolved into a function reference, so it could be
used to load any modules that provide the serialiser or function. It can
not, however, create events.

=item async => $boolean (default: 0)

The default server used in the child does all I/O in blocking mode, and
only executes a single RPC call at a time.

Setting C<async> to a true value switches to another implementation that
uses L<AnyEvent> in the child and allows multiple concurrent RPC calls.

The actual API in the child is documented in the section that describes
the calling semantics of the returned C<$rpc> function.

If you want to pre-load the actual back-end modules to enable memory
sharing, then you should load C<AnyEvent::Fork::RPC::Sync> for
synchronous, and C<AnyEvent::Fork::RPC::Async> for asynchronous mode.

If you use a template process and want to fork both sync and async
children, then it is permissible to load both modules.

=item serialiser => $string (default: '(sub { pack "(w/a*)*", @_ }, sub { unpack "(w/a*)*", shift })')

All arguments, result data and event data have to be serialised to be
transferred between the processes. For this, they have to be frozen and
thawed in both parent and child processes.

By default, only octet strings can be passed between the processes, which
is reasonably fast and efficient.

For more complicated use cases, you can provide your own freeze and thaw
functions, by specifying a string with perl source code. It's supposed to
return two code references when evaluated: the first receives a list of
perl values and must return an octet string. The second receives the octet
string and must return the original list of values.

If you need an external module for serialisation, then you can either
pre-load it into your L<AnyEvent::Fork> process, or you can add a C<use>
or C<require> statement into the serialiser string. Or both.

=back
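
To illustrate the C<serialiser> contract, here is a minimal sketch that
evaluates two serialiser strings and checks that values survive a
freeze/thaw round trip. The first string is the documented default. The
second, based on the core L<Storable> module, is only an assumed example
of a custom serialiser and is not something this revision of the module
provides. The C<roundtrip> helper is likewise made up for this example.

```perl
use strict;
use warnings;

# The default string serialiser, copied from the documented default.
my $string_serialiser =
   '(sub { pack "(w/a*)*", @_ }, sub { unpack "(w/a*)*", shift })';

# Hypothetical custom serialiser: loads its module inside the string, as
# the text above suggests, and can transport nested data structures.
my $storable_serialiser =
   'use Storable (); (sub { Storable::freeze (\@_) }, sub { @{ Storable::thaw (shift) } })';

# Evaluate a serialiser string and push a list of values through it.
sub roundtrip {
   my ($source, @values) = @_;

   my ($freeze, $thaw) = eval $source;
   die $@ if $@;

   $thaw->($freeze->(@values))
}

my @plain  = roundtrip ($string_serialiser, "rmdir", "/tmp/somepath/1");
my @nested = roundtrip ($storable_serialiser, "stat", { path => "/tmp" }, [1, 2, 3]);

print "plain: @plain\n";
print "nested: $nested[0] $nested[1]{path} @{ $nested[2] }\n";
```

Passing such a string as C<< serialiser => $storable_serialiser >> to
C<run> should then allow references to be passed, probably at some cost
in speed compared to the default.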

See the examples section earlier in this document for some actual
examples.

=cut

our $STRING_SERIALISER = '(sub { pack "(w/a*)*", @_ }, sub { unpack "(w/a*)*", shift })';

sub run {
   my ($self, $function, %arg) = @_;

   my $serialiser = delete $arg{serialiser} || $STRING_SERIALISER;
   my $on_event   = delete $arg{on_event};
   my $on_error   = delete $arg{on_error};
   my $on_destroy = delete $arg{on_destroy};

   # default for on_error is to on_event, if specified
   $on_error ||= $on_event
      ? sub { $on_event->(error => shift) }
      : sub { die "AnyEvent::Fork::RPC: uncaught error: $_[0].\n" };

   # default for on_event is to raise an error
   $on_event ||= sub { $on_error->("event received, but no on_event handler") };

   my ($f, $t) = eval $serialiser; die $@ if $@;

   my (@rcb, %rcb, $fh, $shutdown, $wbuf, $ww);
   my ($rlen, $rbuf, $rw) = 512 - 16;

   my $wcb = sub {
      my $len = syswrite $fh, $wbuf;

      unless (defined $len) {
         if ($! != Errno::EAGAIN && $! != Errno::EWOULDBLOCK) {
            undef $rw; undef $ww; # it ends here
            $on_error->("$!");
         }
      }

      substr $wbuf, 0, $len, "";

      unless (length $wbuf) {
         undef $ww;
         $shutdown and shutdown $fh, 1;
      }
   };

   my $module = "AnyEvent::Fork::RPC::" . ($arg{async} ? "Async" : "Sync");

   $self->require ($module)
        ->send_arg ($function, $arg{init}, $serialiser)
        ->run ("$module\::run", sub {
      $fh = shift;

      my ($id, $len);
      $rw = AE::io $fh, 0, sub {
         # grow the read buffer size when it is nearly full
         $rlen = $rlen * 2 + 16 if $rlen - 128 < length $rbuf;
         $len = sysread $fh, $rbuf, $rlen - length $rbuf, length $rbuf;

         if ($len) {
            # each response is an 8 byte header (id, length) plus payload
            while (8 <= length $rbuf) {
               ($id, $len) = unpack "LL", $rbuf;
               8 + $len <= length $rbuf
                  or last;

               my @r = $t->(substr $rbuf, 8, $len);
               substr $rbuf, 0, 8 + $len, "";

               if ($id) {
                  if (@rcb) {
                     (shift @rcb)->(@r);
                  } elsif (my $cb = delete $rcb{$id}) {
                     $cb->(@r);
                  } else {
                     undef $rw; undef $ww;
                     $on_error->("unexpected data from child");
                  }
               } else {
                  # id 0 is reserved for events
                  $on_event->(@r);
               }
            }
         } elsif (defined $len) {
            undef $rw; undef $ww; # it ends here

            if (@rcb || %rcb) {
               $on_error->("unexpected eof");
            } else {
               # on_destroy is optional, so only call it when it was given
               $on_destroy->()
                  if $on_destroy;
            }
         } elsif ($! != Errno::EAGAIN && $! != Errno::EWOULDBLOCK) {
            undef $rw; undef $ww; # it ends here
            $on_error->("read: $!");
         }
      };

      $ww ||= AE::io $fh, 1, $wcb;
   });

   my $guard = Guard::guard {
      $shutdown = 1;
      $ww ||= $fh && AE::io $fh, 1, $wcb;
   };

   my $id;

   $arg{async}
      ? sub {
           $id = ($id == 0xffffffff ? 0 : $id) + 1;
           $id = ($id == 0xffffffff ? 0 : $id) + 1 while exists $rcb{$id}; # rarely loops

           $rcb{$id} = pop;

           $guard; # keep it alive

           $wbuf .= pack "LL/a*", $id, &$f;
           $ww ||= $fh && AE::io $fh, 1, $wcb;
        }
      : sub {
           push @rcb, pop;

           $guard; # keep it alive

           $wbuf .= pack "L/a*", &$f;
           $ww ||= $fh && AE::io $fh, 1, $wcb;
        }
}

=item $rpc->(..., $cb->(...))

The RPC object returned by C<AnyEvent::Fork::RPC::run> is actually a code
reference. There are two things you can do with it: call it, and let it go
out of scope (let it get destroyed).

If C<async> was false when C<$rpc> was created (the default), then, if you
call C<$rpc>, the C<$function> is invoked with all arguments passed to
C<$rpc> except the last one (the callback). When the function returns, the
callback will be invoked with all the return values.

If C<async> was true, then the C<$function> receives an additional
initial argument, the result callback. In this case, returning from
C<$function> does nothing - the function only counts as "done" when the
result callback is called, and any arguments passed to it are considered
the return values. This makes it possible to "return" from event handlers
or e.g. Coro threads.

The other thing that can be done with the RPC object is to destroy it. In
this case, the child process will execute all remaining RPC calls, report
their results, and then exit.

See the examples section earlier in this document for some actual
examples.

=back
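
The difference between the two calling conventions can be sketched without
forking anything. In the following example all names (C<sync_add>,
C<async_add>, C<call_sync>, C<call_async>) are hypothetical, and the two
C<call_*> helpers merely imitate what the backends do with the worker
function; they are not the actual implementation.

```perl
use strict;
use warnings;

# Worker in the style of the synchronous backend: its return values are
# the result of the call.
sub sync_add {
   my ($x, $y) = @_;

   $x + $y
}

# Worker in the style of the asynchronous backend: the first argument is
# the result callback, and the return value is ignored.
sub async_add {
   my ($done, $x, $y) = @_;

   $done->($x + $y);

   ()
}

# Stand-ins for the two backends, without any processes or I/O involved.
# Both take the worker first and the result callback last.
sub call_sync  { my $cb = pop; my $fn = shift; $cb->($fn->(@_)) }
sub call_async { my $cb = pop; my $fn = shift; $fn->($cb, @_); () }

my @results;

call_sync  (\&sync_add,  1, 2, sub { push @results, "sync: @_"  });
call_async (\&async_add, 3, 4, sub { push @results, "async: @_" });

print "$_\n" for @results;
```

In a real asynchronous worker the result callback would typically be
stored and called later, for example from a timer or I/O watcher, which
is what allows several requests to be in progress at once.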

=head1 CHILD PROCESS USAGE

The following function is not defined by this module itself. It is only
available in the namespace of this module when the child is running,
without having to load any extra modules. It is part of the child-side
API of L<AnyEvent::Fork::RPC>.

=over 4

=item AnyEvent::Fork::RPC::event ...

Send an event to the parent. Events are a bit like RPC calls made by the
child process to the parent, except that there is no notion of return
values.

See the examples section earlier in this document for some actual
examples.

=back

=head1 SEE ALSO

L<AnyEvent::Fork> (to create the processes in the first place),
L<AnyEvent::Fork::Pool> (to manage whole pools of processes).

=head1 AUTHOR AND CONTACT INFORMATION

 Marc Lehmann <schmorp@schmorp.de>
 http://software.schmorp.de/pkg/AnyEvent-Fork-RPC

=cut

1