ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-Fork-RPC/RPC.pm
(Generate patch)

Comparing AnyEvent-Fork-RPC/RPC.pm (file contents):
Revision 1.2 by root, Wed Apr 17 17:08:16 2013 UTC vs.
Revision 1.9 by root, Wed Apr 17 21:48:35 2013 UTC

2 2
3AnyEvent::Fork::RPC - simple RPC extension for AnyEvent::Fork 3AnyEvent::Fork::RPC - simple RPC extension for AnyEvent::Fork
4 4
5=head1 SYNOPSIS 5=head1 SYNOPSIS
6 6
7 use AnyEvent::Fork;
8 use AnyEvent::Fork::RPC; 7 use AnyEvent::Fork::RPC;
8 # use AnyEvent::Fork is not needed
9 9
10 my $rpc = AnyEvent::Fork 10 my $rpc = AnyEvent::Fork
11 ->new 11 ->new
12 ->require ("MyModule") 12 ->require ("MyModule")
13 ->AnyEvent::Fork::RPC::run ( 13 ->AnyEvent::Fork::RPC::run (
34concurrently in the child, using AnyEvent. 34concurrently in the child, using AnyEvent.
35 35
36It also implements an asynchronous event mechanism from the child to the 36It also implements an asynchronous event mechanism from the child to the
37parent, that could be used for progress indications or other information. 37parent, that could be used for progress indications or other information.
38 38
39Loading this module also always loads L<AnyEvent::Fork>, so you can make a
40separate C<use AnyEvent::Fork> if you wish, but you don't have to.
41
42=head1 EXAMPLES
43
44=head2 Synchronous Backend
45
46Here is a simple example that implements a backend that executes C<unlink>
47and C<rmdir> calls, and reports their status back. It also reports the
48number of requests it has processed every three requests, which is clearly
49silly, but illustrates the use of events.
50
51First the parent process:
52
53 use AnyEvent;
54 use AnyEvent::Fork;
55 use AnyEvent::Fork::RPC;
56
57 my $done = AE::cv;
58
59 my $rpc = AnyEvent::Fork
60 ->new
61 ->require ("MyWorker")
62 ->AnyEvent::Fork::RPC::run ("MyWorker::run",
63 on_error => sub { warn "FATAL: $_[0]"; exit 1 },
64 on_event => sub { warn "$_[0] requests handled\n" },
65 on_destroy => $done,
66 );
67
68 for my $id (1..6) {
69 $rpc->(rmdir => "/tmp/somepath/$id", sub {
70 $_[0]
71 or warn "/tmp/somepath/$id: $_[1]\n";
72 });
73 }
74
75 undef $rpc;
76
77 $done->recv;
78
79The parent creates the process, queues a few rmdir's. It then forgets
80about the C<$rpc> object, so that the child exits after it has handled the
81requests, and then it waits till the requests have been handled.
82
83The child is implemented using a separate module, C<MyWorker>, shown here:
84
85 package MyWorker;
86
87 my $count;
88
89 sub run {
90 my ($cmd, $path) = @_;
91
92 AnyEvent::Fork::RPC::event ($count)
93 unless ++$count % 3;
94
95 my $status = $cmd eq "rmdir" ? rmdir $path
96 : $cmd eq "unlink" ? unlink $path
97 : die "fatal error, illegal command '$cmd'";
98
99 $status or (0, "$!")
100 }
101
102 1
103
104The C<run> function first sends a "progress" event every three calls, and
105then executes C<rmdir> or C<unlink>, depending on the first parameter (or
106dies with a fatal error - obviously, you must never let this happen :).
107
108Eventually it returns the status value true if the command was successful,
109or the status value 0 and the stringified error message.
110
111On my system, running the first code fragment with the given
112F<MyWorker.pm> in the current directory yields:
113
114 /tmp/somepath/1: No such file or directory
115 /tmp/somepath/2: No such file or directory
116 3 requests handled
117 /tmp/somepath/3: No such file or directory
118 /tmp/somepath/4: No such file or directory
119 /tmp/somepath/5: No such file or directory
120 6 requests handled
121 /tmp/somepath/6: No such file or directory
122
123Obviously, none of the directories I am trying to delete even exist. Also,
124the events and responses are processed in exactly the same order as
125they were created in the child, which is true for both synchronous and
126asynchronous backends.
127
128Note that the parentheses in the call to C<AnyEvent::Fork::RPC::event> are
129not optional. That is because the function isn't defined when the code is
130compiled. You can make sure it is visible by pre-loading the correct
131backend module in the call to C<require>:
132
133 ->require ("AnyEvent::Fork::RPC::Sync", "MyWorker")
134
135Since the backend module declares the C<event> function, loading it first
136ensures that perl will correctly interpret calls to it.
137
138And as a final remark, there is a fine module on CPAN that can
139asynchronously C<rmdir> and C<unlink> and a lot more, and more efficiently
140than this example, namely L<IO::AIO>.
141
39=head1 PARENT PROCESS USAGE 142=head1 PARENT PROCESS USAGE
40 143
41This module exports nothing, and only implements a single function: 144This module exports nothing, and only implements a single function:
42 145
43=over 4 146=over 4
50 153
51use Errno (); 154use Errno ();
52use Guard (); 155use Guard ();
53 156
54use AnyEvent; 157use AnyEvent;
55#use AnyEvent::Fork; 158use AnyEvent::Fork; # we don't actually depend on it, this is for convenience
56 159
57our $VERSION = 0.1; 160our $VERSION = 0.1;
58 161
59=item my $rpc = AnyEvent::Fork::RPC::run $fork, $function, [key => value...] 162=item my $rpc = AnyEvent::Fork::RPC::run $fork, $function, [key => value...]
60 163
92Called for every call to the C<AnyEvent::Fork::RPC::event> function in the 195Called for every call to the C<AnyEvent::Fork::RPC::event> function in the
93child, with the arguments of that function passed to the callback. 196child, with the arguments of that function passed to the callback.
94 197
95Also called on errors when no C<on_error> handler is provided. 198Also called on errors when no C<on_error> handler is provided.
96 199
200=item on_destroy => $cb->()
201
202Called when the C<$rpc> object has been destroyed and all requests have
203been successfully handled. This is useful when you queue some requests and
204want the child to go away after it has handled them. The problem is that
205the parent must not exit either until all requests have been handled, and
206this can be accomplished by waiting for this callback.
207
97=item init => $function (default none) 208=item init => $function (default none)
98 209
99When specified (by name), this function is called in the child as the very 210When specified (by name), this function is called in the child as the very
100first thing when taking over the process, with all the arguments normally 211first thing when taking over the process, with all the arguments normally
101passed to the C<AnyEvent::Fork::run> function, except the communications 212passed to the C<AnyEvent::Fork::run> function, except the communications
102socket. 213socket.
103 214
104It can be used to do one-time things in the child such as storing passed 215It can be used to do one-time things in the child such as storing passed
105parameters or opening database connections. 216parameters or opening database connections.
106 217
218It is called very early - before the serialisers are created or the
219C<$function> name is resolved into a function reference, so it could be
220used to load any modules that provide the serialiser or function. It can
221not, however, create events.
222
107=item async => $boolean (default: 0) 223=item async => $boolean (default: 0)
108 224
109The default server used in the child does all I/O blockingly, and only 225The default server used in the child does all I/O blockingly, and only
110allows a single RPC call to execute concurrently. 226allows a single RPC call to execute concurrently.
111 227
116the calling semantics of the returned C<$rpc> function. 232the calling semantics of the returned C<$rpc> function.
117 233
118If you want to pre-load the actual back-end modules to enable memory 234If you want to pre-load the actual back-end modules to enable memory
119sharing, then you should load C<AnyEvent::Fork::RPC::Sync> for 235sharing, then you should load C<AnyEvent::Fork::RPC::Sync> for
120synchronous, and C<AnyEvent::Fork::RPC::Async> for asynchronous mode. 236synchronous, and C<AnyEvent::Fork::RPC::Async> for asynchronous mode.
237
238If you use a template process and want to fork both sync and async
239children, then it is permissible to load both modules.
121 240
122=item serialiser => $string (default: '(sub { pack "(w/a*)*", @_ }, sub { unpack "(w/a*)*", shift })') 241=item serialiser => $string (default: '(sub { pack "(w/a*)*", @_ }, sub { unpack "(w/a*)*", shift })')
123 242
124All arguments, result data and event data have to be serialised to be 243All arguments, result data and event data have to be serialised to be
125transferred between the processes. For this, they have to be frozen and 244transferred between the processes. For this, they have to be frozen and
138pre-load it into your L<AnyEvent::Fork> process, or you can add a C<use> 257pre-load it into your L<AnyEvent::Fork> process, or you can add a C<use>
139or C<require> statement into the serialiser string. Or both. 258or C<require> statement into the serialiser string. Or both.
140 259
141=back 260=back
142 261
262See the examples section earlier in this document for some actual
263examples.
264
143=cut 265=cut
144 266
145our $STRING_SERIALISER = '(sub { pack "(w/a*)*", @_ }, sub { unpack "(w/a*)*", shift })'; 267our $STRING_SERIALISER = '(sub { pack "(w/a*)*", @_ }, sub { unpack "(w/a*)*", shift })';
146
147# ideally, we want (SvLEN - SvCUR) || 1024 or somesuch...
148sub rlen($) { ($_[0] < 384 ? 512 + 16 : 2 << int +(log $_[0] + 512) / log 2) - $_[0] - 16 }
149 268
150sub run { 269sub run {
151 my ($self, $function, %arg) = @_; 270 my ($self, $function, %arg) = @_;
152 271
153 my $serialiser = delete $arg{serialiser} || $STRING_SERIALISER; 272 my $serialiser = delete $arg{serialiser} || $STRING_SERIALISER;
154 my $on_event = delete $arg{on_event}; 273 my $on_event = delete $arg{on_event};
155 my $on_error = delete $arg{on_error}; 274 my $on_error = delete $arg{on_error};
275 my $on_destroy = delete $arg{on_destroy};
156 276
157 # default for on_error is to on_event, if specified 277 # default for on_error is to on_event, if specified
158 $on_error ||= $on_event 278 $on_error ||= $on_event
159 ? sub { $on_event->(error => shift) } 279 ? sub { $on_event->(error => shift) }
160 : sub { die "AnyEvent::Fork::RPC: uncaught error: $_[0].\n" }; 280 : sub { die "AnyEvent::Fork::RPC: uncaught error: $_[0].\n" };
162 # default for on_event is to raise an error 282 # default for on_event is to raise an error
163 $on_event ||= sub { $on_error->("event received, but no on_event handler") }; 283 $on_event ||= sub { $on_error->("event received, but no on_event handler") };
164 284
165 my ($f, $t) = eval $serialiser; die $@ if $@; 285 my ($f, $t) = eval $serialiser; die $@ if $@;
166 286
167 my (@rcb, $fh, $shutdown, $wbuf, $ww, $rbuf, $rw); 287 my (@rcb, %rcb, $fh, $shutdown, $wbuf, $ww);
288 my ($rlen, $rbuf, $rw) = 512 - 16;
168 289
169 my $wcb = sub { 290 my $wcb = sub {
170 my $len = syswrite $fh, $wbuf; 291 my $len = syswrite $fh, $wbuf;
171 292
172 if (!defined $len) { 293 unless (defined $len) {
173 if ($! != Errno::EAGAIN && $! != Errno::EWOULDBLOCK) { 294 if ($! != Errno::EAGAIN && $! != Errno::EWOULDBLOCK) {
174 undef $rw; undef $ww; # it ends here 295 undef $rw; undef $ww; # it ends here
175 $on_error->("$!"); 296 $on_error->("$!");
176 } 297 }
177 } 298 }
188 309
189 $self->require ($module) 310 $self->require ($module)
190 ->send_arg ($function, $arg{init}, $serialiser) 311 ->send_arg ($function, $arg{init}, $serialiser)
191 ->run ("$module\::run", sub { 312 ->run ("$module\::run", sub {
192 $fh = shift; 313 $fh = shift;
314
315 my ($id, $len);
193 $rw = AE::io $fh, 0, sub { 316 $rw = AE::io $fh, 0, sub {
317 $rlen = $rlen * 2 + 16 if $rlen - 128 < length $rbuf;
194 my $len = sysread $fh, $rbuf, rlen length $rbuf, length $rbuf; 318 $len = sysread $fh, $rbuf, $rlen - length $rbuf, length $rbuf;
195 319
196 if ($len) { 320 if ($len) {
197 while (5 <= length $rbuf) { 321 while (8 <= length $rbuf) {
198 $len = unpack "L", $rbuf; 322 ($id, $len) = unpack "LL", $rbuf;
199 4 + $len <= length $rbuf 323 8 + $len <= length $rbuf
200 or last; 324 or last;
201 325
202 my @r = $t->(substr $rbuf, 4, $len); 326 my @r = $t->(substr $rbuf, 8, $len);
203 substr $rbuf, 0, $len + 4, ""; 327 substr $rbuf, 0, 8 + $len, "";
328
329 if ($id) {
330 if (@rcb) {
331 (shift @rcb)->(@r);
332 } elsif (my $cb = delete $rcb{$id}) {
333 $cb->(@r);
334 } else {
335 undef $rw; undef $ww;
336 $on_error->("unexpected data from child");
204 337 }
205 if (pop @r) { 338 } else {
206 $on_event->(@r); 339 $on_event->(@r);
207 } elsif (@rcb) {
208 (shift @rcb)->(@r);
209 } else {
210 undef $rw; undef $ww;
211 $on_error->("unexpected data from child");
212 } 340 }
213 } 341 }
214 } elsif (defined $len) { 342 } elsif (defined $len) {
215 undef $rw; undef $ww; # it ends here 343 undef $rw; undef $ww; # it ends here
344
345 if (@rcb || %rcb) {
346 use Data::Dump;ddx[\@rcb,\%rcb];#d#
216 $on_error->("unexpected eof") 347 $on_error->("unexpected eof");
217 if @rcb; 348 } else {
349 $on_destroy->();
350 }
218 } elsif ($! != Errno::EAGAIN && $! != Errno::EWOULDBLOCK) { 351 } elsif ($! != Errno::EAGAIN && $! != Errno::EWOULDBLOCK) {
219 undef $rw; undef $ww; # it ends here 352 undef $rw; undef $ww; # it ends here
220 $on_error->("read: $!"); 353 $on_error->("read: $!");
221 } 354 }
222 }; 355 };
227 my $guard = Guard::guard { 360 my $guard = Guard::guard {
228 $shutdown = 1; 361 $shutdown = 1;
229 $ww ||= $fh && AE::io $fh, 1, $wcb; 362 $ww ||= $fh && AE::io $fh, 1, $wcb;
230 }; 363 };
231 364
365 my $id;
366
367 $arg{async}
232 sub { 368 ? sub {
233 push @rcb, pop; 369 $id = ($id == 0xffffffff ? 0 : $id) + 1;
370 $id = ($id == 0xffffffff ? 0 : $id) + 1 while exists $rcb{$id}; # rarely loops
234 371
372 $rcb{$id} = pop;
373
235 $guard; # keep it alive 374 $guard; # keep it alive
236 375
237 $wbuf .= pack "L/a*", &$f; 376 $wbuf .= pack "LL/a*", $id, &$f;
238 $ww ||= $fh && AE::io $fh, 1, $wcb; 377 $ww ||= $fh && AE::io $fh, 1, $wcb;
239 } 378 }
379 : sub {
380 push @rcb, pop;
381
382 $guard; # keep it alive
383
384 $wbuf .= pack "L/a*", &$f;
385 $ww ||= $fh && AE::io $fh, 1, $wcb;
386 }
240} 387}
241 388
389=item $rpc->(..., $cb->(...))
390
391The RPC object returned by C<AnyEvent::Fork::RPC::run> is actually a code
392reference. There are two things you can do with it: call it, and let it go
393out of scope (let it get destroyed).
394
395If C<async> was false when C<$rpc> was created (the default), then, if you
396call C<$rpc>, the C<$function> is invoked with all arguments passed to
397C<$rpc> except the last one (the callback). When the function returns, the
398callback will be invoked with all the return values.
399
400If C<async> was true, then the C<$function> receives an additional
401initial argument, the result callback. In this case, returning from
402C<$function> does nothing - the function only counts as "done" when the
403result callback is called, and any arguments passed to it are considered
404the return values. This makes it possible to "return" from event handlers
405or e.g. Coro threads.
406
407The other thing that can be done with the RPC object is to destroy it. In
408this case, the child process will execute all remaining RPC calls, report
409their results, and then exit.
410
411See the examples section earlier in this document for some actual
412examples.
413
242=back 414=back
243 415
244=head1 CHILD PROCESS USAGE 416=head1 CHILD PROCESS USAGE
245 417
246These functions are not available in this module. They are only available 418The following function is not available in this module. They are only
247in the namespace of this module when the child is running, without 419available in the namespace of this module when the child is running,
248having to load any extra module. They are part of the child-side API of 420without having to load any extra modules. They are part of the child-side
249L<AnyEvent::Fork::RPC>. 421API of L<AnyEvent::Fork::RPC>.
250 422
251=over 4 423=over 4
252
253=item AnyEvent::Fork::RPC::quit
254
255This function can be called to gracefully stop the child process when it
256is idle.
257
258After this function is called, the process stops handling incoming RPC
259requests, but outstanding events and function return values will be sent
260to the parent. When all data has been sent, the process calls C<exit>.
261
262Since the parent might not expect the child to exit at random points in
263time, it is often better to signal the parent by sending an C<event> and
264letting the parent close down the child process.
265 424
266=item AnyEvent::Fork::RPC::event ... 425=item AnyEvent::Fork::RPC::event ...
267 426
268Send an event to the parent. Events are a bit like RPC calls made by the 427Send an event to the parent. Events are a bit like RPC calls made by the
269child process to the parent, except that there is no notion of return 428child process to the parent, except that there is no notion of return
270values. 429values.
271 430
431See the examples section earlier in this document for some actual
432examples.
433
272=back 434=back
273 435
274=head1 SEE ALSO 436=head1 SEE ALSO
275 437
276L<AnyEvent::Fork> (to create the processes in the first place), 438L<AnyEvent::Fork> (to create the processes in the first place),

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines