ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-Fork-Remote/Remote.pm
Revision: 1.2
Committed: Sat Apr 27 23:59:04 2013 UTC (11 years, 5 months ago) by root
Branch: MAIN
Changes since 1.1: +106 -53 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.1 =head1 NAME
2    
3     AnyEvent::Fork::Remote - remote processes with AnyEvent::Fork interface
4    
5     THE API IS NOT FINISHED, CONSIDER THIS A BETA RELEASE
6    
7     =head1 SYNOPSIS
8    
9     use AnyEvent;
10     use AnyEvent::Fork::Remote;
11    
12     my $rpc = AnyEvent::Fork::Remote
13     ->new
14     ->require ("MyModule")
15     ->run ("MyModule::run", my $cv = AE::cv);
16    
17     my $fh = $cv->recv;
18    
19     =head1 DESCRIPTION
20    
21     Despite what the name of this module might suggest, it doesn't actually
22     create remote processes for you. But it does make it easy to use them,
23     once you have started them.
24    
25     This module implements a very similar API as L<AnyEvent::Fork>. In fact,
26     similar enough to require at most minor modifications to support both
27     at the same time. For example, it works with L<AnyEvent::Fork::RPC> and
28     L<AnyEvent::Fork::Pool>.
29    
30     The documentation for this module will therefore only document the parts
31     of the API that differ between the two modules.
32    
33     =head2 SUMMARY OF DIFFERENCES
34    
35     Here is a short summary of the main differences between L<AnyEvent::Fork>
36     and this module:
37    
38     =over 4
39    
40     =item * C<send_fh> is not implemented and will fail
41    
42     =item * the child-side C<run> function must read from STDIN and write to STDOUT
43    
44     =item * C<fork> does not actually fork, but will create a new process
45    
46     =back
47    
48     =head1 EXAMPLES
49    
50     =head1 PARENT PROCESS USAGE
51    
52     =over 4
53    
54     =cut
55    
56     package AnyEvent::Fork::Remote;
57    
58     use common::sense;
59    
60 root 1.2 use Carp ();
61 root 1.1 use Errno ();
62    
63     use AnyEvent ();
64     use AnyEvent::Util ();
65    
66     our $VERSION = 0.1;
67    
68     # xored together must start and end with \n
69     my $magic0 = "Pdk{6y[_zZ";
70     my $magic1 = "Z^yZ7~i=oP";
71    
72     =item my $proc = new_exec AnyEvent::Fork::Remote $path, @args...
73    
74     Creates a new C<AnyEvent::Fork::Remote> object. Unlike L<AnyEvent::Fork>,
75     processes are only created when C<run> is called, every other method call
76     is simply recorded until then.
77    
78     Each time a new process is needed, it executes C<$path> with the given
79     arguments (the first array member must be the program name, as with
80     the C<exec> function with explicit PROGRAM argument) and both C<STDIN>
81     and C<STDOUT> connected to a communications socket. No input must be
82     consumed by the command before F<perl> is started, and no output should be
83     generated.
84    
85     The program I<must> invoke F<perl> somehow, with STDIN and STDOUT intact,
86     without specifying anything to execute (no script file name, no C<-e>
87     switch etc.).
88    
89     Here are some examples to give you an idea:
90    
91     # just "perl"
92     $proc = new_exec AnyEvent::Fork::Remote
93     "/usr/bin/perl", "perl";
94    
95     # rsh othernode exec perl
96     $proc = new_exec AnyEvent::Fork::Remote
97     "/usr/bin/rsh", "rsh", "othernode", "exec perl";
98    
99     # a complicated ssh command
100     $proc = new_exec AnyEvent::Fork::Remote
101     "/usr/bin/ssh",
102     qw(ssh -q
103     -oCheckHostIP=no -oTCPKeepAlive=yes -oStrictHostKeyChecking=no
104     -oGlobalKnownHostsFile=/dev/null -oUserKnownHostsFile=/dev/null
105     otherhost
106     exec perl);
107    
108     =item my $proc = new AnyEvent::Fork::Remote $create_callback
109    
110     Basically the same as C<new_exec>, but instead of a hardcoded command
111     path, it expects a callback which is invoked each time a process needs to
112     be created.
113    
114     The C<$create_callback> is called with another callback as argument,
115     and should call this callback with the file handle that is connected
116     to a F<perl> process. This callback can be invoked even after the
117     C<$create_callback> returns.
118    
119     Example: emulate C<new_exec> using C<new>.
120    
121     use AnyEvent::Util;
122     use Proc::FastSpawn;
123    
124     $proc = new AnyEvent::Fork::Remote sub {
125     my $done = shift;
126    
127     my ($a, $b) = AnyEvent::Util::portable_socketpair
128     or die;
129    
130     open my $oldin , "<&0" or die;
131     open my $oldout, ">&1" or die;
132    
133     open STDIN , "<&" . fileno $b or die;
134     open STDOUT, ">&" . fileno $b or die;
135    
136     spawn "/usr/bin/rsh", ["rsh", "othernode", "perl"];
137    
138     open STDIN , "<&" . fileno $oldin ;
139     open STDOUT, ">&" . fileno $oldout;
140    
141     $done->($a);
142     };
143    
144 root 1.2 =item my $proc = new_from_fh AnyEvent::Fork::Remote $fh
145    
146     Creates an C<AnyEvent::Fork::Remote> object from a file handle. This file
147     handle must be connected to both STDIN and STDOUT of a F<perl> process.
148    
149     This form might be more convenient than C<new> or C<new_exec> when
150     creating an C<AnyEvent::Fork::Remote> object, but the resulting object
151     does not support C<fork>.
152    
153 root 1.1 =cut
154    
# Constructor: wraps a process-creation callback in a new object.
#
# The object is a blessed array with three slots:
#   [0] the create callback - run invokes it with a "done" callback
#   [1] the perl source accumulated by eval (initially empty)
#   [2] the argument list collected by send_arg (initially empty)
sub new {
   my $class  = shift;
   my $create = shift;

   my @self = ($create, "", []);

   bless \@self, $class
}
164    
# Creates an object from one (or more) already connected file handles.
#
# Each handle in @fh can serve exactly one process: every time the create
# callback is invoked, it hands out the next handle. When none are left
# (which is what happens when the object is forked and run more often than
# there are handles), it croaks.
#
# The create callback receives a "done" callback as its first argument and
# must pass the file handle to it - merely returning the handle is not
# enough, as run ignores the return value of the create callback.
sub new_from_fh {
   my ($class, @fh) = @_;

   $class->new (sub {
      my $done = shift;

      my $fh = shift @fh
         or Carp::croak "AnyEvent::Fork::Remote::new_from_fh does not support fork";

      $done->($fh);
   });
}
173    
# Creates an object that starts a new process by executing $program with
# @argv (the first element of @argv is the program name, as for exec with
# an explicit PROGRAM argument) each time a process is needed.
#
# The create callback temporarily redirects STDIN and STDOUT of this
# process to one end of a socket pair, spawns the program (which inherits
# them), restores the original STDIN/STDOUT and finally passes the other
# end of the socket pair to the "done" callback.
#
# Dies if the socket pair cannot be created or if STDIN/STDOUT cannot be
# saved, redirected or restored.
sub new_exec {
   my ($class, $program, @argv) = @_;

   require AnyEvent::Util;
   require Proc::FastSpawn;

   $class->new (sub {
      my $done = shift;

      # not named $a/$b, to avoid shadowing the sort variables
      my ($parent_fh, $child_fh) = AnyEvent::Util::portable_socketpair ()
         or die "AnyEvent::Fork::Remote: unable to create socket pair: $!";

      # keep copies of the real STDIN/STDOUT so they can be restored
      open my $saved_in , "<&0"
         or die "AnyEvent::Fork::Remote: unable to save STDIN: $!";
      open my $saved_out, ">&1"
         or die "AnyEvent::Fork::Remote: unable to save STDOUT: $!";

      open STDIN , "<&" . fileno $child_fh
         or die "AnyEvent::Fork::Remote: unable to redirect STDIN: $!";
      open STDOUT, ">&" . fileno $child_fh
         or die "AnyEvent::Fork::Remote: unable to redirect STDOUT: $!";

      # a failed spawn is deliberately not fatal here: $child_fh is closed
      # when this callback returns, so the parent end of the socket then
      # sees end-of-file instead of the handshake.
      Proc::FastSpawn::spawn ($program, \@argv);

      # silently continuing with STDIN/STDOUT still attached to the socket
      # would corrupt the protocol, so failure to restore them is fatal.
      open STDIN , "<&" . fileno $saved_in
         or die "AnyEvent::Fork::Remote: unable to restore STDIN: $!";
      open STDOUT, ">&" . fileno $saved_out
         or die "AnyEvent::Fork::Remote: unable to restore STDOUT: $!";

      $done->($parent_fh);
   })
}
200    
201     =item $new_proc = $proc->fork
202    
203     Quite the same as the same method of L<AnyEvent::Fork>, except that it
204     simply clones the object without creating an actual process.
205    
206     =cut
207    
# Clones the object without creating any process: the create callback and
# the accumulated code are shared/copied as-is, while the argument list is
# copied into a fresh array so that later send_arg calls on either object
# do not affect the other.
sub fork {
   my ($proto) = @_;

   my ($create, $code, $args) = @$proto;
   my @clone = ($create, $code, [@$args]);

   bless \@clone, ref $proto
}
217    
218     =item undef = $proc->pid
219    
220     The C<pid> method always returns C<undef> and only exists for
221     compatibility with L<AnyEvent::Fork>.
222    
223     =cut
224    
# Compatibility stub: there is no process id to report, so the result is
# always undef (a single undef value, also in list context).
sub pid {
   return undef;
}
228    
229     =item $proc = $proc->send_fh (...)
230    
231     Not supported and always croaks.
232    
233     =cut
234    
# Passing file handles is not possible with this module, so this method
# unconditionally croaks, whatever arguments it is given.
sub send_fh {
   Carp::croak ("send_fh is not supported on AnyEvent::Fork::Remote objects");
}
238    
239     =item $proc = $proc->eval ($perlcode, @args)
240    
241     Quite the same as the same method of L<AnyEvent::Fork>.
242    
243     =cut
244    
# quote a binary string as a perl scalar
#
# Returns perl source text for a single-quoted string literal that
# evaluates to exactly $s. Inside single quotes only the backslash and
# the quote character itself are special, so escaping these two is
# sufficient for arbitrary octets - in particular for strings that contain
# or end in a backslash, which would otherwise yield a broken or altered
# literal.
sub sq($) {
   my $s = shift;

   $s =~ s/([\\'])/\\$1/g;

   "'$s'"
}
255    
# quote a list of strings
#
# Returns perl source text for a parenthesised, comma-separated list in
# which every element has been quoted with sq; an empty argument list
# yields "()".
sub aq(@) {
   my @quoted = map { sq ($_) } @_;

   "(" . join (",", @quoted) . ")"
}
260    
# Records $perlcode (to be executed with @_ set to @args) for execution in
# the new process. Nothing is executed here - the code is only appended to
# the source text that run later sends to the process.
#
# Returns the object itself, so calls can be chained as documented
# ($proc = $proc->eval (...)).
sub eval {
   my ($self, $perlcode, @args) = @_;

   # build a short single-line label from the code, used as the "file
   # name" in the #line directive so diagnostics point at this snippet
   my $linecode = $perlcode;
   $linecode =~ s/\s+/ /g; # takes care of \n
   $linecode =~ s/"/''/g;  # a double quote would end the #line file name
   substr $linecode, 70, length $linecode, "..." if length $linecode > 70;

   $self->[1] .= '{ local @_ = ' . aq (@args) . ";\n#line 1 \"'$linecode'\"\n$perlcode;\n}\n";

   $self
}
271    
272     =item $proc = $proc->require ($module, ...)
273    
274     Quite the same as the same method of L<AnyEvent::Fork>.
275    
276     =cut
277    
# Records a "require" statement for each of the given modules by passing
# it to the eval method, in the order given. Returns the object itself.
sub require {
   my $self = shift;

   for my $module (@_) {
      $self->eval ("require $module");
   }

   $self
}
286    
287     =item $proc = $proc->send_arg ($string, ...)
288    
289     Quite the same as the same method of L<AnyEvent::Fork>.
290    
291     =cut
292    
# Appends the given strings to the list of arguments that run passes to
# the run function. Returns the object itself.
sub send_arg {
   my $self = shift;

   push @{ $self->[2] }, @_;

   $self
}
300    
301     =item $proc->run ($func, $cb->($fh))
302    
303     Very similar to the run method of L<AnyEvent::Fork>.
304    
305 root 1.2 On the parent side, the API is identical, except that a C<$cb> argument of
306     C<undef> instead of a valid file handle signals an error.
307    
308     On the child side, the "communications socket" is in fact just C<*STDIN>,
309     and typically can only be read from (this highly depends on how the
310     program is created - if you just run F<perl> locally, it will work for
311     both reading and writing, but commands such as F<rsh> or F<ssh> typically
312     only provide read-only handles for STDIN).
313    
314     To be portable, if the run function wants to read data that is written to
315     C<$fh> in the parent, then it should read from STDIN. If the run function
316     wants to provide data that can later be read from C<$fh>, then it should
317     write them to STDOUT.
318    
319     You can write a run function that works with both L<AnyEvent::Fork>
320     and this module by checking C<fileno $fh>. If it is C<0> (meaning
321     it is STDIN), then you should use it for reading, and STDOUT for
322     writing. Otherwise, you should use the file handle for both:
323 root 1.1
324     sub run {
325     my ($rfh, ...) = @_;
326     my $wfh = fileno $rfh ? $rfh : *STDOUT;
327    
328     # now use $rfh for reading and $wfh for writing
329     }
330    
331     =cut
332    
# Creates the process and starts $func in it.
#
# The create callback is asked for a file handle; the recorded code (see
# eval) and the call to $func are then written to that handle. The
# generated program announces itself by writing a magic string
# ($magic0 ^ $magic1) and then waits for a single octet before calling
# $func with *STDIN and the arguments collected by send_arg.
#
# $cb is called with the file handle once the magic string has been
# received and the go-ahead octet has been written, or with undef when a
# read or write error (including end-of-file) occurs before that.
#
# Dies if the create callback does not provide a file handle.
sub run {
   my ($self, $func, $cb) = @_;

   $self->[0](sub {
      my $fh = shift
         or die "AnyEvent::Fork::Remote: create callback failed";

      my $code = 'BEGIN {' . $self->[1] . "}\n"
               . 'syswrite STDOUT, ' . sq ($magic0) . '^' . sq ($magic1) . ';'
               . '{ sysread STDIN, my $dummy, 1 }'
               . "\n$func*STDIN," . aq (@{ $self->[2] }) . ';'
               . "\n__END__\n";

      AnyEvent::Util::fh_nonblocking ($fh, 1);

      my ($rw, $ww);

      # number of octets of $code already written
      my $ofs = 0;

      $ww = AE::io ($fh, 1, sub {
         my $len = syswrite $fh, $code, 1<<20, $ofs;

         if (defined $len) {
            $ofs += $len;
            undef $ww if $ofs >= length $code;
         } elsif (!($! == Errno::EAGAIN || $! == Errno::EWOULDBLOCK)) {
            # error
            ($ww, $rw) = (); $cb->(undef);
         }
         # otherwise the write would block - simply wait for the next event
      });

      my $rbuf = "";

      $rw = AE::io ($fh, 0, sub {
         # append, as the magic string might arrive in more than one piece
         my $len = sysread $fh, $rbuf, 1<<10, length $rbuf;

         if ($len) {
            # only the last octets can be the magic string
            $rbuf = substr $rbuf, -length ($magic0) if length $rbuf > length $magic0;

            if ($rbuf eq ($magic0 ^ $magic1)) {
               # all data was sent, magic was received - both
               # directions should be "empty", and therefore
               # the socket must accept at least a single octet,
               # to signal the "child" to go on.
               undef $rw;
               die "AnyEvent::Fork::Remote: handshake received before all code was sent" if $ww; # uh-oh

               syswrite $fh, "\n";
               $cb->($fh);
            }
         } elsif (defined $len || !($! == Errno::EAGAIN || $! == Errno::EWOULDBLOCK)) {
            # end-of-file (a defined zero length) or a real error - $! is
            # only meaningful when sysread returned undef
            ($ww, $rw) = (); $cb->(undef);
         }
      });
   });
}
392    
393     =back
394    
395     =head1 SEE ALSO
396    
397     L<AnyEvent::Fork>, the same as this module, for local processes.
398    
399     L<AnyEvent::Fork::RPC>, to talk to the created processes.
400    
401     L<AnyEvent::Fork::Pool>, to manage whole pools of processes.
402    
403     =head1 AUTHOR AND CONTACT INFORMATION
404    
405     Marc Lehmann <schmorp@schmorp.de>
406     http://software.schmorp.de/pkg/AnyEvent-Fork-Remote
407    
408     =cut
409    
410     1
411