/cvs/AnyEvent-Fork-RPC/RPC.pm
Revision: 1.2
Committed: Wed Apr 17 17:08:16 2013 UTC (11 years, 1 month ago) by root
Branch: MAIN
Changes since 1.1: +27 -15 lines
Log Message:
*** empty log message ***

File Contents

=head1 NAME

AnyEvent::Fork::RPC - simple RPC extension for AnyEvent::Fork

=head1 SYNOPSIS

   use AnyEvent::Fork;
   use AnyEvent::Fork::RPC;

   my $rpc = AnyEvent::Fork
      ->new
      ->require ("MyModule")
      ->AnyEvent::Fork::RPC::run (
         "MyModule::server",
      );

   my $cv = AE::cv;

   $rpc->(1, 2, 3, sub {
      print "MyModule::server returned @_\n";
      $cv->send;
   });

   $cv->recv;

=head1 DESCRIPTION

This module implements a simple RPC protocol and backend for processes
created via L<AnyEvent::Fork>, allowing you to call a function in the
child process and receive its return values (up to 4GB serialised).

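The 4GB limit presumably follows from how messages are framed on the
socket: the C<run> code in this file prefixes every serialised message
with its length as a 32-bit native integer (C<pack "L/a*">). The following
is a minimal sketch of that framing in isolation. The helper names
C<frame> and C<unframe> are made up for this illustration and are not part
of the module.

```perl
use strict;
use warnings;

# hypothetical helper: prefix an octet string with its 32-bit length
sub frame {
   pack "L/a*", $_[0]
}

# hypothetical helper: remove all complete messages from the buffer passed
# as first argument and return them, leaving any partial message in place
sub unframe {
   my @msgs;

   while (4 <= length $_[0]) {
      my $len = unpack "L", $_[0];
      4 + $len <= length $_[0]
         or last; # message not complete yet

      push @msgs, substr $_[0], 4, $len;
      substr $_[0], 0, 4 + $len, "";
   }

   @msgs
}

my $buf = frame ("hello") . frame ("world!");

# simulate a short read: the first message plus part of the second header
my $partial = substr $buf, 0, 12;
my @first   = unframe ($partial);

# the rest arrives later and completes the second message
$partial .= substr $buf, 12;
my @second = unframe ($partial);
```

Because the reader only acts on complete messages, it does not matter how
the operating system splits the data across individual reads.
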
It implements two different backends: a synchronous one that works like a
normal function call, and an asynchronous one that can run multiple jobs
concurrently in the child, using AnyEvent.

It also implements an asynchronous event mechanism from the child to the
parent, which can be used for progress indications or other information.

=head1 PARENT PROCESS USAGE

This module exports nothing, and only implements a single function:

=over 4

=cut

package AnyEvent::Fork::RPC;

use common::sense;

use Errno ();
use Guard ();

use AnyEvent;
#use AnyEvent::Fork;

our $VERSION = 0.1;

=item my $rpc = AnyEvent::Fork::RPC::run $fork, $function, [key => value...]

The traditional way to call it. But it is way cooler to call it in the
following way:

=item my $rpc = $fork->AnyEvent::Fork::RPC::run ($function, [key => value...])

This C<run> function/method can be used in place of the
L<AnyEvent::Fork::run> method. Just like that method, it takes over
the L<AnyEvent::Fork> process, but instead of calling the specified
C<$function> directly, it runs a server that accepts RPC calls and handles
responses.

It returns a function reference that can be used to call the function in
the child process, handling serialisation and data transfers. The last
argument passed to it must be a callback, which is invoked with the
return values of the function.

The following key/value pairs are allowed. It is recommended to have at
least an C<on_error> or C<on_event> handler set.

=over 4

=item on_error => $cb->($msg)

Called on (fatal) errors, with a descriptive (hopefully) message. If
this callback is not provided, but C<on_event> is, then the C<on_event>
callback is called with the first argument being the string C<error>,
followed by the error message.

If neither handler is provided, the error is raised via C<die> (so it
typically ends up on STDERR) and things will start failing badly.

=item on_event => $cb->(...)

Called for every call to the C<AnyEvent::Fork::RPC::event> function in the
child, with the arguments of that function passed to the callback.

Also called on errors when no C<on_error> handler is provided.

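How the two handlers fall back on each other can be sketched in plain
Perl, mirroring the defaulting logic found in C<run> in this file. The
function name C<resolve_handlers> is hypothetical and exists only for this
illustration.

```perl
use strict;
use warnings;

# hypothetical helper mirroring the handler defaults used by run
sub resolve_handlers {
   my (%arg) = @_;

   my $on_event = $arg{on_event};
   my $on_error = $arg{on_error};

   # without on_error, errors are delivered to on_event as (error => $msg);
   # without either handler, the error is raised via die
   $on_error ||= $on_event
      ? sub { $on_event->(error => shift) }
      : sub { die "AnyEvent::Fork::RPC: uncaught error: $_[0].\n" };

   # without on_event, receiving an event is itself reported as an error
   $on_event ||= sub { $on_error->("event received, but no on_event handler") };

   ($on_error, $on_event)
}

# only on_event given: errors arrive as events tagged "error"
my @seen;
my ($err, $ev) = resolve_handlers (on_event => sub { push @seen, [@_] });
$err->("child died");

# only on_error given: events from the child are reported as errors
my @errors;
my ($err2, $ev2) = resolve_handlers (on_error => sub { push @errors, $_[0] });
$ev2->(progress => 50);
```
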
=item init => $function (default: none)

When specified (by name), this function is called in the child as the very
first thing when taking over the process, with all the arguments normally
passed to the C<AnyEvent::Fork::run> function, except the communications
socket.

It can be used to do one-time things in the child such as storing passed
parameters or opening database connections.

=item async => $boolean (default: 0)

The default server used in the child does all I/O blockingly, and only
allows a single RPC call to execute at a time.

Setting C<async> to a true value switches to another implementation that
uses L<AnyEvent> in the child and allows multiple concurrent RPC calls.

The actual API in the child is documented in the section that describes
the calling semantics of the returned C<$rpc> function.

If you want to pre-load the actual back-end modules to enable memory
sharing, then you should load C<AnyEvent::Fork::RPC::Sync> for
synchronous, and C<AnyEvent::Fork::RPC::Async> for asynchronous mode.

=item serialiser => $string (default: '(sub { pack "(w/a*)*", @_ }, sub { unpack "(w/a*)*", shift })')

All arguments, result data and event data have to be serialised to be
transferred between the processes. For this, they have to be frozen and
thawed in both parent and child processes.

By default, only octet strings can be passed between the processes, which
is reasonably fast and efficient.

For more complicated use cases, you can provide your own freeze and thaw
functions, by specifying a string with perl source code. It's supposed to
return two code references when evaluated: the first receives a list of
perl values and must return an octet string. The second receives the octet
string and must return the original list of values.

If you need an external module for serialisation, then you can either
pre-load it into your L<AnyEvent::Fork> process, or you can add a C<use>
or C<require> statement into the serialiser string. Or both.

=back

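As an illustration of the C<serialiser> option described above: the string
is evaluated and must yield a freeze and a thaw function. The sketch below
round-trips a list through the default serialiser and through a
hypothetical JSON-based one. The use of L<JSON::PP> and all variable and
function names here are assumptions made for this example, not something
this module prescribes.

```perl
use strict;
use warnings;

# the default serialiser: each value is stored as a BER-encoded length
# followed by the octets of the value
my $default = '(sub { pack "(w/a*)*", @_ }, sub { unpack "(w/a*)*", shift })';

# hypothetical alternative that can also transport nested data structures;
# the module it needs is loaded from within the serialiser string
my $json = 'use JSON::PP (); (sub { JSON::PP::encode_json (\@_) }, sub { @{ JSON::PP::decode_json ($_[0]) } })';

# hypothetical helper: evaluate a serialiser string the same way run does
sub load_serialiser {
   my ($f, $t) = eval $_[0];
   die $@ if $@;
   ($f, $t)
}

# a flat list of octet strings survives the default serialiser
my ($f, $t) = load_serialiser ($default);
my @strings = $t->($f->("a", "bc", ""));

# nested data needs something like the JSON variant
my ($jf, $jt) = load_serialiser ($json);
my @nested = $jt->($jf->("a", [1, 2], { k => "v" }));
```

With the default serialiser, references would be flattened to their string
form, which is why a custom serialiser is needed for anything beyond plain
strings.
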
=cut

our $STRING_SERIALISER = '(sub { pack "(w/a*)*", @_ }, sub { unpack "(w/a*)*", shift })';

# ideally, we want (SvLEN - SvCUR) || 1024 or somesuch...
sub rlen($) { ($_[0] < 384 ? 512 + 16 : 2 << int +(log $_[0] + 512) / log 2) - $_[0] - 16 }

sub run {
   my ($self, $function, %arg) = @_;

   my $serialiser = delete $arg{serialiser} || $STRING_SERIALISER;
   my $on_event   = delete $arg{on_event};
   my $on_error   = delete $arg{on_error};

   # default for on_error is to call on_event, if specified
   $on_error ||= $on_event
      ? sub { $on_event->(error => shift) }
      : sub { die "AnyEvent::Fork::RPC: uncaught error: $_[0].\n" };

   # default for on_event is to raise an error
   $on_event ||= sub { $on_error->("event received, but no on_event handler") };

   # $f freezes a list into an octet string, $t thaws it again
   my ($f, $t) = eval $serialiser; die $@ if $@;

   my (@rcb, $fh, $shutdown, $wbuf, $ww, $rbuf, $rw);

   # write callback: flush as much of the write buffer as possible
   my $wcb = sub {
      my $len = syswrite $fh, $wbuf;

      if (!defined $len) {
         if ($! != Errno::EAGAIN && $! != Errno::EWOULDBLOCK) {
            undef $rw; undef $ww; # it ends here
            $on_error->("$!");
         }
      }

      substr $wbuf, 0, $len, "";

      # everything written: stop the write watcher and, when shutting
      # down, close the write side of the socket
      unless (length $wbuf) {
         undef $ww;
         $shutdown and shutdown $fh, 1;
      }
   };

   # pick the child-side backend
   my $module = "AnyEvent::Fork::RPC::" . ($arg{async} ? "Async" : "Sync");

   $self->require ($module)
        ->send_arg ($function, $arg{init}, $serialiser)
        ->run ("$module\::run", sub {
      $fh = shift;
      $rw = AE::io $fh, 0, sub {
         my $len = sysread $fh, $rbuf, rlen length $rbuf, length $rbuf;

         if ($len) {
            # each message is a 32 bit length followed by that many octets
            while (5 <= length $rbuf) {
               $len = unpack "L", $rbuf;
               4 + $len <= length $rbuf
                  or last;

               my @r = $t->(substr $rbuf, 4, $len);
               substr $rbuf, 0, $len + 4, "";

               # the last element tells events (true) from results (false)
               if (pop @r) {
                  $on_event->(@r);
               } elsif (@rcb) {
                  (shift @rcb)->(@r);
               } else {
                  undef $rw; undef $ww;
                  $on_error->("unexpected data from child");
               }
            }
         } elsif (defined $len) {
            undef $rw; undef $ww; # it ends here
            $on_error->("unexpected eof")
               if @rcb;
         } elsif ($! != Errno::EAGAIN && $! != Errno::EWOULDBLOCK) {
            undef $rw; undef $ww; # it ends here
            $on_error->("read: $!");
         }
      };

      $ww ||= AE::io $fh, 1, $wcb;
   });

   # when the returned function is destroyed, flush any pending data and
   # then shut down the write side
   my $guard = Guard::guard {
      $shutdown = 1;
      $ww ||= $fh && AE::io $fh, 1, $wcb;
   };

   sub {
      # the last argument is the result callback
      push @rcb, pop;

      $guard; # keep it alive

      $wbuf .= pack "L/a*", &$f;
      $ww ||= $fh && AE::io $fh, 1, $wcb;
   }
}

=back

=head1 CHILD PROCESS USAGE

The following functions are not available in the parent process. They are
only available in the namespace of this module when the child is running,
without having to load any extra modules. They are part of the child-side
API of L<AnyEvent::Fork::RPC>.

=over 4

=item AnyEvent::Fork::RPC::quit

This function can be called to gracefully stop the child process when it
is idle.

After this function is called, the process stops handling incoming RPC
requests, but outstanding events and function return values will be sent
to the parent. When all data has been sent, the process calls C<exit>.

Since the parent might not expect the child to exit at random points in
time, it is often better to signal the parent by sending an C<event> and
letting the parent close down the child process.

=item AnyEvent::Fork::RPC::event ...

Send an event to the parent. Events are a bit like RPC calls made by the
child process to the parent, except that there is no notion of return
values.

=back

=head1 SEE ALSO

L<AnyEvent::Fork> (to create the processes in the first place),
L<AnyEvent::Fork::Pool> (to manage whole pools of processes).

=head1 AUTHOR AND CONTACT INFORMATION

 Marc Lehmann <schmorp@schmorp.de>
 http://software.schmorp.de/pkg/AnyEvent-Fork-RPC

=cut

1
