/cvs/AnyEvent-Fork-RPC/RPC.pm
Revision: 1.6
Committed: Wed Apr 17 19:43:48 2013 UTC (11 years, 1 month ago) by root
Branch: MAIN
Changes since 1.5: +3 -3 lines
Log Message:
*** empty log message ***

=head1 NAME

AnyEvent::Fork::RPC - simple RPC extension for AnyEvent::Fork

=head1 SYNOPSIS

   use AnyEvent::Fork;
   use AnyEvent::Fork::RPC;

   my $rpc = AnyEvent::Fork
      ->new
      ->require ("MyModule")
      ->AnyEvent::Fork::RPC::run (
         "MyModule::server",
      );

   my $cv = AE::cv;

   $rpc->(1, 2, 3, sub {
      print "MyModule::server returned @_\n";
      $cv->send;
   });

   $cv->recv;

=head1 DESCRIPTION

This module implements a simple RPC protocol and backend for processes
created via L<AnyEvent::Fork>, allowing you to call a function in the
child process and receive its return values (up to 4GB serialised).

It implements two different backends: a synchronous one that works like a
normal function call, and an asynchronous one that can run multiple jobs
concurrently in the child, using AnyEvent.

It also implements an asynchronous event mechanism from the child to the
parent, which can be used for progress indications or other information.

=head1 EXAMPLES

=head2 Synchronous Backend

Here is a simple example that implements a backend that executes C<unlink>
and C<rmdir> calls, and reports their status back. It also reports the
number of requests it has processed every three requests, which is clearly
silly, but illustrates the use of events.

First the parent process:

   use AnyEvent;
   use AnyEvent::Fork;
   use AnyEvent::Fork::RPC;

   my $done = AE::cv;

   my $rpc = AnyEvent::Fork
      ->new
      ->require ("MyWorker")
      ->AnyEvent::Fork::RPC::run ("MyWorker::run",
           on_error   => sub { warn "FATAL: $_[0]"; exit 1 },
           on_event   => sub { warn "$_[0] requests handled\n" },
           on_destroy => $done,
        );

   for my $id (1..6) {
      $rpc->(rmdir => "/tmp/somepath/$id", sub {
         $_[0]
            or warn "/tmp/somepath/$id: $_[1]\n";
      });
   }

   undef $rpc;

   $done->recv;

The parent creates the process and queues a few C<rmdir> requests. It then
forgets about the C<$rpc> object, so that the child exits after it has
handled the requests, and finally waits until all requests have been
handled.

The child is implemented using a separate module, C<MyWorker>, shown here:

   package MyWorker;

   my $count;

   sub run {
      my ($cmd, $path) = @_;

      AnyEvent::Fork::RPC::event ($count)
         unless ++$count % 3;

      my $status = $cmd eq "rmdir"  ? rmdir  $path
                 : $cmd eq "unlink" ? unlink $path
                 : die "fatal error, illegal command '$cmd'";

      $status or (0, "$!")
   }

   1

The C<run> function first sends a "progress" event every three calls, and
then executes C<rmdir> or C<unlink>, depending on the first parameter (or
dies with a fatal error - obviously, you must never let this happen :).

Finally, it returns a true status value if the command was successful,
or the status value 0 followed by the stringified error message.
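
Because the worker is an ordinary function, it can also be exercised outside a forked child, for example in a unit test, by defining a stand-in for C<AnyEvent::Fork::RPC::event> before C<MyWorker> is compiled. The following is a sketch under that assumption: the stub, the C<@EVENTS> and C<@RESULTS> arrays and the temporary directory layout are illustrative and not part of this module. It checks the conventions described above: an event on every third call, a single true value on success, and C<(0, $message)> on failure. (Since the stub is declared first, the parentheses after C<event> would be optional here.)

```perl

   use strict;
   use warnings;
   use File::Temp ();

   # Test stub standing in for the real child-side event function, which
   # only exists inside a running child. It just records what it receives.
   our @EVENTS;
   sub AnyEvent::Fork::RPC::event { push @EVENTS, [@_] }

   package MyWorker;

   my $count;

   sub run {
      my ($cmd, $path) = @_;

      AnyEvent::Fork::RPC::event ($count)
         unless ++$count % 3;

      my $status = $cmd eq "rmdir"  ? rmdir  $path
                 : $cmd eq "unlink" ? unlink $path
                 : die "fatal error, illegal command '$cmd'";

      $status or (0, "$!")
   }

   package main;

   # One directory that exists (so rmdir succeeds) and five that do not.
   my $base = File::Temp::tempdir (CLEANUP => 1);
   mkdir "$base/1" or die "mkdir: $!";

   our @RESULTS;
   for my $id (1..6) {
      push @RESULTS, [MyWorker::run (rmdir => "$base/$id")];
   }

   printf "call %d: %s\n", $_ + 1, join ", ", @{ $RESULTS[$_] }
      for 0 .. $#RESULTS;

```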

On my system, running the first code fragment with the given
F<MyWorker.pm> in the current directory yields:

   /tmp/somepath/1: No such file or directory
   /tmp/somepath/2: No such file or directory
   3 requests handled
   /tmp/somepath/3: No such file or directory
   /tmp/somepath/4: No such file or directory
   /tmp/somepath/5: No such file or directory
   6 requests handled
   /tmp/somepath/6: No such file or directory

Obviously, none of the directories I am trying to delete even exist. Also,
the events and responses are processed in exactly the same order as
they were created in the child, which is true for both synchronous and
asynchronous backends.
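
The ordering guarantee follows from how the parent reads the socket: every message is a 32-bit length (C<pack "L">) followed by the serialised payload, the last thawed value is a flag that marks events, and results are handed to the queued callbacks strictly first in, first out. The sketch below replays that receive loop over an in-memory buffer with the default string serialiser. C<frame>, C<decode_frames> and C<@callbacks> are hypothetical names, and the encoding used by C<frame> is inferred from the parent-side code in this module rather than taken from the child backends.

```perl

   use strict;
   use warnings;

   # Default string serialiser, as in $AnyEvent::Fork::RPC::STRING_SERIALISER.
   my ($freeze, $thaw) = eval '(sub { pack "(w/a*)*", @_ }, sub { unpack "(w/a*)*", shift })';
   die $@ if $@;

   # Hypothetical helper: encode one message the way the parent expects it,
   # a native 32-bit length, then the frozen values plus an event flag.
   sub frame {
      my ($is_event, @values) = @_;
      pack "L/a*", $freeze->(@values, $is_event ? 1 : 0)
   }

   our (@LOG, @callbacks);

   # Hypothetical helper mirroring the parent's read loop: consume complete
   # messages from the front of $$buf and leave a partial one in place.
   sub decode_frames {
      my ($buf, $on_event) = @_;

      while (5 <= length $$buf) {
         my $len = unpack "L", $$buf;
         4 + $len <= length $$buf
            or last;                    # message not complete yet

         my @r = $thaw->(substr $$buf, 4, $len);
         substr $$buf, 0, $len + 4, "";

         if (pop @r) {
            $on_event->(@r);            # flag set: an event
         } elsif (@callbacks) {
            (shift @callbacks)->(@r);   # oldest outstanding call first
         } else {
            die "unexpected data from child\n";
         }
      }
   }

   # Two calls are outstanding; their callbacks are queued in call order.
   push @callbacks, sub { push @LOG, "result 1: @_" };
   push @callbacks, sub { push @LOG, "result 2: @_" };

   my $stream = frame (0, "a") . frame (1, "progress") . frame (0, "b", "c");

   # Deliver all but the last 3 bytes first, to mimic a split read.
   my $buf = substr $stream, 0, -3;
   decode_frames (\$buf, sub { push @LOG, "event: @_" });
   $buf .= substr $stream, -3;
   decode_frames (\$buf, sub { push @LOG, "event: @_" });

   print "$_\n" for @LOG;

```

The event is dispatched between the two results because it sits between them in the stream, and the last message is only decoded once its missing bytes have arrived.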

Note that the parentheses in the call to C<AnyEvent::Fork::RPC::event> are
not optional. That is because the function isn't defined when the code is
compiled. You can make sure it is visible by pre-loading the correct
backend module in the call to C<require>:

   ->require ("AnyEvent::Fork::RPC::Sync", "MyWorker")

Since the backend module declares the C<event> function, loading it first
ensures that perl will correctly interpret calls to it.

And as a final remark, there is a fine module on CPAN that can
asynchronously C<rmdir> and C<unlink> and a lot more, and more efficiently
than this example, namely L<IO::AIO>.

=head1 PARENT PROCESS USAGE

This module exports nothing, and only implements a single function:

=over 4

=cut

package AnyEvent::Fork::RPC;

use common::sense;

use Errno ();
use Guard ();

use AnyEvent;
#use AnyEvent::Fork;

our $VERSION = 0.1;

=item my $rpc = AnyEvent::Fork::RPC::run $fork, $function, [key => value...]

This is the traditional way to call it, but it is way cooler to call it
in the following way:

=item my $rpc = $fork->AnyEvent::Fork::RPC::run ($function, [key => value...])

This C<run> function/method can be used in place of the C<run> method of
L<AnyEvent::Fork>. Just like that method, it takes over the
L<AnyEvent::Fork> process, but instead of calling the specified
C<$function> directly, it runs a server that accepts RPC calls and handles
responses.

It returns a function reference that can be used to call the function in
the child process, handling serialisation and data transfers.

The following key/value pairs are allowed. It is recommended to have at
least an C<on_error> or C<on_event> handler set.

=over 4

=item on_error => $cb->($msg)

Called on (fatal) errors, with a (hopefully) descriptive message. If
this callback is not provided, but C<on_event> is, then the C<on_event>
callback is called with the first argument being the string C<error>,
followed by the error message.

If neither handler is provided, the error is raised with C<die> from
inside the event callback that detected it (so it usually ends up on
STDERR), and the RPC object will start failing badly.

=item on_event => $cb->(...)

Called for every call to the C<AnyEvent::Fork::RPC::event> function in the
child, with the arguments of that function passed to the callback.

Also called on errors when no C<on_error> handler is provided.
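
When only C<on_event> is given, errors therefore arrive as C<< ->(error => $message) >>, and one handler can tell the two apart. A minimal sketch (the handler and its log messages are illustrative), invoked directly the way this module would invoke it instead of through a real child. It assumes your own events never use C<error> as their first argument, since such an event would be indistinguishable from a fatal error:

```perl

   use strict;
   use warnings;

   our @SEEN;

   # Combined handler: with no on_error set, AnyEvent::Fork::RPC calls
   # on_event as ->(error => $message) for fatal errors.
   my $on_event = sub {
      my ($type, @args) = @_;

      if ($type eq "error") {
         push @SEEN, "FATAL: $args[0]";
         # a real program would probably shut down or restart here
      } else {
         push @SEEN, "progress: $type requests handled";
      }
   };

   # Simulate an event from the child's AnyEvent::Fork::RPC::event (3),
   # followed by a fatal error reported by the parent-side code.
   $on_event->(3);
   $on_event->(error => "unexpected eof");

   print "$_\n" for @SEEN;

```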

=item on_destroy => $cb->()

Called when the C<$rpc> object has been destroyed and all requests have
been successfully handled. This is useful when you queue some requests and
want the child to go away after it has handled them. The problem is that
the parent must not exit either until all requests have been handled, and
this can be accomplished by waiting for this callback.

=item init => $function (default none)

When specified (by name), this function is called in the child as the very
first thing when taking over the process, with all the arguments normally
passed to the C<AnyEvent::Fork::run> function, except the communications
socket.

It can be used to do one-time things in the child such as storing passed
parameters or opening database connections.

It is called very early (before the serialisers are created or the
C<$function> name is resolved into a function reference), so it can be
used to load any modules that provide the serialiser or function. It
cannot, however, create events.
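
As an illustration, an C<init> function could stash its arguments for later RPC calls. This sketch uses a hypothetical C<MyWorker::init> and a made-up DSN string, and simply calls the functions in sequence, the way the child would, instead of forking. In real use you would pass C<< init => "MyWorker::init" >> to C<run> and send the parameters with the C<send_arg> method of L<AnyEvent::Fork> beforehand; that exactly these values reach C<init> is assumed from the description above.

```perl

   use strict;
   use warnings;

   package MyWorker;

   # Hypothetical one-time configuration, filled in by init.
   our %CONFIG;

   # Hypothetical init function: receives the arguments the parent sent
   # (not the communications socket). It must not send events.
   sub init {
      my ($dsn, $verbose) = @_;
      %CONFIG = (dsn => $dsn, verbose => $verbose);
   }

   # The RPC entry point can then rely on the stored configuration.
   sub run {
      my ($query) = @_;
      "$query via $CONFIG{dsn}"
   }

   package main;

   # Stand-in for the child: init once, then handle a request.
   MyWorker::init ("dbi:SQLite:dbname=example.db", 1);
   our $REPLY = MyWorker::run ("select 1");
   print "$REPLY\n";

```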

=item async => $boolean (default: 0)

The default server used in the child does all I/O blockingly, and only
allows a single RPC call to execute at a time.

Setting C<async> to a true value switches to another implementation that
uses L<AnyEvent> in the child and allows multiple concurrent RPC calls.

The actual API in the child is documented in the section that describes
the calling semantics of the returned C<$rpc> function.

If you want to pre-load the actual backend modules to enable memory
sharing, then you should load C<AnyEvent::Fork::RPC::Sync> for
synchronous, and C<AnyEvent::Fork::RPC::Async> for asynchronous mode.

If you use a template process and want to fork both sync and async
children, then it is permissible to load both modules.

=item serialiser => $string (default: '(sub { pack "(w/a*)*", @_ }, sub { unpack "(w/a*)*", shift })')

All arguments, result data and event data have to be serialised to be
transferred between the processes. For this, they have to be frozen and
thawed in both parent and child processes.

By default, only flat lists of octet strings can be passed between the
processes, which is reasonably fast and efficient.

For more complicated use cases, you can provide your own freeze and thaw
functions, by specifying a string with perl source code. It's supposed to
return two code references when evaluated: the first receives a list of
perl values and must return an octet string. The second receives the octet
string and must return the original list of values.

If you need an external module for serialisation, then you can either
pre-load it into your L<AnyEvent::Fork> process, or you can add a C<use>
or C<require> statement into the serialiser string. Or both.
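
Both the default serialiser string and a custom one can be checked in isolation by evaluating them into their freeze and thaw code references, just as C<run> does with C<eval>. The Storable-based string below is an example of the C<use>-inside-the-string approach, not a serialiser shipped with this module (Storable was picked because it is a core module). The output shows the difference: the default round-trips a flat list of octet strings (numbers come back as strings, references would be stringified), while the Storable version preserves nested data.

```perl

   use strict;
   use warnings;

   # The default: a flat list of octet strings, each with a BER length prefix.
   my $string_serialiser = '(sub { pack "(w/a*)*", @_ }, sub { unpack "(w/a*)*", shift })';

   # Example alternative that loads its own module and keeps nested data.
   my $storable_serialiser = 'use Storable (); (sub { Storable::freeze \@_ }, sub { @{ Storable::thaw shift } })';

   # Evaluate a serialiser string into its (freeze, thaw) pair, as run does.
   sub load_serialiser {
      my ($source) = @_;
      my ($f, $t) = eval $source; die $@ if $@;
      ($f, $t)
   }

   my ($sf, $st) = load_serialiser ($string_serialiser);
   our @FLAT = $st->($sf->("unlink", "/tmp/x", 42));

   my ($nf, $nt) = load_serialiser ($storable_serialiser);
   our @NESTED = $nt->($nf->({ id => 7, tags => ["a", "b"] }, 42));

   print "flat:   ", join ("|", @FLAT), "\n";
   print "nested: id=$NESTED[0]{id} tags=@{ $NESTED[0]{tags} } n=$NESTED[1]\n";

```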

=back

=cut

our $STRING_SERIALISER = '(sub { pack "(w/a*)*", @_ }, sub { unpack "(w/a*)*", shift })';

sub run {
   my ($self, $function, %arg) = @_;

   my $serialiser = delete $arg{serialiser} || $STRING_SERIALISER;
   my $on_event   = delete $arg{on_event};
   my $on_error   = delete $arg{on_error};
   my $on_destroy = delete $arg{on_destroy};

   # default for on_error is to on_event, if specified
   $on_error ||= $on_event
               ? sub { $on_event->(error => shift) }
               : sub { die "AnyEvent::Fork::RPC: uncaught error: $_[0].\n" };

   # default for on_event is to raise an error
   $on_event ||= sub { $on_error->("event received, but no on_event handler") };

   my ($f, $t) = eval $serialiser; die $@ if $@;

   my (@rcb, $fh, $shutdown, $wbuf, $ww, $rw);
   my ($rlen, $rbuf) = 512 - 16;

   my $wcb = sub {
      my $len = syswrite $fh, $wbuf;

      if (!defined $len) {
         if ($! != Errno::EAGAIN && $! != Errno::EWOULDBLOCK) {
            undef $rw; undef $ww; # it ends here
            $on_error->("$!");
         }
      }

      substr $wbuf, 0, $len, "";

      unless (length $wbuf) {
         undef $ww;
         $shutdown and shutdown $fh, 1;
      }
   };

   my $module = "AnyEvent::Fork::RPC::" . ($arg{async} ? "Async" : "Sync");

   $self->require ($module)
        ->send_arg ($function, $arg{init}, $serialiser)
        ->run ("$module\::run", sub {
      $fh = shift;
      $rw = AE::io $fh, 0, sub {
         $rlen = $rlen * 2 + 16 if $rlen - 128 < length $rbuf;
         my $len = sysread $fh, $rbuf, $rlen - length $rbuf, length $rbuf;

         if ($len) {
            while (5 <= length $rbuf) {
               $len = unpack "L", $rbuf;
               4 + $len <= length $rbuf
                  or last;

               my @r = $t->(substr $rbuf, 4, $len);
               substr $rbuf, 0, $len + 4, "";

               if (pop @r) {
                  $on_event->(@r);
               } elsif (@rcb) {
                  (shift @rcb)->(@r);
               } else {
                  undef $rw; undef $ww;
                  $on_error->("unexpected data from child");
               }
            }
         } elsif (defined $len) {
            undef $rw; undef $ww; # it ends here

            if (@rcb) {
               $on_error->("unexpected eof");
            } elsif ($on_destroy) { # on_destroy is optional
               $on_destroy->();
            }
         } elsif ($! != Errno::EAGAIN && $! != Errno::EWOULDBLOCK) {
            undef $rw; undef $ww; # it ends here
            $on_error->("read: $!");
         }
      };

      $ww ||= AE::io $fh, 1, $wcb;
   });

   my $guard = Guard::guard {
      $shutdown = 1;
      $ww ||= $fh && AE::io $fh, 1, $wcb;
   };

   sub {
      push @rcb, pop;

      $guard; # keep it alive

      $wbuf .= pack "L/a*", &$f;
      $ww ||= $fh && AE::io $fh, 1, $wcb;
   }
}

=item $rpc->(..., $cb->(...))

The RPC object returned by C<AnyEvent::Fork::RPC::run> is actually a code
reference. There are two things you can do with it: call it, and let it go
out of scope (let it get destroyed).

If C<async> was false when C<$rpc> was created (the default), then calling
C<$rpc> invokes C<$function> in the child with all arguments passed to
C<$rpc> except the last one (the callback). When the function returns, the
callback will be invoked with all the return values.

If C<async> was true, then the C<$function> receives an additional
initial argument, the result callback. In this case, returning from
C<$function> does nothing - the function only counts as "done" when the
result callback is called, and any arguments passed to it are considered
the return values. This makes it possible to "return" from event handlers
or e.g. Coro threads.
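
For comparison with the synchronous C<MyWorker> above, here is a sketch of an asynchronous worker function. It receives the result callback first, returns immediately, and only completes the call when that callback is invoked. To keep it runnable without a child process or event loop, a hypothetical C<@PENDING> queue stands in for the event loop (a real async child would use L<AnyEvent> watchers such as C<AE::timer> or C<AE::io> instead), and, unlike the synchronous example, an illegal command is reported as an error value rather than by dying.

```perl

   use strict;
   use warnings;

   package MyAsyncWorker;   # hypothetical module name

   # Stand-in for the event loop: closures queued here run "later".
   our @PENDING;

   # With async => 1, the result callback is passed as the first argument.
   sub run {
      my ($done, $cmd, $path) = @_;

      push @PENDING, sub {
         my $status;

         if    ($cmd eq "rmdir" ) { $status = rmdir  $path }
         elsif ($cmd eq "unlink") { $status = unlink $path }
         else                     { return $done->(0, "illegal command '$cmd'") }

         # Calling $done completes the RPC call; its arguments become the
         # return values delivered to the parent's callback.
         $done->($status ? 1 : (0, "$!"));
      };

      return;   # returning from run does not complete the call
   }

   package main;

   our @REPLIES;
   my $done = sub { push @REPLIES, [@_] };

   MyAsyncWorker::run ($done, rmdir      => "/nonexistent/example/path");
   MyAsyncWorker::run ($done, frobnicate => "/tmp");

   our $BEFORE = @REPLIES;   # nothing has been reported yet

   # "Run the event loop": execute the queued work in order.
   (shift @MyAsyncWorker::PENDING)->() while @MyAsyncWorker::PENDING;

   print "replies before loop: $BEFORE\n";
   print "reply: @$_\n" for @REPLIES;

```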

The other thing that can be done with the RPC object is to destroy it. In
this case, the child process will execute all remaining RPC calls, report
their results, and then exit.
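
Destruction works because the returned code reference closes over a guard object: when the last reference to C<$rpc> goes away, the guard's destructor sets a shutdown flag and makes sure the write watcher runs, which half-closes the socket once all queued requests have been written; the child then finishes the remaining calls and exits. The sketch below isolates that ownership pattern in core Perl. The module itself uses C<Guard::guard>; the C<ScopeGuard> class and the C<make_rpc> function are hypothetical simplifications, and the socket handling is reduced to a flag.

```perl

   use strict;
   use warnings;

   # Hypothetical stand-in for Guard::guard: calls $cb when destroyed.
   package ScopeGuard;
   sub new     { my ($class, $cb) = @_; bless { cb => $cb }, $class }
   sub DESTROY { $_[0]{cb}->() }

   package main;

   our ($SHUTDOWN, @QUEUED) = (0);

   # Hypothetical reduction of run: returns a closure that queues requests
   # and, by mentioning $guard, keeps the guard alive as long as it exists.
   sub make_rpc {
      my $guard = ScopeGuard->new (sub { $SHUTDOWN = 1 });

      sub {
         $guard or die "guard lost";   # referencing $guard makes the closure own it
         push @QUEUED, [@_];
      }
   }

   my $rpc = make_rpc ();
   $rpc->(rmdir => "/tmp/somepath/1");
   $rpc->(rmdir => "/tmp/somepath/2");

   our $BEFORE = $SHUTDOWN;
   undef $rpc;                 # last reference gone: the guard fires
   our $AFTER  = $SHUTDOWN;

   print "queued: ", scalar @QUEUED, ", shutdown before: $BEFORE, after: $AFTER\n";

```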

=back

=head1 CHILD PROCESS USAGE

The following function is not available in this module. It is only
available in the namespace of this module when the child is running,
without having to load any extra modules. It is part of the child-side
API of L<AnyEvent::Fork::RPC>.

=over 4

=item AnyEvent::Fork::RPC::event ...

Send an event to the parent. Events are a bit like RPC calls made by the
child process to the parent, except that there is no notion of return
values.
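
Judging from how the parent decodes messages, an event travels as the frozen list of its arguments followed by a true flag (results carry a false flag), behind the same 32-bit length prefix. This sketch encodes one event that way with the default string serialiser and decodes it again. C<send_event> is a hypothetical helper; the real C<AnyEvent::Fork::RPC::event> is defined by the C<AnyEvent::Fork::RPC::Sync> and C<AnyEvent::Fork::RPC::Async> backends, which are not shown here and may differ in detail.

```perl

   use strict;
   use warnings;

   # Default string serialiser (see the serialiser option above).
   my ($freeze, $thaw) = eval '(sub { pack "(w/a*)*", @_ }, sub { unpack "(w/a*)*", shift })';
   die $@ if $@;

   # Hypothetical child-side helper: append one event message to $$wbuf.
   # The trailing 1 is the flag the parent pops to recognise an event.
   sub send_event {
      my ($wbuf, @args) = @_;
      $$wbuf .= pack "L/a*", $freeze->(@args, 1);
   }

   my $wbuf = "";
   send_event (\$wbuf, 3, "requests handled");

   # Decode it as the parent does: length prefix, payload, then the flag.
   my $len = unpack "L", $wbuf;
   our @EVENT_ARGS = $thaw->(substr $wbuf, 4, $len);
   our $IS_EVENT   = pop @EVENT_ARGS;

   print "event flag: $IS_EVENT, arguments: @EVENT_ARGS\n";

```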

=back

=head1 SEE ALSO

L<AnyEvent::Fork> (to create the processes in the first place),
L<AnyEvent::Fork::Pool> (to manage whole pools of processes).

=head1 AUTHOR AND CONTACT INFORMATION

 Marc Lehmann <schmorp@schmorp.de>
 http://software.schmorp.de/pkg/AnyEvent-Fork-RPC

=cut

1