/cvs/AnyEvent-Fork-RPC/RPC.pm
Revision: 1.1
Committed: Wed Apr 17 15:55:59 2013 UTC (11 years, 1 month ago) by root
Branch: MAIN
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.1 =head1 NAME
2    
3     AnyEvent::Fork::RPC - simple RPC extension for AnyEvent::Fork
4    
5     =head1 SYNOPSIS
6    
7     use AnyEvent::Fork;
8     use AnyEvent::Fork::RPC;
9    
10     my $rpc = AnyEvent::Fork
11     ->new
12     ->require ("MyModule")
13     ->AnyEvent::Fork::RPC::run (
14     "MyModule::server",
15     );
16    
17     my $cv = AE::cv;
18    
19     $rpc->(1, 2, 3, sub {
20     print "MyModule::server returned @_\n";
21     $cv->send;
22     });
23    
24     $cv->recv;
25    
26     =head1 DESCRIPTION
27    
28     This module implements a simple RPC protocol and backend for processes
29     created via L<AnyEvent::Fork>, allowing you to call a function in the
30     child process and receive its return values (up to 4GB serialised).
31    
32     It implements two different backends: a synchronous one that works like a
33     normal function call, and an asynchronous one that can run multiple jobs
34     concurrently in the child, using AnyEvent.
35    
36     It also implements an asynchronous event mechanism from the child to the
37     parent, which can be used for progress indications or other information.
38    
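The synopsis refers to a C<MyModule::server> function without showing it. A minimal sketch of what such a child-side module might look like follows. It assumes the default synchronous backend, which the description says works like a normal function call: the arguments given to C<< $rpc->(...) >> arrive in C<@_>, and the returned list is passed to the parent's callback. The summing logic and the C<"sum"> label are purely illustrative.

```perl
package MyModule;

use strict;
use warnings;

# hypothetical RPC function: receives the arguments passed to $rpc->(...)
# in the parent and returns the values handed to the parent's callback
sub server {
   my @numbers = @_;

   my $sum = 0;
   $sum += $_ for @numbers;

   # with the default serialiser, only octet strings cross the process
   # boundary, so the result is returned as plain scalars
   return ("sum", $sum);
}

1;
```

With the synopsis above, the parent callback would then receive the two values C<sum> and C<6>.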
39     =head1 PARENT PROCESS USAGE
40    
41     This module exports nothing, and only implements a single function:
42    
43     =over 4
44    
45     =cut
46    
47     package AnyEvent::Fork::RPC;
48    
49     use common::sense;
50    
51     use Errno ();
52     use Guard ();
53    
54     use AnyEvent;
55     #use AnyEvent::Fork;
56    
57     our $VERSION = 0.1;
58    
59     =item my $rpc = AnyEvent::Fork::RPC::run $fork, $function, [key => value...]
60    
61     The traditional way to call it. But it is way cooler to call it in the
62     following way:
63    
64     =item my $rpc = $fork->AnyEvent::Fork::RPC::run ($function, [key => value...])
65    
66     This C<run> function/method can be used in place of the C<run> method
67     of L<AnyEvent::Fork>. Just like that method, it takes over
68     the L<AnyEvent::Fork> process, but instead of calling the specified
69     C<$function> directly, it runs a server that accepts RPC calls and handles
70     responses.
71    
72     It returns a function reference that can be used to call the function in
73     the child process, handling serialisation and data transfers.
74    
75     The following key/value pairs are allowed. It is recommended to have at
76     least an C<on_error> or C<on_event> handler set.
77    
78     =over 4
79    
80     =item on_error => $cb->($msg)
81    
82     Called on (fatal) errors, with a descriptive (hopefully) message. If
83     this callback is not provided, but C<on_event> is, then the C<on_event>
84     callback is called with the first argument being the string C<error>,
85     followed by the error message.
86    
87     If neither handler is provided it prints the error to STDERR and will
88     start failing badly.
89    
90     =item on_event => $cb->(...)
91    
92     Called for every call to the C<AnyEvent::Fork::RPC::event> function in the
93     child, with the arguments of that function passed to the callback.
94    
95     Also called on errors when no C<on_error> handler is provided.
96    
97     =item init => $function (default none)
98    
99     When specified (by name), this function is called in the child as the very
100     first thing when taking over the process, with all the arguments normally
101     passed to the C<AnyEvent::Fork::run> function, except the communications
102     socket.
103    
104     It can be used to do one-time things in the child such as storing passed
105     parameters or opening database connections.
106    
107     =item async => $boolean (default: 0)
108    
109     The default server used in the child does all I/O in blocking mode, and
110     only allows a single RPC call to execute at a time.
111    
112     Setting C<async> to a true value switches to another implementation that
113     uses L<AnyEvent> in the child and allows multiple concurrent RPC calls.
114    
115     The actual API in the child is documented in the section that describes
116     the calling semantics of the returned C<$rpc> function.
117    
118     =item serialiser => $string (default: '(sub { pack "(w/a*)*", @_ }, sub { unpack "(w/a*)*", shift })')
119    
120     All arguments, result data and event data have to be serialised to be
121     transferred between the processes. For this, they have to be frozen and
122     thawed in both parent and child processes.
123    
124     By default, only octet strings can be passed between the processes, which
125     is reasonably fast and efficient.
126    
127     For more complicated use cases, you can provide your own freeze and thaw
128     functions, by specifying a string with perl source code. It's supposed to
129     return two code references when evaluated: the first receives a list of
130     perl values and must return an octet string. The second receives the octet
131     string and must return the original list of values.
132    
133     =back
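As an illustration of the C<serialiser> key, here is a sketch of a serialiser string that can pass nested data structures. It uses the core L<Storable> module, which is an assumption of this example and not something this revision of the module ships. The string is evaluated the same way C<run> evaluates it, and the same source is sent to the child, so any module it uses must be loadable in both processes.

```perl
use strict;
use warnings;

# hypothetical value for the "serialiser" key; evaluating it must yield
# two code references: freeze (list -> octets) and thaw (octets -> list)
my $storable_serialiser =
   'use Storable (); (sub { Storable::freeze \@_ }, sub { @{ Storable::thaw shift } })';

# run() obtains the two code references by evaluating the string
my ($freeze, $thaw) = eval $storable_serialiser;
die $@ if $@;

# round-trip a list containing references, which the default
# string-only serialiser could not transfer
my $octets = $freeze->("text", [1, 2, 3], { key => "value" });
my @values = $thaw->($octets);
```

It would be passed as C<< serialiser => $storable_serialiser >> in the key/value list of C<run>.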
134    
135     =cut
136    
137     our $SERIALISE_STRINGS = '(sub { pack "(w/a*)*", @_ }, sub { unpack "(w/a*)*", shift })';
138    
139     sub run {
140     my ($self, $function, %arg) = @_;
141    
142     my $serialiser = delete $arg{serialiser} || $SERIALISE_STRINGS;
143     my $on_event = delete $arg{on_event};
144     my $on_error = delete $arg{on_error};
145    
146     # default for on_error is to forward to on_event, if specified
147     $on_error ||= $on_event
148     ? sub { $on_event->(error => shift) }
149     : sub { die "AnyEvent::Fork::RPC: uncaught error: $_[0].\n" };
150    
151     # default for on_event is to raise an error
152     $on_event ||= sub { $on_error->("event received, but no on_event handler") };
153    
154     my ($f, $t) = eval $serialiser; die $@ if $@;
155    
156     my (@rcb, $fh, $shutdown, $wbuf, $ww, $rbuf, $rw);
157    
158     my $wcb = sub {
159     my $len = syswrite $fh, $wbuf;
160    
161     if (!defined $len) {
162     if ($! != Errno::EAGAIN && $! != Errno::EWOULDBLOCK) {
163     undef $rw; undef $ww; # it ends here
164     $on_error->("$!");
165     }
166     }
167    
168     substr $wbuf, 0, $len, "";
169    
170     unless (length $wbuf) {
171     undef $ww;
172     $shutdown and shutdown $fh, 1;
173     }
174     };
175    
176     my $module = "AnyEvent::Fork::RPC::" . ($arg{async} ? "Async" : "Sync");
177    
178     $self->require ($module)
179     ->send_arg ($function, $arg{init}, $serialiser)
180     ->run ("$module\::run", sub {
181     $fh = shift;
182     $rw = AE::io $fh, 0, sub {
183     my $len = sysread $fh, $rbuf, 512 + length $rbuf, length $rbuf; # append to $rbuf
184    
185     if ($len) {
186     while (5 <= length $rbuf) {
187     $len = unpack "L", $rbuf; # 32-bit native length of the next message
188     if (4 + $len <= length $rbuf) {
189     my @r = $t->(substr $rbuf, 4, $len);
190     substr $rbuf, 0, $len + 4, "";
191    
192     if (pop @r) { # last element flags an event rather than a call result
193     $on_event->(@r);
194     } elsif (@rcb) {
195     (shift @rcb)->(@r);
196     } else {
197     undef $rw; undef $ww;
198     $on_error->("unexpected data from child");
199     }
200     } else { last } # incomplete message: wait for more data instead of spinning
201     }
202     } elsif (defined $len) {
203     undef $rw; undef $ww; # it ends here
204     $on_error->("unexpected eof")
205     if @rcb;
206     } elsif ($! != Errno::EAGAIN && $! != Errno::EWOULDBLOCK) {
207     undef $rw; undef $ww; # it ends here
208     $on_error->("read: $!");
209     }
210     };
211    
212     $ww ||= AE::io $fh, 1, $wcb;
213     });
214    
215     my $guard = Guard::guard {
216     $shutdown = 1;
217     $ww ||= $fh && AE::io $fh, 1, $wcb;
218     };
219    
220     sub {
221     push @rcb, pop; # the last argument is the result callback
222    
223     $guard; # keep it alive
224    
225     $wbuf .= pack "L/a*", &$f; # serialise remaining @_, prefix with 32-bit length
226     $ww ||= $fh && AE::io $fh, 1, $wcb;
227     }
228     }
229    
230     =back
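The wire format used by the code above can be tried out in isolation. The following is a simplified sketch, not the module's actual I/O path: each message is the serialised payload prefixed by a 32-bit native-endian length (C<pack "L/a*">), and the reader only consumes a message once all of its bytes have arrived. The helper names C<frame> and C<unframe> are hypothetical, and the trailing event flag that the child appends to its replies is left out.

```perl
use strict;
use warnings;

# default string serialiser, same templates as $SERIALISE_STRINGS
my ($freeze, $thaw) = (
   sub { pack "(w/a*)*", @_ },
   sub { unpack "(w/a*)*", shift },
);

# frame one message: 32-bit native length, followed by the payload
sub frame { pack "L/a*", $freeze->(@_) }

# extract all complete messages from a buffer, leaving partial data in it
sub unframe {
   my ($bufref) = @_;
   my @messages;

   while (4 <= length $$bufref) {
      my $len = unpack "L", $$bufref;
      last if 4 + $len > length $$bufref; # incomplete, wait for more data

      push @messages, [$thaw->(substr $$bufref, 4, $len)];
      substr $$bufref, 0, 4 + $len, "";
   }

   @messages
}

my $buf     = frame ("a", "bc") . frame ("def");
my $partial = substr $buf, -2, 2, ""; # simulate a short read

my @first = unframe (\$buf);  # only the first message is complete
$buf .= $partial;
my @second = unframe (\$buf); # the rest has arrived now
```

Leaving the loop on an incomplete message matters: without it, a partially received message would keep the reader spinning on the same buffer.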
231    
232     =head1 CHILD PROCESS USAGE
233    
234     These functions are not defined when this module is loaded in the parent.
235     They only exist in the namespace of this module while the child is running,
236     and can be used there without having to load any extra module. They are
237     part of the child-side API of L<AnyEvent::Fork::RPC>.
238    
239     =over 4
240    
241     =item AnyEvent::Fork::RPC::quit
242    
243     This function can be called to gracefully stop the child process when it
244     is idle.
245    
246     After this function is called, the process stops handling incoming RPC
247     requests, but outstanding events and function return values will be sent
248     to the parent. When all data has been sent, the process calls C<exit>.
249    
250     Since the parent might not expect the child to exit at random points in
251     time, it is often better to signal the parent by sending an C<event> and
252     letting the parent close down the child process.
253    
254     =item AnyEvent::Fork::RPC::event ...
255    
256     Send an event to the parent. Events are a bit like RPC calls made by the
257     child process to the parent, except that there is no notion of return
258     values.
259    
260     =back
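A sketch of how a child-side function might use C<AnyEvent::Fork::RPC::event> for progress reporting follows. The function name, the C<progress> tag and the argument layout are hypothetical. So that the example also runs outside a real child process, it installs a stand-in for C<event> when none is defined; inside the child, the RPC backend is assumed to provide the real function.

```perl
use strict;
use warnings;

our @seen; # events recorded by the stand-in below

# stand-in so the sketch runs outside a child process (assumption: in a
# real child, the RPC backend has already defined this function)
*AnyEvent::Fork::RPC::event = sub { push @seen, [@_] }
   unless defined &AnyEvent::Fork::RPC::event;

package MyModule;

# hypothetical RPC function: processes items and reports progress as events
sub server {
   my @items = @_;
   my $done  = 0;

   for my $item (@items) {
      # ... real work on $item would go here ...
      AnyEvent::Fork::RPC::event (progress => ++$done, scalar @items);
   }

   $done # the return value is sent back to the parent's callback
}

package main;

my $count = MyModule::server ("a", "b", "c");
```

In the parent, each of these events would reach the C<on_event> callback with the arguments C<progress>, the number of items done so far, and the total.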
261    
262     =head1 SEE ALSO
263    
264     L<AnyEvent::Fork> (to create the processes in the first place),
265     L<AnyEvent::Fork::Pool> (to manage whole pools of processes).
266    
267     =head1 AUTHOR AND CONTACT INFORMATION
268    
269     Marc Lehmann <schmorp@schmorp.de>
270     http://software.schmorp.de/pkg/AnyEvent-Fork-RPC
271    
272     =cut
273    
274     1
275