ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-Fork-RPC/RPC.pm
Revision: 1.3
Committed: Wed Apr 17 17:16:48 2013 UTC (11 years, 1 month ago) by root
Branch: MAIN
Changes since 1.2: +0 -13 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.1 =head1 NAME
2    
3     AnyEvent::Fork::RPC - simple RPC extension for AnyEvent::Fork
4    
5     =head1 SYNOPSIS
6    
7     use AnyEvent::Fork;
8     use AnyEvent::Fork::RPC;
9    
10     my $rpc = AnyEvent::Fork
11     ->new
12     ->require ("MyModule")
13     ->AnyEvent::Fork::RPC::run (
14     "MyModule::server",
15     );
16    
17     my $cv = AE::cv;
18    
19     $rpc->(1, 2, 3, sub {
20     print "MyModule::server returned @_\n";
21     $cv->send;
22     });
23    
24     $cv->recv;
25    
26     =head1 DESCRIPTION
27    
28     This module implements a simple RPC protocol and backend for processes
29     created via L<AnyEvent::Fork>, allowing you to call a function in the
30     child process and receive its return values (up to 4GB serialised).
31    
32     It implements two different backends: a synchronous one that works like a
33     normal function call, and an asynchronous one that can run multiple jobs
34     concurrently in the child, using AnyEvent.
35    
36     It also implements an asynchronous event mechanism from the child to the
37     parent, that could be used for progress indications or other information.
38    
39     =head1 PARENT PROCESS USAGE
40    
41     This module exports nothing, and only implements a single function:
42    
43     =over 4
44    
45     =cut
46    
47     package AnyEvent::Fork::RPC;
48    
49     use common::sense;
50    
51     use Errno ();
52     use Guard ();
53    
54     use AnyEvent;
55     #use AnyEvent::Fork;
56    
57     our $VERSION = 0.1;
58    
59     =item my $rpc = AnyEvent::Fork::RPC::run $fork, $function, [key => value...]
60    
61     The traditional way to call it. But it is way cooler to call it in the
62     following way:
63    
64     =item my $rpc = $fork->AnyEvent::Fork::RPC::run ($function, [key => value...])
65    
66     This C<run> function/method can be used in place of the
67     L<AnyEvent::Fork::run> method. Just like that method, it takes over
68     the L<AnyEvent::Fork> process, but instead of calling the specified
69     C<$function> directly, it runs a server that accepts RPC calls and handles
70     responses.
71    
72     It returns a function reference that can be used to call the function in
73     the child process, handling serialisation and data transfers.
74    
75     The following key/value pairs are allowed. It is recommended to have at
76     least an C<on_error> or C<on_event> handler set.
77    
78     =over 4
79    
80     =item on_error => $cb->($msg)
81    
82     Called on (fatal) errors, with a descriptive (hopefully) message. If
83     this callback is not provided, but C<on_event> is, then the C<on_event>
84     callback is called with the first argument being the string C<error>,
85     followed by the error message.
86    
87     If neither handler is provided it prints the error to STDERR and will
88     start failing badly.
89    
90     =item on_event => $cb->(...)
91    
92     Called for every call to the C<AnyEvent::Fork::RPC::event> function in the
93     child, with the arguments of that function passed to the callback.
94    
95     Also called on errors when no C<on_error> handler is provided.
96    
97     =item init => $function (default none)
98    
99     When specified (by name), this function is called in the child as the very
100     first thing when taking over the process, with all the arguments normally
101     passed to the C<AnyEvent::Fork::run> function, except the communications
102     socket.
103    
104     It can be used to do one-time things in the child such as storing passed
105     parameters or opening database connections.
106    
107     =item async => $boolean (default: 0)
108    
109     The default server used in the child does all I/O blockingly, and only
110     allows a single RPC call to execute concurrently.
111    
112     Setting C<async> to a true value switches to another implementation that
113     uses L<AnyEvent> in the child and allows multiple concurrent RPC calls.
114    
115     The actual API in the child is documented in the section that describes
116     the calling semantics of the returned C<$rpc> function.
117    
118 root 1.2 If you want to pre-load the actual back-end modules to enable memory
119     sharing, then you should load C<AnyEvent::Fork::RPC::Sync> for
120     synchronous, and C<AnyEvent::Fork::RPC::Async> for asynchronous mode.
121    
122 root 1.1 =item serialiser => $string (default: '(sub { pack "(w/a*)*", @_ }, sub { unpack "(w/a*)*", shift })')
123    
124     All arguments, result data and event data have to be serialised to be
125     transferred between the processes. For this, they have to be frozen and
126     thawed in both parent and child processes.
127    
128     By default, only octet strings can be passed between the processes, which
129     is reasonably fast and efficient.
130    
131     For more complicated use cases, you can provide your own freeze and thaw
132     functions, by specifying a string with perl source code. It's supposed to
133     return two code references when evaluated: the first receives a list of
134     perl values and must return an octet string. The second receives the octet
135     string and must return the original list of values.
136    
137 root 1.2 If you need an external module for serialisation, then you can either
138     pre-load it into your L<AnyEvent::Fork> process, or you can add a C<use>
139     or C<require> statement into the serialiser string. Or both.
140    
141 root 1.1 =back
142    
143     =cut
144    
145 root 1.2 our $STRING_SERIALISER = '(sub { pack "(w/a*)*", @_ }, sub { unpack "(w/a*)*", shift })';
146    
147     # ideally, we want (SvLEN - SvCUR) || 1024 or somesuch...
148     sub rlen($) { ($_[0] < 384 ? 512 + 16 : 2 << int +(log $_[0] + 512) / log 2) - $_[0] - 16 }
149 root 1.1
150     sub run {
151     my ($self, $function, %arg) = @_;
152    
153 root 1.2 my $serialiser = delete $arg{serialiser} || $STRING_SERIALISER;
154 root 1.1 my $on_event = delete $arg{on_event};
155     my $on_error = delete $arg{on_error};
156    
157     # default for on_error is to on_event, if specified
158     $on_error ||= $on_event
159     ? sub { $on_event->(error => shift) }
160     : sub { die "AnyEvent::Fork::RPC: uncaught error: $_[0].\n" };
161    
162     # default for on_event is to raise an error
163     $on_event ||= sub { $on_error->("event received, but no on_event handler") };
164    
165     my ($f, $t) = eval $serialiser; die $@ if $@;
166    
167     my (@rcb, $fh, $shutdown, $wbuf, $ww, $rbuf, $rw);
168    
169     my $wcb = sub {
170     my $len = syswrite $fh, $wbuf;
171    
172     if (!defined $len) {
173     if ($! != Errno::EAGAIN && $! != Errno::EWOULDBLOCK) {
174     undef $rw; undef $ww; # it ends here
175     $on_error->("$!");
176     }
177     }
178    
179     substr $wbuf, 0, $len, "";
180    
181     unless (length $wbuf) {
182     undef $ww;
183     $shutdown and shutdown $fh, 1;
184     }
185     };
186    
187     my $module = "AnyEvent::Fork::RPC::" . ($arg{async} ? "Async" : "Sync");
188    
189     $self->require ($module)
190     ->send_arg ($function, $arg{init}, $serialiser)
191     ->run ("$module\::run", sub {
192     $fh = shift;
193     $rw = AE::io $fh, 0, sub {
194 root 1.2 my $len = sysread $fh, $rbuf, rlen length $rbuf, length $rbuf;
195 root 1.1
196     if ($len) {
197     while (5 <= length $rbuf) {
198     $len = unpack "L", $rbuf;
199 root 1.2 4 + $len <= length $rbuf
200     or last;
201    
202     my @r = $t->(substr $rbuf, 4, $len);
203     substr $rbuf, 0, $len + 4, "";
204    
205     if (pop @r) {
206     $on_event->(@r);
207     } elsif (@rcb) {
208     (shift @rcb)->(@r);
209     } else {
210     undef $rw; undef $ww;
211     $on_error->("unexpected data from child");
212 root 1.1 }
213     }
214     } elsif (defined $len) {
215     undef $rw; undef $ww; # it ends here
216     $on_error->("unexpected eof")
217     if @rcb;
218     } elsif ($! != Errno::EAGAIN && $! != Errno::EWOULDBLOCK) {
219     undef $rw; undef $ww; # it ends here
220     $on_error->("read: $!");
221     }
222     };
223    
224     $ww ||= AE::io $fh, 1, $wcb;
225     });
226    
227     my $guard = Guard::guard {
228     $shutdown = 1;
229     $ww ||= $fh && AE::io $fh, 1, $wcb;
230     };
231    
232     sub {
233     push @rcb, pop;
234    
235     $guard; # keep it alive
236    
237     $wbuf .= pack "L/a*", &$f;
238     $ww ||= $fh && AE::io $fh, 1, $wcb;
239     }
240     }
241    
242     =back
243    
244     =head1 CHILD PROCESS USAGE
245    
246     These functions are not available in this module. They are only available
247     in the namespace of this module when the child is running, without
248     having to load any extra module. They are part of the child-side API of
249     L<AnyEvent::Fork::RPC>.
250    
251     =over 4
252    
253     =item AnyEvent::Fork::RPC::event ...
254    
255     Send an event to the parent. Events are a bit like RPC calls made by the
256     child process to the parent, except that there is no notion of return
257     values.
258    
259     =back
260    
261     =head1 SEE ALSO
262    
263     L<AnyEvent::Fork> (to create the processes in the first place),
264     L<AnyEvent::Fork::Pool> (to manage whole pools of processes).
265    
266     =head1 AUTHOR AND CONTACT INFORMATION
267    
268     Marc Lehmann <schmorp@schmorp.de>
269     http://software.schmorp.de/pkg/AnyEvent-Fork-RPC
270    
271     =cut
272    
273     1
274