ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-Fork-RPC/RPC.pm
(Generate patch)

Comparing AnyEvent-Fork-RPC/RPC.pm (file contents):
Revision 1.3 by root, Wed Apr 17 17:16:48 2013 UTC vs.
Revision 1.4 by root, Wed Apr 17 19:38:25 2013 UTC

34concurrently in the child, using AnyEvent. 34concurrently in the child, using AnyEvent.
35 35
36It also implements an asynchronous event mechanism from the child to the 36It also implements an asynchronous event mechanism from the child to the
37parent, that could be used for progress indications or other information. 37parent, that could be used for progress indications or other information.
38 38
39=head1 EXAMPLES
40
41=head2 Synchronous Backend
42
43Here is a simple example that implements a backend that executes C<unlink>
44and C<rmdir> calls, and reports their status back. It also reports the
45number of requests it has processed every three requests, which is clearly
46silly, but illustrates the use of events.
47
48First the parent process:
49
50 use AnyEvent;
51 use AnyEvent::Fork;
52 use AnyEvent::Fork::RPC;
53
54 my $done = AE::cv;
55
56 my $rpc = AnyEvent::Fork
57 ->new
58 ->require ("MyWorker")
59 ->AnyEvent::Fork::RPC::run ("MyWorker::run",
60 on_event => sub { warn "$_[0] requests handled\n" },
61 on_destroy => $done,
62 );
63
64 for my $id (1..6) {
65 $rpc->(rmdir => "/tmp/somepath/$id", sub {
66 $_[0]
67 or warn "/tmp/somepath/$id: $_[1]\n";
68 });
69 }
70
71 undef $rpc;
72
73 $done->recv;
74
75The parent creates the process, queues a few rmdir's. It then forgets
76about the C<$rpc> object, so that the child exits after it has handled the
77requests, and then it waits till the requests have been handled.
78
79The child is implemented using a separate module, C<MyWorker>, shown here:
80
81 package MyWorker;
82
83 my $count;
84
85 sub run {
86 my ($cmd, $path) = @_;
87
88 AnyEvent::Fork::RPC::event ($count)
89 unless ++$count % 3;
90
91 my $status = $cmd eq "rmdir" ? rmdir $path
92 : $cmd eq "unlink" ? unlink $path
93 : die "fatal error, illegal command '$cmd'";
94
95 $status or (0, "$!")
96 }
97
98 1
99
100The C<run> function first sends a "progress" event every three calls, and
101then executes C<rmdir> or C<unlink>, depending on the first parameter (or
102dies with a fatal error - obviously, you must never let this happen :).
103
104Eventually it returns the status value true if the command was successful,
105or the status value 0 and the stringified error message.
106
107On my system, running the first cdoe fragment with the given
108F<MyWorker.pm> in the current directory yields:
109
110 /tmp/somepath/1: No such file or directory
111 /tmp/somepath/2: No such file or directory
112 3 requests handled
113 /tmp/somepath/3: No such file or directory
114 /tmp/somepath/4: No such file or directory
115 /tmp/somepath/5: No such file or directory
116 6 requests handled
117 /tmp/somepath/6: No such file or directory
118
119Obviously, none of the directories I am trying to delete even exist. Also,
120the events and responses are processed in exactly the same order as
121they were created in the child, which is true for both synchronous and
122asynchronous backends.
123
124Note that the parentheses in the call to C<AnyEvent::Fork::RPC::event> are
125not optional. That is because the function isn't defined when the code is
126compiled. You can make sure it is visible by pre-loading the correct
127backend module in the call to C<require>:
128
129 ->require ("AnyEvent::Fork::RPC::Sync", "MyWorker")
130
131Since the backend module declares the C<event> function, loading it first
132ensures that perl will correctly interpret calls to it.
133
134And as a final remark, there is a fine module on CPAN that can
135asynchronously C<rmdir> and C<unlink> and a lot more, and more efficiently
136than this example, namely L<IO::AIO>.
137
39=head1 PARENT PROCESS USAGE 138=head1 PARENT PROCESS USAGE
40 139
41This module exports nothing, and only implements a single function: 140This module exports nothing, and only implements a single function:
42 141
43=over 4 142=over 4
92Called for every call to the C<AnyEvent::Fork::RPC::event> function in the 191Called for every call to the C<AnyEvent::Fork::RPC::event> function in the
93child, with the arguments of that function passed to the callback. 192child, with the arguments of that function passed to the callback.
94 193
95Also called on errors when no C<on_error> handler is provided. 194Also called on errors when no C<on_error> handler is provided.
96 195
196=item on_destroy => $cb->()
197
198Called when the C<$rpc> object has been destroyed and all requests have
199been successfully handled. This is useful when you queue some requests and
200want the child to go away after it has handled them. The problem is that
201the parent must not exit either until all requests have been handled, and
202this cna be accomplished by waiting for this callback.
203
97=item init => $function (default none) 204=item init => $function (default none)
98 205
99When specified (by name), this function is called in the child as the very 206When specified (by name), this function is called in the child as the very
100first thing when taking over the process, with all the arguments normally 207first thing when taking over the process, with all the arguments normally
101passed to the C<AnyEvent::Fork::run> function, except the communications 208passed to the C<AnyEvent::Fork::run> function, except the communications
102socket. 209socket.
103 210
104It can be used to do one-time things in the child such as storing passed 211It can be used to do one-time things in the child such as storing passed
105parameters or opening database connections. 212parameters or opening database connections.
106 213
214It is called very early - before the serialisers are created or the
215C<$function> name is resolved into a function reference, so it could be
216used to load any modules that provide the serialiser or function. It can
217not, however, create events.
218
107=item async => $boolean (default: 0) 219=item async => $boolean (default: 0)
108 220
109The default server used in the child does all I/O blockingly, and only 221The default server used in the child does all I/O blockingly, and only
110allows a single RPC call to execute concurrently. 222allows a single RPC call to execute concurrently.
111 223
116the calling semantics of the returned C<$rpc> function. 228the calling semantics of the returned C<$rpc> function.
117 229
118If you want to pre-load the actual back-end modules to enable memory 230If you want to pre-load the actual back-end modules to enable memory
119sharing, then you should load C<AnyEvent::Fork::RPC::Sync> for 231sharing, then you should load C<AnyEvent::Fork::RPC::Sync> for
120synchronous, and C<AnyEvent::Fork::RPC::Async> for asynchronous mode. 232synchronous, and C<AnyEvent::Fork::RPC::Async> for asynchronous mode.
233
234If you use a template process and want to fork both sync and async
235children, then it is permissible to laod both modules.
121 236
122=item serialiser => $string (default: '(sub { pack "(w/a*)*", @_ }, sub { unpack "(w/a*)*", shift })') 237=item serialiser => $string (default: '(sub { pack "(w/a*)*", @_ }, sub { unpack "(w/a*)*", shift })')
123 238
124All arguments, result data and event data have to be serialised to be 239All arguments, result data and event data have to be serialised to be
125transferred between the processes. For this, they have to be frozen and 240transferred between the processes. For this, they have to be frozen and
142 257
143=cut 258=cut
144 259
145our $STRING_SERIALISER = '(sub { pack "(w/a*)*", @_ }, sub { unpack "(w/a*)*", shift })'; 260our $STRING_SERIALISER = '(sub { pack "(w/a*)*", @_ }, sub { unpack "(w/a*)*", shift })';
146 261
147# ideally, we want (SvLEN - SvCUR) || 1024 or somesuch...
148sub rlen($) { ($_[0] < 384 ? 512 + 16 : 2 << int +(log $_[0] + 512) / log 2) - $_[0] - 16 }
149
150sub run { 262sub run {
151 my ($self, $function, %arg) = @_; 263 my ($self, $function, %arg) = @_;
152 264
153 my $serialiser = delete $arg{serialiser} || $STRING_SERIALISER; 265 my $serialiser = delete $arg{serialiser} || $STRING_SERIALISER;
154 my $on_event = delete $arg{on_event}; 266 my $on_event = delete $arg{on_event};
155 my $on_error = delete $arg{on_error}; 267 my $on_error = delete $arg{on_error};
268 my $on_destroy = delete $arg{on_destroy};
156 269
157 # default for on_error is to on_event, if specified 270 # default for on_error is to on_event, if specified
158 $on_error ||= $on_event 271 $on_error ||= $on_event
159 ? sub { $on_event->(error => shift) } 272 ? sub { $on_event->(error => shift) }
160 : sub { die "AnyEvent::Fork::RPC: uncaught error: $_[0].\n" }; 273 : sub { die "AnyEvent::Fork::RPC: uncaught error: $_[0].\n" };
162 # default for on_event is to raise an error 275 # default for on_event is to raise an error
163 $on_event ||= sub { $on_error->("event received, but no on_event handler") }; 276 $on_event ||= sub { $on_error->("event received, but no on_event handler") };
164 277
165 my ($f, $t) = eval $serialiser; die $@ if $@; 278 my ($f, $t) = eval $serialiser; die $@ if $@;
166 279
167 my (@rcb, $fh, $shutdown, $wbuf, $ww, $rbuf, $rw); 280 my (@rcb, $fh, $shutdown, $wbuf, $ww, $rw);
281 my ($rlen, $rbuf) = 512 - 16;
168 282
169 my $wcb = sub { 283 my $wcb = sub {
170 my $len = syswrite $fh, $wbuf; 284 my $len = syswrite $fh, $wbuf;
171 285
172 if (!defined $len) { 286 if (!defined $len) {
189 $self->require ($module) 303 $self->require ($module)
190 ->send_arg ($function, $arg{init}, $serialiser) 304 ->send_arg ($function, $arg{init}, $serialiser)
191 ->run ("$module\::run", sub { 305 ->run ("$module\::run", sub {
192 $fh = shift; 306 $fh = shift;
193 $rw = AE::io $fh, 0, sub { 307 $rw = AE::io $fh, 0, sub {
308 $rlen = $rlen * 2 + 16 if $rlen - 128 < length $rbuf;
194 my $len = sysread $fh, $rbuf, rlen length $rbuf, length $rbuf; 309 my $len = sysread $fh, $rbuf, $rlen - length $rbuf, length $rbuf;
195 310
196 if ($len) { 311 if ($len) {
197 while (5 <= length $rbuf) { 312 while (5 <= length $rbuf) {
198 $len = unpack "L", $rbuf; 313 $len = unpack "L", $rbuf;
199 4 + $len <= length $rbuf 314 4 + $len <= length $rbuf
211 $on_error->("unexpected data from child"); 326 $on_error->("unexpected data from child");
212 } 327 }
213 } 328 }
214 } elsif (defined $len) { 329 } elsif (defined $len) {
215 undef $rw; undef $ww; # it ends here 330 undef $rw; undef $ww; # it ends here
331
332 if (@rcb) {
216 $on_error->("unexpected eof") 333 $on_error->("unexpected eof");
217 if @rcb; 334 } else {
335 $on_destroy->();
336 }
218 } elsif ($! != Errno::EAGAIN && $! != Errno::EWOULDBLOCK) { 337 } elsif ($! != Errno::EAGAIN && $! != Errno::EWOULDBLOCK) {
219 undef $rw; undef $ww; # it ends here 338 undef $rw; undef $ww; # it ends here
220 $on_error->("read: $!"); 339 $on_error->("read: $!");
221 } 340 }
222 }; 341 };
237 $wbuf .= pack "L/a*", &$f; 356 $wbuf .= pack "L/a*", &$f;
238 $ww ||= $fh && AE::io $fh, 1, $wcb; 357 $ww ||= $fh && AE::io $fh, 1, $wcb;
239 } 358 }
240} 359}
241 360
361=item $rpc->(..., $cb->(...))
362
363The RPC object returned by C<AnyEvent::Fork::RPC::run> is actually a code
364reference. There are two things you can do with it: call it, and let it go
365out of scope (let it get destroyed).
366
367If C<async> was false when C<$rpc> was created (the default), then, if you
368call C<$rpc>, the C<$function> is invoked with all arguments passed to
369C<$rpc> except the last one (the callback). When the function returns, the
370callback will be invoked with all the return values.
371
372If C<async> was true, then the C<$function> receives an additional
373initial argument, the result callback. In this case, returning from
374C<$function> does nothing - the function only counts as "done" when the
375result callback is called, and any arguments passed to it are considered
376the return values. This makes it possible to "return" from event handlers
377or e.g. Coro threads.
378
379The other thing that can be done with the RPC object is to destroy it. In
380this case, the child process will execute all remaining RPC calls, report
381their results, and then exit.
382
242=back 383=back
243 384
244=head1 CHILD PROCESS USAGE 385=head1 CHILD PROCESS USAGE
245 386
246These functions are not available in this module. They are only available 387The following function is not available in this module. They are only
247in the namespace of this module when the child is running, without 388available in the namespace of this module when the child is running,
248having to load any extra module. They are part of the child-side API of 389without having to load any extra modules. They are part of the child-side
249L<AnyEvent::Fork::RPC>. 390API of L<AnyEvent::Fork::RPC>.
250 391
251=over 4 392=over 4
252 393
253=item AnyEvent::Fork::RPC::event ... 394=item AnyEvent::Fork::RPC::event ...
254 395

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines