ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-Fork-RPC/RPC.pm
(Generate patch)

Comparing AnyEvent-Fork-RPC/RPC.pm (file contents):
Revision 1.1 by root, Wed Apr 17 15:55:59 2013 UTC vs.
Revision 1.5 by root, Wed Apr 17 19:39:54 2013 UTC

34concurrently in the child, using AnyEvent. 34concurrently in the child, using AnyEvent.
35 35
36It also implements an asynchronous event mechanism from the child to the 36It also implements an asynchronous event mechanism from the child to the
37parent, that could be used for progress indications or other information. 37parent, that could be used for progress indications or other information.
38 38
39=head1 EXAMPLES
40
41=head2 Synchronous Backend
42
43Here is a simple example that implements a backend that executes C<unlink>
44and C<rmdir> calls, and reports their status back. It also reports the
45number of requests it has processed every three requests, which is clearly
46silly, but illustrates the use of events.
47
48First the parent process:
49
50 use AnyEvent;
51 use AnyEvent::Fork;
52 use AnyEvent::Fork::RPC;
53
54 my $done = AE::cv;
55
56 my $rpc = AnyEvent::Fork
57 ->new
58 ->require ("MyWorker")
59 ->AnyEvent::Fork::RPC::run ("MyWorker::run",
60 on_error => sub { warn "FATAL: $_[0]"; exit 1 },
61 on_event => sub { warn "$_[0] requests handled\n" },
62 on_destroy => $done,
63 );
64
65 for my $id (1..6) {
66 $rpc->(rmdir => "/tmp/somepath/$id", sub {
67 $_[0]
68 or warn "/tmp/somepath/$id: $_[1]\n";
69 });
70 }
71
72 undef $rpc;
73
74 $done->recv;
75
76The parent creates the process, queues a few rmdir's. It then forgets
77about the C<$rpc> object, so that the child exits after it has handled the
78requests, and then it waits till the requests have been handled.
79
80The child is implemented using a separate module, C<MyWorker>, shown here:
81
82 package MyWorker;
83
84 my $count;
85
86 sub run {
87 my ($cmd, $path) = @_;
88
89 AnyEvent::Fork::RPC::event ($count)
90 unless ++$count % 3;
91
92 my $status = $cmd eq "rmdir" ? rmdir $path
93 : $cmd eq "unlink" ? unlink $path
94 : die "fatal error, illegal command '$cmd'";
95
96 $status or (0, "$!")
97 }
98
99 1
100
101The C<run> function first sends a "progress" event every three calls, and
102then executes C<rmdir> or C<unlink>, depending on the first parameter (or
103dies with a fatal error - obviously, you must never let this happen :).
104
105Eventually it returns the status value true if the command was successful,
106or the status value 0 and the stringified error message.
107
108On my system, running the first cdoe fragment with the given
109F<MyWorker.pm> in the current directory yields:
110
111 /tmp/somepath/1: No such file or directory
112 /tmp/somepath/2: No such file or directory
113 3 requests handled
114 /tmp/somepath/3: No such file or directory
115 /tmp/somepath/4: No such file or directory
116 /tmp/somepath/5: No such file or directory
117 6 requests handled
118 /tmp/somepath/6: No such file or directory
119
120Obviously, none of the directories I am trying to delete even exist. Also,
121the events and responses are processed in exactly the same order as
122they were created in the child, which is true for both synchronous and
123asynchronous backends.
124
125Note that the parentheses in the call to C<AnyEvent::Fork::RPC::event> are
126not optional. That is because the function isn't defined when the code is
127compiled. You can make sure it is visible by pre-loading the correct
128backend module in the call to C<require>:
129
130 ->require ("AnyEvent::Fork::RPC::Sync", "MyWorker")
131
132Since the backend module declares the C<event> function, loading it first
133ensures that perl will correctly interpret calls to it.
134
135And as a final remark, there is a fine module on CPAN that can
136asynchronously C<rmdir> and C<unlink> and a lot more, and more efficiently
137than this example, namely L<IO::AIO>.
138
39=head1 PARENT PROCESS USAGE 139=head1 PARENT PROCESS USAGE
40 140
41This module exports nothing, and only implements a single function: 141This module exports nothing, and only implements a single function:
42 142
43=over 4 143=over 4
92Called for every call to the C<AnyEvent::Fork::RPC::event> function in the 192Called for every call to the C<AnyEvent::Fork::RPC::event> function in the
93child, with the arguments of that function passed to the callback. 193child, with the arguments of that function passed to the callback.
94 194
95Also called on errors when no C<on_error> handler is provided. 195Also called on errors when no C<on_error> handler is provided.
96 196
197=item on_destroy => $cb->()
198
199Called when the C<$rpc> object has been destroyed and all requests have
200been successfully handled. This is useful when you queue some requests and
201want the child to go away after it has handled them. The problem is that
202the parent must not exit either until all requests have been handled, and
203this cna be accomplished by waiting for this callback.
204
97=item init => $function (default none) 205=item init => $function (default none)
98 206
99When specified (by name), this function is called in the child as the very 207When specified (by name), this function is called in the child as the very
100first thing when taking over the process, with all the arguments normally 208first thing when taking over the process, with all the arguments normally
101passed to the C<AnyEvent::Fork::run> function, except the communications 209passed to the C<AnyEvent::Fork::run> function, except the communications
102socket. 210socket.
103 211
104It can be used to do one-time things in the child such as storing passed 212It can be used to do one-time things in the child such as storing passed
105parameters or opening database connections. 213parameters or opening database connections.
106 214
215It is called very early - before the serialisers are created or the
216C<$function> name is resolved into a function reference, so it could be
217used to load any modules that provide the serialiser or function. It can
218not, however, create events.
219
107=item async => $boolean (default: 0) 220=item async => $boolean (default: 0)
108 221
109The default server used in the child does all I/O blockingly, and only 222The default server used in the child does all I/O blockingly, and only
110allows a single RPC call to execute concurrently. 223allows a single RPC call to execute concurrently.
111 224
112Setting C<async> to a true value switches to another implementation that 225Setting C<async> to a true value switches to another implementation that
113uses L<AnyEvent> in the child and allows multiple concurrent RPC calls. 226uses L<AnyEvent> in the child and allows multiple concurrent RPC calls.
114 227
115The actual API in the child is documented in the section that describes 228The actual API in the child is documented in the section that describes
116the calling semantics of the returned C<$rpc> function. 229the calling semantics of the returned C<$rpc> function.
230
231If you want to pre-load the actual back-end modules to enable memory
232sharing, then you should load C<AnyEvent::Fork::RPC::Sync> for
233synchronous, and C<AnyEvent::Fork::RPC::Async> for asynchronous mode.
234
235If you use a template process and want to fork both sync and async
236children, then it is permissible to laod both modules.
117 237
118=item serialiser => $string (default: '(sub { pack "(w/a*)*", @_ }, sub { unpack "(w/a*)*", shift })') 238=item serialiser => $string (default: '(sub { pack "(w/a*)*", @_ }, sub { unpack "(w/a*)*", shift })')
119 239
120All arguments, result data and event data have to be serialised to be 240All arguments, result data and event data have to be serialised to be
121transferred between the processes. For this, they have to be frozen and 241transferred between the processes. For this, they have to be frozen and
128functions, by specifying a string with perl source code. It's supposed to 248functions, by specifying a string with perl source code. It's supposed to
129return two code references when evaluated: the first receives a list of 249return two code references when evaluated: the first receives a list of
130perl values and must return an octet string. The second receives the octet 250perl values and must return an octet string. The second receives the octet
131string and must return the original list of values. 251string and must return the original list of values.
132 252
253If you need an external module for serialisation, then you can either
254pre-load it into your L<AnyEvent::Fork> process, or you can add a C<use>
255or C<require> statement into the serialiser string. Or both.
256
133=back 257=back
134 258
135=cut 259=cut
136 260
137our $SERIALISE_STRINGS = '(sub { pack "(w/a*)*", @_ }, sub { unpack "(w/a*)*", shift })'; 261our $STRING_SERIALISER = '(sub { pack "(w/a*)*", @_ }, sub { unpack "(w/a*)*", shift })';
138 262
139sub run { 263sub run {
140 my ($self, $function, %arg) = @_; 264 my ($self, $function, %arg) = @_;
141 265
142 my $serialiser = delete $arg{serialiser} || $SERIALISE_STRINGS; 266 my $serialiser = delete $arg{serialiser} || $STRING_SERIALISER;
143 my $on_event = delete $arg{on_event}; 267 my $on_event = delete $arg{on_event};
144 my $on_error = delete $arg{on_error}; 268 my $on_error = delete $arg{on_error};
269 my $on_destroy = delete $arg{on_destroy};
145 270
146 # default for on_error is to on_event, if specified 271 # default for on_error is to on_event, if specified
147 $on_error ||= $on_event 272 $on_error ||= $on_event
148 ? sub { $on_event->(error => shift) } 273 ? sub { $on_event->(error => shift) }
149 : sub { die "AnyEvent::Fork::RPC: uncaught error: $_[0].\n" }; 274 : sub { die "AnyEvent::Fork::RPC: uncaught error: $_[0].\n" };
151 # default for on_event is to raise an error 276 # default for on_event is to raise an error
152 $on_event ||= sub { $on_error->("event received, but no on_event handler") }; 277 $on_event ||= sub { $on_error->("event received, but no on_event handler") };
153 278
154 my ($f, $t) = eval $serialiser; die $@ if $@; 279 my ($f, $t) = eval $serialiser; die $@ if $@;
155 280
156 my (@rcb, $fh, $shutdown, $wbuf, $ww, $rbuf, $rw); 281 my (@rcb, $fh, $shutdown, $wbuf, $ww, $rw);
282 my ($rlen, $rbuf) = 512 - 16;
157 283
158 my $wcb = sub { 284 my $wcb = sub {
159 my $len = syswrite $fh, $wbuf; 285 my $len = syswrite $fh, $wbuf;
160 286
161 if (!defined $len) { 287 if (!defined $len) {
178 $self->require ($module) 304 $self->require ($module)
179 ->send_arg ($function, $arg{init}, $serialiser) 305 ->send_arg ($function, $arg{init}, $serialiser)
180 ->run ("$module\::run", sub { 306 ->run ("$module\::run", sub {
181 $fh = shift; 307 $fh = shift;
182 $rw = AE::io $fh, 0, sub { 308 $rw = AE::io $fh, 0, sub {
309 $rlen = $rlen * 2 + 16 if $rlen - 128 < length $rbuf;
183 my $len = sysread $fh, $rbuf, 512 + length $rbuf, length $rbuf; 310 my $len = sysread $fh, $rbuf, $rlen - length $rbuf, length $rbuf;
184 311
185 if ($len) { 312 if ($len) {
186 while (5 <= length $rbuf) { 313 while (5 <= length $rbuf) {
187 $len = unpack "L", $rbuf; 314 $len = unpack "L", $rbuf;
188 if (4 + $len <= length $rbuf) { 315 4 + $len <= length $rbuf
316 or last;
317
189 my @r = $t->(substr $rbuf, 4, $len); 318 my @r = $t->(substr $rbuf, 4, $len);
190 substr $rbuf, 0, $len + 4, ""; 319 substr $rbuf, 0, $len + 4, "";
191 320
192 if (pop @r) { 321 if (pop @r) {
193 $on_event->(@r); 322 $on_event->(@r);
194 } elsif (@rcb) { 323 } elsif (@rcb) {
195 (shift @rcb)->(@r); 324 (shift @rcb)->(@r);
196 } else { 325 } else {
197 undef $rw; undef $ww; 326 undef $rw; undef $ww;
198 $on_error->("unexpected data from child"); 327 $on_error->("unexpected data from child");
199 }
200 } 328 }
201 } 329 }
202 } elsif (defined $len) { 330 } elsif (defined $len) {
203 undef $rw; undef $ww; # it ends here 331 undef $rw; undef $ww; # it ends here
332
333 if (@rcb) {
204 $on_error->("unexpected eof") 334 $on_error->("unexpected eof");
205 if @rcb; 335 } else {
336 $on_destroy->();
337 }
206 } elsif ($! != Errno::EAGAIN && $! != Errno::EWOULDBLOCK) { 338 } elsif ($! != Errno::EAGAIN && $! != Errno::EWOULDBLOCK) {
207 undef $rw; undef $ww; # it ends here 339 undef $rw; undef $ww; # it ends here
208 $on_error->("read: $!"); 340 $on_error->("read: $!");
209 } 341 }
210 }; 342 };
225 $wbuf .= pack "L/a*", &$f; 357 $wbuf .= pack "L/a*", &$f;
226 $ww ||= $fh && AE::io $fh, 1, $wcb; 358 $ww ||= $fh && AE::io $fh, 1, $wcb;
227 } 359 }
228} 360}
229 361
362=item $rpc->(..., $cb->(...))
363
364The RPC object returned by C<AnyEvent::Fork::RPC::run> is actually a code
365reference. There are two things you can do with it: call it, and let it go
366out of scope (let it get destroyed).
367
368If C<async> was false when C<$rpc> was created (the default), then, if you
369call C<$rpc>, the C<$function> is invoked with all arguments passed to
370C<$rpc> except the last one (the callback). When the function returns, the
371callback will be invoked with all the return values.
372
373If C<async> was true, then the C<$function> receives an additional
374initial argument, the result callback. In this case, returning from
375C<$function> does nothing - the function only counts as "done" when the
376result callback is called, and any arguments passed to it are considered
377the return values. This makes it possible to "return" from event handlers
378or e.g. Coro threads.
379
380The other thing that can be done with the RPC object is to destroy it. In
381this case, the child process will execute all remaining RPC calls, report
382their results, and then exit.
383
230=back 384=back
231 385
232=head1 CHILD PROCESS USAGE 386=head1 CHILD PROCESS USAGE
233 387
234These functions are not available in this module. They are only available 388The following function is not available in this module. They are only
235in the namespace of this module when the child is running, without 389available in the namespace of this module when the child is running,
236having to load any extra module. They are part of the child-side API of 390without having to load any extra modules. They are part of the child-side
237L<AnyEvent::Fork::RPC>. 391API of L<AnyEvent::Fork::RPC>.
238 392
239=over 4 393=over 4
240
241=item AnyEvent::Fork::RPC::quit
242
243This function can be called to gracefully stop the child process when it
244is idle.
245
246After this function is called, the process stops handling incoming RPC
247requests, but outstanding events and function return values will be sent
248to the parent. When all data has been sent, the process calls C<exit>.
249
250Since the parent might not expect the child to exit at random points in
251time, it is often better to signal the parent by sending an C<event> and
252letting the parent close down the child process.
253 394
254=item AnyEvent::Fork::RPC::event ... 395=item AnyEvent::Fork::RPC::event ...
255 396
256Send an event to the parent. Events are a bit like RPC calls made by the 397Send an event to the parent. Events are a bit like RPC calls made by the
257child process to the parent, except that there is no notion of return 398child process to the parent, except that there is no notion of return

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines