ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-Fork-RPC/RPC.pm
Revision: 1.1
Committed: Wed Apr 17 15:55:59 2013 UTC (11 years, 1 month ago) by root
Branch: MAIN
Log Message:
*** empty log message ***

File Contents

# Content
1 =head1 NAME
2
3 AnyEvent::Fork::RPC - simple RPC extension for AnyEvent::Fork
4
5 =head1 SYNOPSIS
6
7 use AnyEvent::Fork;
8 use AnyEvent::Fork::RPC;
9
10 my $rpc = AnyEvent::Fork
11 ->new
12 ->require ("MyModule")
13 ->AnyEvent::Fork::RPC::run (
14 "MyModule::server",
15 );
16
17 my $cv = AE::cv;
18
19 $rpc->(1, 2, 3, sub {
20 print "MyModule::server returned @_\n";
21 $cv->send;
22 });
23
24 $cv->recv;
25
26 =head1 DESCRIPTION
27
28 This module implements a simple RPC protocol and backend for processes
29 created via L<AnyEvent::Fork>, allowing you to call a function in the
30 child process and receive its return values (up to 4GB serialised).
31
32 It implements two different backends: a synchronous one that works like a
33 normal function call, and an asynchronous one that can run multiple jobs
34 concurrently in the child, using AnyEvent.
35
36 It also implements an asynchronous event mechanism from the child to the
37 parent, that could be used for progress indications or other information.
38
39 =head1 PARENT PROCESS USAGE
40
41 This module exports nothing, and only implements a single function:
42
43 =over 4
44
45 =cut
46
47 package AnyEvent::Fork::RPC;
48
49 use common::sense;
50
51 use Errno ();
52 use Guard ();
53
54 use AnyEvent;
55 #use AnyEvent::Fork;
56
57 our $VERSION = 0.1;
58
59 =item my $rpc = AnyEvent::Fork::RPC::run $fork, $function, [key => value...]
60
61 The traditional way to call it. But it is way cooler to call it in the
62 following way:
63
64 =item my $rpc = $fork->AnyEvent::Fork::RPC::run ($function, [key => value...])
65
66 This C<run> function/method can be used in place of the
67 L<AnyEvent::Fork::run> method. Just like that method, it takes over
68 the L<AnyEvent::Fork> process, but instead of calling the specified
69 C<$function> directly, it runs a server that accepts RPC calls and handles
70 responses.
71
72 It returns a function reference that can be used to call the function in
73 the child process, handling serialisation and data transfers.
74
75 The following key/value pairs are allowed. It is recommended to have at
76 least an C<on_error> or C<on_event> handler set.
77
78 =over 4
79
80 =item on_error => $cb->($msg)
81
82 Called on (fatal) errors, with a descriptive (hopefully) message. If
83 this callback is not provided, but C<on_event> is, then the C<on_event>
84 callback is called with the first argument being the string C<error>,
85 followed by the error message.
86
87 If neither handler is provided it prints the error to STDERR and will
88 start failing badly.
89
90 =item on_event => $cb->(...)
91
92 Called for every call to the C<AnyEvent::Fork::RPC::event> function in the
93 child, with the arguments of that function passed to the callback.
94
95 Also called on errors when no C<on_error> handler is provided.
96
97 =item init => $function (default none)
98
99 When specified (by name), this function is called in the child as the very
100 first thing when taking over the process, with all the arguments normally
101 passed to the C<AnyEvent::Fork::run> function, except the communications
102 socket.
103
104 It can be used to do one-time things in the child such as storing passed
105 parameters or opening database connections.
106
107 =item async => $boolean (default: 0)
108
109 The default server used in the child does all I/O blockingly, and only
110 allows a single RPC call to execute concurrently.
111
112 Setting C<async> to a true value switches to another implementation that
113 uses L<AnyEvent> in the child and allows multiple concurrent RPC calls.
114
115 The actual API in the child is documented in the section that describes
116 the calling semantics of the returned C<$rpc> function.
117
118 =item serialiser => $string (default: '(sub { pack "(w/a*)*", @_ }, sub { unpack "(w/a*)*", shift })')
119
120 All arguments, result data and event data have to be serialised to be
121 transferred between the processes. For this, they have to be frozen and
122 thawed in both parent and child processes.
123
124 By default, only octet strings can be passed between the processes, which
125 is reasonably fast and efficient.
126
127 For more complicated use cases, you can provide your own freeze and thaw
128 functions, by specifying a string with perl source code. It's supposed to
129 return two code references when evaluated: the first receives a list of
130 perl values and must return an octet string. The second receives the octet
131 string and must return the original list of values.
132
133 =back
134
135 =cut
136
137 our $SERIALISE_STRINGS = '(sub { pack "(w/a*)*", @_ }, sub { unpack "(w/a*)*", shift })';
138
139 sub run {
140 my ($self, $function, %arg) = @_;
141
142 my $serialiser = delete $arg{serialiser} || $SERIALISE_STRINGS;
143 my $on_event = delete $arg{on_event};
144 my $on_error = delete $arg{on_error};
145
146 # default for on_error is to on_event, if specified
147 $on_error ||= $on_event
148 ? sub { $on_event->(error => shift) }
149 : sub { die "AnyEvent::Fork::RPC: uncaught error: $_[0].\n" };
150
151 # default for on_event is to raise an error
152 $on_event ||= sub { $on_error->("event received, but no on_event handler") };
153
154 my ($f, $t) = eval $serialiser; die $@ if $@;
155
156 my (@rcb, $fh, $shutdown, $wbuf, $ww, $rbuf, $rw);
157
158 my $wcb = sub {
159 my $len = syswrite $fh, $wbuf;
160
161 if (!defined $len) {
162 if ($! != Errno::EAGAIN && $! != Errno::EWOULDBLOCK) {
163 undef $rw; undef $ww; # it ends here
164 $on_error->("$!");
165 }
166 }
167
168 substr $wbuf, 0, $len, "";
169
170 unless (length $wbuf) {
171 undef $ww;
172 $shutdown and shutdown $fh, 1;
173 }
174 };
175
176 my $module = "AnyEvent::Fork::RPC::" . ($arg{async} ? "Async" : "Sync");
177
178 $self->require ($module)
179 ->send_arg ($function, $arg{init}, $serialiser)
180 ->run ("$module\::run", sub {
181 $fh = shift;
182 $rw = AE::io $fh, 0, sub {
183 my $len = sysread $fh, $rbuf, 512 + length $rbuf, length $rbuf;
184
185 if ($len) {
186 while (5 <= length $rbuf) {
187 $len = unpack "L", $rbuf;
188 if (4 + $len <= length $rbuf) {
189 my @r = $t->(substr $rbuf, 4, $len);
190 substr $rbuf, 0, $len + 4, "";
191
192 if (pop @r) {
193 $on_event->(@r);
194 } elsif (@rcb) {
195 (shift @rcb)->(@r);
196 } else {
197 undef $rw; undef $ww;
198 $on_error->("unexpected data from child");
199 }
200 }
201 }
202 } elsif (defined $len) {
203 undef $rw; undef $ww; # it ends here
204 $on_error->("unexpected eof")
205 if @rcb;
206 } elsif ($! != Errno::EAGAIN && $! != Errno::EWOULDBLOCK) {
207 undef $rw; undef $ww; # it ends here
208 $on_error->("read: $!");
209 }
210 };
211
212 $ww ||= AE::io $fh, 1, $wcb;
213 });
214
215 my $guard = Guard::guard {
216 $shutdown = 1;
217 $ww ||= $fh && AE::io $fh, 1, $wcb;
218 };
219
220 sub {
221 push @rcb, pop;
222
223 $guard; # keep it alive
224
225 $wbuf .= pack "L/a*", &$f;
226 $ww ||= $fh && AE::io $fh, 1, $wcb;
227 }
228 }
229
230 =back
231
232 =head1 CHILD PROCESS USAGE
233
234 These functions are not available in this module. They are only available
235 in the namespace of this module when the child is running, without
236 having to load any extra module. They are part of the child-side API of
237 L<AnyEvent::Fork::RPC>.
238
239 =over 4
240
241 =item AnyEvent::Fork::RPC::quit
242
243 This function can be called to gracefully stop the child process when it
244 is idle.
245
246 After this function is called, the process stops handling incoming RPC
247 requests, but outstanding events and function return values will be sent
248 to the parent. When all data has been sent, the process calls C<exit>.
249
250 Since the parent might not expect the child to exit at random points in
251 time, it is often better to signal the parent by sending an C<event> and
252 letting the parent close down the child process.
253
254 =item AnyEvent::Fork::RPC::event ...
255
256 Send an event to the parent. Events are a bit like RPC calls made by the
257 child process to the parent, except that there is no notion of return
258 values.
259
260 =back
261
262 =head1 SEE ALSO
263
264 L<AnyEvent::Fork> (to create the processes in the first place),
265 L<AnyEvent::Fork::Pool> (to manage whole pools of processes).
266
267 =head1 AUTHOR AND CONTACT INFORMATION
268
269 Marc Lehmann <schmorp@schmorp.de>
270 http://software.schmorp.de/pkg/AnyEvent-Fork-RPC
271
272 =cut
273
274 1
275