1 |
root |
1.1 |
=head1 NAME |
2 |
|
|
|
3 |
|
|
AnyEvent::Fork::RPC - simple RPC extension for AnyEvent::Fork |
4 |
|
|
|
5 |
|
|
=head1 SYNOPSIS |
6 |
|
|
|
7 |
|
|
use AnyEvent::Fork; |
8 |
|
|
use AnyEvent::Fork::RPC; |
9 |
|
|
|
10 |
|
|
my $rpc = AnyEvent::Fork |
11 |
|
|
->new |
12 |
|
|
->require ("MyModule") |
13 |
|
|
->AnyEvent::Fork::RPC::run ( |
14 |
|
|
"MyModule::server", |
15 |
|
|
); |
16 |
|
|
|
17 |
|
|
my $cv = AE::cv; |
18 |
|
|
|
19 |
|
|
$rpc->(1, 2, 3, sub { |
20 |
|
|
print "MyModule::server returned @_\n"; |
21 |
|
|
$cv->send; |
22 |
|
|
}); |
23 |
|
|
|
24 |
|
|
$cv->recv; |
25 |
|
|
|
26 |
|
|
=head1 DESCRIPTION |
27 |
|
|
|
28 |
|
|
This module implements a simple RPC protocol and backend for processes |
29 |
|
|
created via L<AnyEvent::Fork>, allowing you to call a function in the |
30 |
|
|
child process and receive its return values (up to 4GB serialised). |
31 |
|
|
|
32 |
|
|
It implements two different backends: a synchronous one that works like a |
33 |
|
|
normal function call, and an asynchronous one that can run multiple jobs |
34 |
|
|
concurrently in the child, using AnyEvent. |
35 |
|
|
|
36 |
|
|
It also implements an asynchronous event mechanism from the child to the |
37 |
|
|
parent, that could be used for progress indications or other information. |
38 |
|
|
|
39 |
|
|
=head1 PARENT PROCESS USAGE |
40 |
|
|
|
41 |
|
|
This module exports nothing, and only implements a single function: |
42 |
|
|
|
43 |
|
|
=over 4 |
44 |
|
|
|
45 |
|
|
=cut |
46 |
|
|
|
47 |
|
|
package AnyEvent::Fork::RPC; |
48 |
|
|
|
49 |
|
|
use common::sense; |
50 |
|
|
|
51 |
|
|
use Errno (); |
52 |
|
|
use Guard (); |
53 |
|
|
|
54 |
|
|
use AnyEvent; |
55 |
|
|
#use AnyEvent::Fork; |
56 |
|
|
|
57 |
|
|
our $VERSION = 0.1; |
58 |
|
|
|
59 |
|
|
=item my $rpc = AnyEvent::Fork::RPC::run $fork, $function, [key => value...] |
60 |
|
|
|
61 |
|
|
The traditional way to call it. But it is way cooler to call it in the |
62 |
|
|
following way: |
63 |
|
|
|
64 |
|
|
=item my $rpc = $fork->AnyEvent::Fork::RPC::run ($function, [key => value...]) |
65 |
|
|
|
66 |
|
|
This C<run> function/method can be used in place of the |
67 |
|
|
L<AnyEvent::Fork::run> method. Just like that method, it takes over |
68 |
|
|
the L<AnyEvent::Fork> process, but instead of calling the specified |
69 |
|
|
C<$function> directly, it runs a server that accepts RPC calls and handles |
70 |
|
|
responses. |
71 |
|
|
|
72 |
|
|
It returns a function reference that can be used to call the function in |
73 |
|
|
the child process, handling serialisation and data transfers. |
74 |
|
|
|
75 |
|
|
The following key/value pairs are allowed. It is recommended to have at |
76 |
|
|
least an C<on_error> or C<on_event> handler set. |
77 |
|
|
|
78 |
|
|
=over 4 |
79 |
|
|
|
80 |
|
|
=item on_error => $cb->($msg) |
81 |
|
|
|
82 |
|
|
Called on (fatal) errors, with a descriptive (hopefully) message. If |
83 |
|
|
this callback is not provided, but C<on_event> is, then the C<on_event> |
84 |
|
|
callback is called with the first argument being the string C<error>, |
85 |
|
|
followed by the error message. |
86 |
|
|
|
87 |
|
|
If neither handler is provided it prints the error to STDERR and will |
88 |
|
|
start failing badly. |
89 |
|
|
|
90 |
|
|
=item on_event => $cb->(...) |
91 |
|
|
|
92 |
|
|
Called for every call to the C<AnyEvent::Fork::RPC::event> function in the |
93 |
|
|
child, with the arguments of that function passed to the callback. |
94 |
|
|
|
95 |
|
|
Also called on errors when no C<on_error> handler is provided. |
96 |
|
|
|
97 |
|
|
=item init => $function (default none) |
98 |
|
|
|
99 |
|
|
When specified (by name), this function is called in the child as the very |
100 |
|
|
first thing when taking over the process, with all the arguments normally |
101 |
|
|
passed to the C<AnyEvent::Fork::run> function, except the communications |
102 |
|
|
socket. |
103 |
|
|
|
104 |
|
|
It can be used to do one-time things in the child such as storing passed |
105 |
|
|
parameters or opening database connections. |
106 |
|
|
|
107 |
|
|
=item async => $boolean (default: 0) |
108 |
|
|
|
109 |
|
|
The default server used in the child does all I/O blockingly, and only |
110 |
|
|
allows a single RPC call to execute concurrently. |
111 |
|
|
|
112 |
|
|
Setting C<async> to a true value switches to another implementation that |
113 |
|
|
uses L<AnyEvent> in the child and allows multiple concurrent RPC calls. |
114 |
|
|
|
115 |
|
|
The actual API in the child is documented in the section that describes |
116 |
|
|
the calling semantics of the returned C<$rpc> function. |
117 |
|
|
|
118 |
|
|
=item serialiser => $string (default: '(sub { pack "(w/a*)*", @_ }, sub { unpack "(w/a*)*", shift })') |
119 |
|
|
|
120 |
|
|
All arguments, result data and event data have to be serialised to be |
121 |
|
|
transferred between the processes. For this, they have to be frozen and |
122 |
|
|
thawed in both parent and child processes. |
123 |
|
|
|
124 |
|
|
By default, only octet strings can be passed between the processes, which |
125 |
|
|
is reasonably fast and efficient. |
126 |
|
|
|
127 |
|
|
For more complicated use cases, you can provide your own freeze and thaw |
128 |
|
|
functions, by specifying a string with perl source code. It's supposed to |
129 |
|
|
return two code references when evaluated: the first receives a list of |
130 |
|
|
perl values and must return an octet string. The second receives the octet |
131 |
|
|
string and must return the original list of values. |
132 |
|
|
|
133 |
|
|
=back |
134 |
|
|
|
135 |
|
|
=cut |
136 |
|
|
|
137 |
|
|
our $SERIALISE_STRINGS = '(sub { pack "(w/a*)*", @_ }, sub { unpack "(w/a*)*", shift })'; |
138 |
|
|
|
139 |
|
|
sub run { |
140 |
|
|
my ($self, $function, %arg) = @_; |
141 |
|
|
|
142 |
|
|
my $serialiser = delete $arg{serialiser} || $SERIALISE_STRINGS; |
143 |
|
|
my $on_event = delete $arg{on_event}; |
144 |
|
|
my $on_error = delete $arg{on_error}; |
145 |
|
|
|
146 |
|
|
# default for on_error is to on_event, if specified |
147 |
|
|
$on_error ||= $on_event |
148 |
|
|
? sub { $on_event->(error => shift) } |
149 |
|
|
: sub { die "AnyEvent::Fork::RPC: uncaught error: $_[0].\n" }; |
150 |
|
|
|
151 |
|
|
# default for on_event is to raise an error |
152 |
|
|
$on_event ||= sub { $on_error->("event received, but no on_event handler") }; |
153 |
|
|
|
154 |
|
|
my ($f, $t) = eval $serialiser; die $@ if $@; |
155 |
|
|
|
156 |
|
|
my (@rcb, $fh, $shutdown, $wbuf, $ww, $rbuf, $rw); |
157 |
|
|
|
158 |
|
|
my $wcb = sub { |
159 |
|
|
my $len = syswrite $fh, $wbuf; |
160 |
|
|
|
161 |
|
|
if (!defined $len) { |
162 |
|
|
if ($! != Errno::EAGAIN && $! != Errno::EWOULDBLOCK) { |
163 |
|
|
undef $rw; undef $ww; # it ends here |
164 |
|
|
$on_error->("$!"); |
165 |
|
|
} |
166 |
|
|
} |
167 |
|
|
|
168 |
|
|
substr $wbuf, 0, $len, ""; |
169 |
|
|
|
170 |
|
|
unless (length $wbuf) { |
171 |
|
|
undef $ww; |
172 |
|
|
$shutdown and shutdown $fh, 1; |
173 |
|
|
} |
174 |
|
|
}; |
175 |
|
|
|
176 |
|
|
my $module = "AnyEvent::Fork::RPC::" . ($arg{async} ? "Async" : "Sync"); |
177 |
|
|
|
178 |
|
|
$self->require ($module) |
179 |
|
|
->send_arg ($function, $arg{init}, $serialiser) |
180 |
|
|
->run ("$module\::run", sub { |
181 |
|
|
$fh = shift; |
182 |
|
|
$rw = AE::io $fh, 0, sub { |
183 |
|
|
my $len = sysread $fh, $rbuf, 512 + length $rbuf, length $rbuf; |
184 |
|
|
|
185 |
|
|
if ($len) { |
186 |
|
|
while (5 <= length $rbuf) { |
187 |
|
|
$len = unpack "L", $rbuf; |
188 |
|
|
if (4 + $len <= length $rbuf) { |
189 |
|
|
my @r = $t->(substr $rbuf, 4, $len); |
190 |
|
|
substr $rbuf, 0, $len + 4, ""; |
191 |
|
|
|
192 |
|
|
if (pop @r) { |
193 |
|
|
$on_event->(@r); |
194 |
|
|
} elsif (@rcb) { |
195 |
|
|
(shift @rcb)->(@r); |
196 |
|
|
} else { |
197 |
|
|
undef $rw; undef $ww; |
198 |
|
|
$on_error->("unexpected data from child"); |
199 |
|
|
} |
200 |
|
|
} |
201 |
|
|
} |
202 |
|
|
} elsif (defined $len) { |
203 |
|
|
undef $rw; undef $ww; # it ends here |
204 |
|
|
$on_error->("unexpected eof") |
205 |
|
|
if @rcb; |
206 |
|
|
} elsif ($! != Errno::EAGAIN && $! != Errno::EWOULDBLOCK) { |
207 |
|
|
undef $rw; undef $ww; # it ends here |
208 |
|
|
$on_error->("read: $!"); |
209 |
|
|
} |
210 |
|
|
}; |
211 |
|
|
|
212 |
|
|
$ww ||= AE::io $fh, 1, $wcb; |
213 |
|
|
}); |
214 |
|
|
|
215 |
|
|
my $guard = Guard::guard { |
216 |
|
|
$shutdown = 1; |
217 |
|
|
$ww ||= $fh && AE::io $fh, 1, $wcb; |
218 |
|
|
}; |
219 |
|
|
|
220 |
|
|
sub { |
221 |
|
|
push @rcb, pop; |
222 |
|
|
|
223 |
|
|
$guard; # keep it alive |
224 |
|
|
|
225 |
|
|
$wbuf .= pack "L/a*", &$f; |
226 |
|
|
$ww ||= $fh && AE::io $fh, 1, $wcb; |
227 |
|
|
} |
228 |
|
|
} |
229 |
|
|
|
230 |
|
|
=back |
231 |
|
|
|
232 |
|
|
=head1 CHILD PROCESS USAGE |
233 |
|
|
|
234 |
|
|
These functions are not available in this module. They are only available |
235 |
|
|
in the namespace of this module when the child is running, without |
236 |
|
|
having to load any extra module. They are part of the child-side API of |
237 |
|
|
L<AnyEvent::Fork::RPC>. |
238 |
|
|
|
239 |
|
|
=over 4 |
240 |
|
|
|
241 |
|
|
=item AnyEvent::Fork::RPC::quit |
242 |
|
|
|
243 |
|
|
This function can be called to gracefully stop the child process when it |
244 |
|
|
is idle. |
245 |
|
|
|
246 |
|
|
After this function is called, the process stops handling incoming RPC |
247 |
|
|
requests, but outstanding events and function return values will be sent |
248 |
|
|
to the parent. When all data has been sent, the process calls C<exit>. |
249 |
|
|
|
250 |
|
|
Since the parent might not expect the child to exit at random points in |
251 |
|
|
time, it is often better to signal the parent by sending an C<event> and |
252 |
|
|
letting the parent close down the child process. |
253 |
|
|
|
254 |
|
|
=item AnyEvent::Fork::RPC::event ... |
255 |
|
|
|
256 |
|
|
Send an event to the parent. Events are a bit like RPC calls made by the |
257 |
|
|
child process to the parent, except that there is no notion of return |
258 |
|
|
values. |
259 |
|
|
|
260 |
|
|
=back |
261 |
|
|
|
262 |
|
|
=head1 SEE ALSO |
263 |
|
|
|
264 |
|
|
L<AnyEvent::Fork> (to create the processes in the first place), |
265 |
|
|
L<AnyEvent::Fork::Pool> (to manage whole pools of processes). |
266 |
|
|
|
267 |
|
|
=head1 AUTHOR AND CONTACT INFORMATION |
268 |
|
|
|
269 |
|
|
Marc Lehmann <schmorp@schmorp.de> |
270 |
|
|
http://software.schmorp.de/pkg/AnyEvent-Fork-RPC |
271 |
|
|
|
272 |
|
|
=cut |
273 |
|
|
|
274 |
|
|
1 |
275 |
|
|
|