/cvs/AnyEvent-Fork-RPC/RPC.pm
Revision: 1.4
Committed: Wed Apr 17 19:38:25 2013 UTC by root
Branch: MAIN
Changes since 1.3: +152 -11 lines
Log Message:
*** empty log message ***

=head1 NAME

AnyEvent::Fork::RPC - simple RPC extension for AnyEvent::Fork

=head1 SYNOPSIS

   use AnyEvent;
   use AnyEvent::Fork;
   use AnyEvent::Fork::RPC;

   my $rpc = AnyEvent::Fork
      ->new
      ->require ("MyModule")
      ->AnyEvent::Fork::RPC::run (
         "MyModule::server",
      );

   my $cv = AE::cv;

   $rpc->(1, 2, 3, sub {
      print "MyModule::server returned @_\n";
      $cv->send;
   });

   $cv->recv;

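The SYNOPSIS assumes a module C<MyModule> that the child can load, but does not show it. A minimal sketch of what such a module could look like with the default synchronous backend; the body of C<server> (returning the argument count and their sum) is purely hypothetical:

```perl

   package MyModule;

   use strict;
   use warnings;

   # Hypothetical child-side function for the SYNOPSIS. With the default
   # (synchronous) backend it is called with the RPC arguments minus the
   # callback, and whatever it returns is passed to that callback.
   sub server {
      my (@args) = @_;

      my $sum = 0;
      $sum += $_ for @args;

      # the default serialiser only transfers octet strings, so return
      # plain scalars (numbers arrive in the parent as strings)
      return (scalar @args, $sum);
   }

   1;

```

With this module in the child's C<@INC>, the SYNOPSIS should print C<MyModule::server returned 3 6>.
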
=head1 DESCRIPTION

This module implements a simple RPC protocol and backend for processes
created via L<AnyEvent::Fork>, allowing you to call a function in the
child process and receive its return values (up to 4GB serialised).

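The 4GB limit follows from the wire format used by the parent-side code in C<run>: each message is written as C<pack "L/a*", ...>, i.e. a 32-bit length followed by the serialised octets, and the reader waits until a complete frame has arrived before thawing it. A simplified sketch of that framing; the helper names C<frame> and C<unframe> are hypothetical and not part of this module's API:

```perl

   use strict;
   use warnings;

   # Prefix a serialised message with its length as a 32-bit
   # native-endian unsigned integer, as run() does with pack "L/a*".
   sub frame {
      my ($msg) = @_;
      return pack "L/a*", $msg;
   }

   # Remove and return one complete message from the front of the
   # buffer, or return nothing if the frame is still incomplete.
   sub unframe {
      my ($bufref) = @_;

      return if length ($$bufref) < 4;
      my $len = unpack "L", $$bufref;
      return if length ($$bufref) < 4 + $len;

      my $msg = substr $$bufref, 4, $len;
      substr $$bufref, 0, 4 + $len, "";
      return $msg;
   }

   # "L" is exactly 32 bits, so one message can be at most 2**32 - 1 octets
   my $max_len = 2**32 - 1;

```

The real reader in C<run> additionally grows its read buffer as needed and hands each complete payload to the thaw function.
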
It implements two different backends: a synchronous one that works like a
normal function call, and an asynchronous one that can run multiple jobs
concurrently in the child, using AnyEvent.

It also implements an asynchronous event mechanism from the child to the
parent, that could be used for progress indications or other information.

=head1 EXAMPLES

=head2 Synchronous Backend

Here is a simple example that implements a backend that executes C<unlink>
and C<rmdir> calls, and reports their status back. It also reports the
number of requests it has processed every three requests, which is clearly
silly, but illustrates the use of events.

First the parent process:

   use AnyEvent;
   use AnyEvent::Fork;
   use AnyEvent::Fork::RPC;

   my $done = AE::cv;

   my $rpc = AnyEvent::Fork
      ->new
      ->require ("MyWorker")
      ->AnyEvent::Fork::RPC::run ("MyWorker::run",
         on_event   => sub { warn "$_[0] requests handled\n" },
         on_destroy => $done,
      );

   for my $id (1..6) {
      $rpc->(rmdir => "/tmp/somepath/$id", sub {
         $_[0]
            or warn "/tmp/somepath/$id: $_[1]\n";
      });
   }

   undef $rpc;

   $done->recv;

The parent creates the process and queues a few C<rmdir> calls. It then
forgets about the C<$rpc> object, so that the child exits after it has
handled the requests, and finally waits (on C<$done>) until the requests
have been handled.

The child is implemented using a separate module, C<MyWorker>, shown here:

   package MyWorker;

   my $count;

   sub run {
      my ($cmd, $path) = @_;

      AnyEvent::Fork::RPC::event ($count)
         unless ++$count % 3;

      my $status = $cmd eq "rmdir"  ? rmdir  $path
                 : $cmd eq "unlink" ? unlink $path
                 : die "fatal error, illegal command '$cmd'";

      $status or (0, "$!")
   }

   1

The C<run> function first sends a "progress" event every three calls, and
then executes C<rmdir> or C<unlink>, depending on the first parameter (or
dies with a fatal error - obviously, you must never let this happen :).

Eventually it returns the status value true if the command was successful,
or the status value 0 and the stringified error message.

On my system, running the first code fragment with the given
F<MyWorker.pm> in the current directory yields:

   /tmp/somepath/1: No such file or directory
   /tmp/somepath/2: No such file or directory
   3 requests handled
   /tmp/somepath/3: No such file or directory
   /tmp/somepath/4: No such file or directory
   /tmp/somepath/5: No such file or directory
   6 requests handled
   /tmp/somepath/6: No such file or directory

Obviously, none of the directories I am trying to delete even exist. Also,
the events and responses are processed in exactly the same order as
they were created in the child, which is true for both synchronous and
asynchronous backends.

Note that the parentheses in the call to C<AnyEvent::Fork::RPC::event> are
not optional. That is because the function isn't defined when the code is
compiled. You can make sure it is visible by pre-loading the correct
backend module in the call to C<require>:

   ->require ("AnyEvent::Fork::RPC::Sync", "MyWorker")

Since the backend module declares the C<event> function, loading it first
ensures that perl will correctly interpret calls to it.

And as a final remark, there is a fine module on CPAN that can
asynchronously C<rmdir> and C<unlink> and a lot more, and more efficiently
than this example, namely L<IO::AIO>.

=head1 PARENT PROCESS USAGE

This module exports nothing, and only implements a single function:

=over 4

=cut

package AnyEvent::Fork::RPC;

use common::sense;

use Errno ();
use Guard ();

use AnyEvent;
#use AnyEvent::Fork;

our $VERSION = 0.1;

=item my $rpc = AnyEvent::Fork::RPC::run $fork, $function, [key => value...]

The traditional way to call it. But it is way cooler to call it in the
following way:

=item my $rpc = $fork->AnyEvent::Fork::RPC::run ($function, [key => value...])

This C<run> function/method can be used in place of the
L<AnyEvent::Fork::run> method. Just like that method, it takes over
the L<AnyEvent::Fork> process, but instead of calling the specified
C<$function> directly, it runs a server that accepts RPC calls and handles
responses.

It returns a function reference that can be used to call the function in
the child process, handling serialisation and data transfers.

The following key/value pairs are allowed. It is recommended to have at
least an C<on_error> or C<on_event> handler set.

=over 4

=item on_error => $cb->($msg)

Called on (fatal) errors, with a (hopefully) descriptive message. If
this callback is not provided, but C<on_event> is, then the C<on_event>
callback is called with the first argument being the string C<error>,
followed by the error message.

If neither handler is provided, the default handler dies with the message
C<AnyEvent::Fork::RPC: uncaught error: $msg>. Since this happens inside an
event callback, the exception usually ends up on STDERR, and things will
start failing badly.

=item on_event => $cb->(...)

Called for every call to the C<AnyEvent::Fork::RPC::event> function in the
child, with the arguments of that function passed to the callback.

Also called on errors when no C<on_error> handler is provided. If no
C<on_event> handler is provided, receiving an event is reported as an
error via C<on_error>.

=item on_destroy => $cb->()

Called when the C<$rpc> object has been destroyed and all requests have
been successfully handled. This is useful when you queue some requests and
want the child to go away after it has handled them. The problem is that
the parent must not exit either until all requests have been handled, and
this can be accomplished by waiting for this callback.

=item init => $function (default none)

When specified (by name), this function is called in the child as the very
first thing when taking over the process, with all the arguments normally
passed to the C<AnyEvent::Fork::run> function, except the communications
socket.

It can be used to do one-time things in the child such as storing passed
parameters or opening database connections.

It is called very early - before the serialisers are created or the
C<$function> name is resolved into a function reference, so it could be
used to load any modules that provide the serialiser or function. It
cannot, however, create events.

=item async => $boolean (default: 0)

The default server used in the child performs all I/O in blocking mode,
and only allows a single RPC call to execute at a time.

Setting C<async> to a true value switches to another implementation that
uses L<AnyEvent> in the child and allows multiple concurrent RPC calls.

The actual API in the child is documented in the section that describes
the calling semantics of the returned C<$rpc> function.

If you want to pre-load the actual back-end modules to enable memory
sharing, then you should load C<AnyEvent::Fork::RPC::Sync> for
synchronous, and C<AnyEvent::Fork::RPC::Async> for asynchronous mode.

If you use a template process and want to fork both sync and async
children, then it is permissible to load both modules.

=item serialiser => $string (default: '(sub { pack "(w/a*)*", @_ }, sub { unpack "(w/a*)*", shift })')

All arguments, result data and event data have to be serialised to be
transferred between the processes. For this, they have to be frozen and
thawed in both parent and child processes.

By default, only octet strings can be passed between the processes, which
is reasonably fast and efficient.

For more complicated use cases, you can provide your own freeze and thaw
functions, by specifying a string with perl source code. It's supposed to
return two code references when evaluated: the first receives a list of
perl values and must return an octet string. The second receives the octet
string and must return the original list of values.

If you need an external module for serialisation, then you can either
pre-load it into your L<AnyEvent::Fork> process, or you can add a C<use>
or C<require> statement into the serialiser string. Or both.

=back

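As an illustration of such a string, the following sketch uses the core L<Storable> module, so that nested perl data structures (but not code references) can be passed. The variable name C<$STORABLE_SERIALISER> is chosen here for illustration only, and the round trip merely mimics what C<run> does with the string (evaluate it, then use the two code references); it does not start a child:

```perl

   use strict;
   use warnings;

   # Serialiser string for the "serialiser" option: when evaluated, it
   # loads Storable and returns (freeze, thaw). freeze stores the whole
   # argument list as one array reference; thaw returns that list.
   my $STORABLE_SERIALISER =
      'use Storable (); (sub { Storable::freeze \@_ }, sub { @{ Storable::thaw shift } })';

   # Evaluate it the same way run() does.
   my ($freeze, $thaw) = eval $STORABLE_SERIALISER;
   die $@ if $@;

   # Nested data survives the round trip (the default string serialiser
   # would only pass flat octet strings).
   my @values = ({ path => "/tmp/x", mode => 0755 }, [1, 2, 3], "plain");
   my @copy   = $thaw->($freeze->(@values));

   print "$copy[0]{path} @{ $copy[1] } $copy[2]\n";

```

In the parent, such a string would then be passed as C<< serialiser => $STORABLE_SERIALISER >>. Since the same string is evaluated in the parent and in the child, both must be able to load L<Storable>, which ships with perl.
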
=cut

our $STRING_SERIALISER = '(sub { pack "(w/a*)*", @_ }, sub { unpack "(w/a*)*", shift })';

sub run {
   my ($self, $function, %arg) = @_;

   my $serialiser = delete $arg{serialiser} || $STRING_SERIALISER;
   my $on_event   = delete $arg{on_event};
   my $on_error   = delete $arg{on_error};
   my $on_destroy = delete $arg{on_destroy};

   # default for on_error is to on_event, if specified
   $on_error ||= $on_event
               ? sub { $on_event->(error => shift) }
               : sub { die "AnyEvent::Fork::RPC: uncaught error: $_[0].\n" };

   # default for on_event is to raise an error
   $on_event ||= sub { $on_error->("event received, but no on_event handler") };

   my ($f, $t) = eval $serialiser; die $@ if $@;

   my (@rcb, $fh, $shutdown, $wbuf, $ww, $rw);
   my ($rlen, $rbuf) = 512 - 16;

   my $wcb = sub {
      my $len = syswrite $fh, $wbuf;

      if (!defined $len) {
         if ($! != Errno::EAGAIN && $! != Errno::EWOULDBLOCK) {
            undef $rw; undef $ww; # it ends here
            $on_error->("$!");
         }
      }

      substr $wbuf, 0, $len, "";

      unless (length $wbuf) {
         undef $ww;
         $shutdown and shutdown $fh, 1;
      }
   };

   my $module = "AnyEvent::Fork::RPC::" . ($arg{async} ? "Async" : "Sync");

   $self->require ($module)
        ->send_arg ($function, $arg{init}, $serialiser)
        ->run ("$module\::run", sub {
      $fh = shift;
      $rw = AE::io $fh, 0, sub {
         $rlen = $rlen * 2 + 16 if $rlen - 128 < length $rbuf;
         my $len = sysread $fh, $rbuf, $rlen - length $rbuf, length $rbuf;

         if ($len) {
            while (5 <= length $rbuf) {
               $len = unpack "L", $rbuf;
               4 + $len <= length $rbuf
                  or last;

               my @r = $t->(substr $rbuf, 4, $len);
               substr $rbuf, 0, $len + 4, "";

               if (pop @r) {
                  $on_event->(@r);
               } elsif (@rcb) {
                  (shift @rcb)->(@r);
               } else {
                  undef $rw; undef $ww;
                  $on_error->("unexpected data from child");
               }
            }
         } elsif (defined $len) {
            undef $rw; undef $ww; # it ends here

            if (@rcb) {
               $on_error->("unexpected eof");
            } else {
               # on_destroy is optional
               $on_destroy->()
                  if $on_destroy;
            }
         } elsif ($! != Errno::EAGAIN && $! != Errno::EWOULDBLOCK) {
            undef $rw; undef $ww; # it ends here
            $on_error->("read: $!");
         }
      };

      $ww ||= AE::io $fh, 1, $wcb;
   });

   my $guard = Guard::guard {
      $shutdown = 1;
      $ww ||= $fh && AE::io $fh, 1, $wcb;
   };

   sub {
      push @rcb, pop;

      $guard; # keep it alive

      $wbuf .= pack "L/a*", &$f;
      $ww ||= $fh && AE::io $fh, 1, $wcb;
   }
}

=item $rpc->(..., $cb->(...))

The RPC object returned by C<AnyEvent::Fork::RPC::run> is actually a code
reference. There are two things you can do with it: call it, and let it go
out of scope (let it get destroyed).

If C<async> was false when C<$rpc> was created (the default), then, if you
call C<$rpc>, the C<$function> is invoked with all arguments passed to
C<$rpc> except the last one (the callback). When the function returns, the
callback will be invoked with all the return values.

If C<async> was true, then the C<$function> receives an additional
initial argument, the result callback. In this case, returning from
C<$function> does nothing - the function only counts as "done" when the
result callback is called, and any arguments passed to it are considered
the return values. This makes it possible to "return" from event handlers
or e.g. Coro threads.

The other thing that can be done with the RPC object is to destroy it. In
this case, the child process will execute all remaining RPC calls, report
their results, and then exit.

=back

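For the asynchronous backend, a worker function receives the result callback as its first argument and may call it later, from an event handler. A minimal sketch, assuming a hypothetical module C<MyAsyncWorker> whose C<run> function "sleeps" for the requested number of seconds using an L<AnyEvent> timer instead of blocking the child:

```perl

   package MyAsyncWorker;

   use strict;
   use warnings;
   use AnyEvent;

   # With async => 1, this is called with the result callback first,
   # followed by the arguments given to $rpc in the parent.
   sub run {
      my ($done, $seconds) = @_;

      # Returning from run does not finish the request; only calling
      # $done does. The timer lets other requests run meanwhile.
      my $timer; $timer = AE::timer $seconds, 0, sub {
         undef $timer;               # one-shot: release the watcher
         $done->("slept $seconds");  # these become the return values
      };

      return;
   }

   1;

```

In the parent, such a worker would be started like the synchronous one, but with C<< async => 1 >> passed to C<AnyEvent::Fork::RPC::run>; several C<< $rpc->(2, sub { ... }) >> calls could then be in progress in the same child at once.
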
=head1 CHILD PROCESS USAGE

The following function is not available in this module when it is loaded
in the parent. It is only available in the namespace of this module when
the child is running, without having to load any extra modules. It is part
of the child-side API of L<AnyEvent::Fork::RPC>.

=over 4

=item AnyEvent::Fork::RPC::event ...

Send an event to the parent. Events are a bit like RPC calls made by the
child process to the parent, except that there is no notion of return
values.

=back

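Events and results share the same channel. Judging from the parent-side code in C<run>, each message from the child thaws to a list whose last element is a flag: true for an event (passed to C<on_event>), false for a result, which is handed to the oldest pending callback (the parent simply shifts its queue of callbacks). A minimal sketch of that dispatch step; the names C<dispatch>, C<@pending> and C<@events> are hypothetical:

```perl

   use strict;
   use warnings;

   my @pending;   # result callbacks, oldest first (like @rcb in run)
   my @events;    # collected events, standing in for on_event

   # Handle one already-thawed message from the child: the trailing
   # flag decides whether it is an event or the next result.
   sub dispatch {
      my @r = @_;

      if (pop @r) {
         push @events, [@r];
      } elsif (@pending) {
         (shift @pending)->(@r);
      } else {
         die "unexpected data from child\n";
      }
   }

   my @results;
   push @pending, sub { push @results, "first: @_" };
   push @pending, sub { push @results, "second: @_" };

   dispatch (3, 1);                                # event: 3 requests handled
   dispatch (1, 0);                                # first result: success
   dispatch (0, "No such file or directory", 0);   # second result: failure

```
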
=head1 SEE ALSO

L<AnyEvent::Fork> (to create the processes in the first place),
L<AnyEvent::Fork::Pool> (to manage whole pools of processes).

=head1 AUTHOR AND CONTACT INFORMATION

   Marc Lehmann <schmorp@schmorp.de>
   http://software.schmorp.de/pkg/AnyEvent-Fork-RPC

=cut

1
