ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-Fork-Remote/Remote.pm
Revision: 1.4
Committed: Sun Apr 28 03:46:34 2013 UTC (11 years, 4 months ago) by root
Branch: MAIN
Changes since 1.3: +1 -1 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.1 =head1 NAME
2    
3     AnyEvent::Fork::Remote - remote processes with AnyEvent::Fork interface
4    
5     THE API IS NOT FINISHED, CONSIDER THIS A BETA RELEASE
6    
7     =head1 SYNOPSIS
8    
9     use AnyEvent;
10     use AnyEvent::Fork::Remote;
11    
12     my $rpc = AnyEvent::Fork::Remote
13     ->new
14     ->require ("MyModule")
15     ->run ("MyModule::run", my $cv = AE::cv);
16    
17     my $fh = $cv->recv;
18    
19     =head1 DESCRIPTION
20    
21 root 1.3 Despite what the name of this module might suggest, it doesn't actually
22 root 1.1 create remote processes for you. But it does make it easy to use them,
23     once you have started them.
24    
25     This module implements a very similar API as L<AnyEvent::Fork>. In fact,
26     similar enough to require at most minor modifications to support both
27     at the same time. For example, it works with L<AnyEvent::Fork::RPC> and
28     L<AnyEvent::Fork::Pool>.
29    
30     The documentation for this module will therefore only document the parts
31     of the API that differ between the two modules.
32    
33     =head2 SUMMARY OF DIFFERENCES
34    
35     Here is a short summary of the main differences between L<AnyEvent::Fork>
36     and this module:
37    
38     =over 4
39    
40     =item * C<send_fh> is not implemented and will fail
41    
42     =item * the child-side C<run> function must read from STDIN and write to STDOUT
43    
44     =item * C<fork> does not actually fork, but will create a new process
45    
46     =back
47    
48     =head1 EXAMPLES
49    
50     =head1 PARENT PROCESS USAGE
51    
52     =over 4
53    
54     =cut
55    
56     package AnyEvent::Fork::Remote;
57    
58     use common::sense;
59    
60 root 1.2 use Carp ();
61 root 1.1 use Errno ();
62    
63     use AnyEvent ();
64     use AnyEvent::Util ();
65    
66     our $VERSION = 0.1;
67    
68     # xored together must start and and with \n
69     my $magic0 = "Pdk{6y[_zZ";
70     my $magic1 = "Z^yZ7~i=oP";
71    
72     =item my $proc = new_exec AnyEvent::Fork::Remote $path, @args...
73    
74     Creates a new C<AnyEvent::Fork::Remote> object. Unlike L<AnyEvent::Fork>,
75     processes are only created when C<run> is called, every other method call
76     is is simply recorded until then.
77    
78     Each time a new process is needed, it executes C<$path> with the given
79     arguments (the first array member must be the program name, as with
80     the C<exec> function with explicit PROGRAM argument) and both C<STDIN>
81     and C<STDOUT> connected to a communications socket. No input must be
82     consumed by the comamnd before F<perl> is started, and no output should be
83     generated.
84    
85     The program I<must> invoke F<perl> somehow, with STDIN and STDOUT intact,
86     without specifying anything to execute (no script file name, no C<-e>
87     switch etc.).
88    
89     Here are some examples to give you an idea:
90    
91     # just "perl"
92     $proc = new_exec AnyEvent::Fork::Remote
93     "/usr/bin/perl", "perl";
94    
95     # rsh othernode exec perl
96     $proc = new_exec AnyEvent::Fork::Remote
97     "/usr/bin/rsh", "rsh", "othernode", "exec perl";
98    
99     # a complicated ssh command
100     $proc = new_exec AnyEvent::Fork::Remote
101     "/usr/bin/ssh",
102     qw(ssh -q
103     -oCheckHostIP=no -oTCPKeepAlive=yes -oStrictHostKeyChecking=no
104     -oGlobalKnownHostsFile=/dev/null -oUserKnownHostsFile=/dev/null
105     otherhost
106     exec perl);
107    
108 root 1.3 =item my $proc = new_execp AnyEvent::Fork::Remote $file, @args...
109    
110     Just like C<new_exec>, except that the program is searched in the
111     C<$ENV{PATH}> first, similarly to how the shell does it. This makes it easier
112     to find e.g. C<ssh>:
113    
114     $proc = new_execp AnyEvent::Fork::Remote "ssh", "ssh", "otherhost", "perl";
115    
116 root 1.1 =item my $proc = new AnyEvent::Fork::Remote $create_callback
117    
118     Basically the same as C<new_exec>, but instead of a hardcoded command
119     path, it expects a callback which is invoked each time a process needs to
120     be created.
121    
122     The C<$create_callback> is called with another callback as argument,
123     and should call this callback with the file handle that is connected
124     to a F<perl> process. This callback can be invoked even after the
125     C<$create_callback> returns.
126    
127     Example: emulate C<new_exec> using C<new>.
128    
129     use AnyEvent::Util;
130     use Proc::FastSpawn;
131    
132     $proc = new AnyEvent::Fork::Remote sub {
133     my $done = shift;
134    
135     my ($a, $b) = AnyEvent::Util::portable_socketpair
136     or die;
137    
138     open my $oldin , "<&0" or die;
139     open my $oldout, ">&1" or die;
140    
141     open STDIN , "<&" . fileno $b or die;
142     open STDOUT, ">&" . fileno $b or die;
143    
144     spawn "/usr/bin/rsh", ["rsh", "othernode", "perl"];
145    
146     open STDIN , "<&" . fileno $oldin ;
147     open STDOUT, ">&" . fileno $oldout;
148    
149     $done->($a);
150     };
151    
152 root 1.2 =item my $proc = new_from_fh $fh
153    
154     Creates an C<AnyEvent::Fork::Remote> object from a file handle. This file
155     handle must be connected to both STDIN and STDOUT of a F<perl> process.
156    
157     This form might be more convenient than C<new> or C<new_exec> when
158     creating an C<AnyEvent::Fork::Remote> object, but the resulting object
159     does not support C<fork>.
160    
161 root 1.1 =cut
162    
163 root 1.2 sub new {
164     my ($class, $create) = @_;
165    
166     bless [
167     $create,
168     "",
169     [],
170     ], $class
171     }
172    
173     sub new_from_fh {
174     my ($class, @fh) = @_;
175    
176     $class->new (sub {
177     shift @fh
178     or Carp::croak "AnyEvent::Fork::Remote::new_from_fh does not support fork";
179     });
180     }
181    
182 root 1.3 sub _new_exec {
183     my $p = pop;
184    
185 root 1.1 my ($class, $program, @argv) = @_;
186    
187     require AnyEvent::Util;
188     require Proc::FastSpawn;
189    
190     $class->new (sub {
191     my $done = shift;
192    
193     my ($a, $b) = AnyEvent::Util::portable_socketpair ()
194     or die;
195    
196     open my $oldin , "<&0" or die;
197     open my $oldout, ">&1" or die;
198    
199     open STDIN , "<&" . fileno $b or die;
200     open STDOUT, ">&" . fileno $b or die;
201    
202 root 1.3 $p ? Proc::FastSpawn::spawnp ($program, \@argv)
203     : Proc::FastSpawn::spawn ($program, \@argv);
204 root 1.1
205     open STDIN , "<&" . fileno $oldin ;
206     open STDOUT, ">&" . fileno $oldout;
207    
208     $done->($a);
209     })
210     }
211    
212 root 1.3 sub new_exec {
213     push @_, 0;
214     &_new_exec
215     }
216    
217     sub new_execp {
218     push @_, 1;
219     &_new_exec
220     }
221    
222 root 1.1 =item $new_proc = $proc->fork
223    
224     Quite the same as the same method of L<AnyEvent::Fork>, except that it
225     simply clones the object without creating an actual process.
226    
227     =cut
228    
229     sub fork {
230     my $self = shift;
231    
232     bless [
233     $self->[0],
234     $self->[1],
235     [@{ $self->[2] }],
236     ], ref $self
237     }
238    
239     =item undef = $proc->pid
240    
241     The C<pid> method always returns C<undef> and only exists for
242     compatibility with L<AnyEvent::Fork>.
243    
244     =cut
245    
246     sub pid {
247     undef
248     }
249    
250     =item $proc = $proc->send_fh (...)
251    
252     Not supported and always croaks.
253    
254     =cut
255    
256     sub send_fh {
257 root 1.2 Carp::croak "send_fh is not supported on AnyEvent::Fork::Remote objects";
258 root 1.1 }
259    
260     =item $proc = $proc->eval ($perlcode, @args)
261    
262     Quite the same as the same method of L<AnyEvent::Fork>.
263    
264     =cut
265    
266     # quote a binary string as a perl scalar
267     sub sq($) {
268     my $s = shift;
269    
270     $s =~ /'/
271     or return "'$s'";
272    
273     $s =~ s/(\x10+)/\x10.'$1'.q\x10/g;
274     "q\x10$s\x10"
275     }
276    
277     # quote a list of strings
278     sub aq(@) {
279     "(" . (join ",", map sq $_, @_) . ")"
280     }
281    
282     sub eval {
283     my ($self, $perlcode, @args) = @_;
284    
285 root 1.2 my $linecode = $perlcode;
286     $linecode =~ s/\s+/ /g; # takes care of \n
287     $linecode =~ s/"/''/g;
288     substr $linecode, 70, length $linecode, "..." if length $linecode > 70;
289    
290     $self->[1] .= '{ local @_ = ' . (aq @args) . ";\n#line 1 \"'$linecode'\"\n$perlcode;\n}\n";
291 root 1.1 }
292    
293     =item $proc = $proc->require ($module, ...)
294    
295     Quite the same as the same method of L<AnyEvent::Fork>.
296    
297     =cut
298    
299     sub require {
300     my ($self, @modules) = @_;
301    
302 root 1.2 $self->eval ("require $_")
303     for @modules;
304 root 1.1
305     $self
306     }
307    
308     =item $proc = $proc->send_arg ($string, ...)
309    
310     Quite the same as the same method of L<AnyEvent::Fork>.
311    
312     =cut
313    
314     sub send_arg {
315     my ($self, @arg) = @_;
316    
317     push @{ $self->[2] }, @arg;
318    
319     $self
320     }
321    
322     =item $proc->run ($func, $cb->($fh))
323    
324     Very similar to the run method of L<AnyEvent::Fork>.
325    
326 root 1.2 On the parent side, the API is identical, except that a C<$cb> argument of
327     C<undef> instad of a valid file handle signals an error.
328    
329     On the child side, the "communications socket" is in fact just C<*STDIN>,
330     and typically can only be read from (this highly depends on how the
331     program is created - if you just run F<perl> locally, it will work for
332     both reading and writing, but commands such as F<rsh> or F<ssh> typically
333     only provide read-only handles for STDIN).
334    
335     To be portable, if the run function wants to read data that is written to
336     C<$fh> in the parent, then it should read from STDIN. If the run function
337     wants to provide data that can later be read from C<$fh>, then it should
338     write them to STDOUT.
339    
340     You can write a run function that works with both L<AnyEvent::Fork>
341     and this module by checking C<fileno $fh>. If it is C<0> (meaning
342     it is STDIN), then you should use it for reading, and STDOUT for
343     writing. Otherwise, you should use the file handle for both:
344 root 1.1
345     sub run {
346     my ($rfh, ...) = @_;
347     my $wfh = fileno $rfh ? $rfh : *STDOUT;
348    
349     # now use $rfh for reading and $wfh for writing
350     }
351    
352     =cut
353    
354     sub run {
355     my ($self, $func, $cb) = @_;
356    
357     $self->[0](sub {
358     my $fh = shift
359     or die "AnyEvent::Fork::Remote: create callback failed";
360    
361 root 1.4 my $code = 'BEGIN { $0 = "AnyEvent::Fork::Remote of another process"; ' . $self->[1] . "}\n"
362 root 1.2 . 'syswrite STDOUT, ' . (sq $magic0) . '^' . (sq $magic1) . ';'
363     . '{ sysread STDIN, my $dummy, 1 }'
364     . "\n$func*STDIN," . (aq @{ $self->[2] }) . ';'
365     . "\n__END__\n";
366    
367     warn $code;#d#
368    
369     AnyEvent::Util::fh_nonblocking $fh, 1;
370    
371     my ($rw, $ww);
372    
373     my $ofs;
374    
375     $ww = AE::io $fh, 1, sub {
376     my $len = syswrite $fh, $code, 1<<20, $ofs;
377    
378     if ($len || $! == Errno::EAGAIN || $! == Errno::EWOULDBLOCK) {
379     $ofs += $len;
380     undef $ww if $ofs >= length $code;
381     } else {
382     # error
383     ($ww, $rw) = (); $cb->(undef);
384     }
385     };
386    
387     my $rbuf;
388    
389     $rw = AE::io $fh, 0, sub {
390     my $len = sysread $fh, $rbuf, 1<<10;
391    
392     if ($len || $! == Errno::EAGAIN || $! == Errno::EWOULDBLOCK) {
393     $rbuf = substr $rbuf, -length $magic0 if length $rbuf > length $magic0;
394    
395     if ($rbuf eq ($magic0 ^ $magic1)) {
396     # all data was sent, magic was received - both
397     # directions should be "empty", and therefore
398     # the socket must accept at least a single octet,
399     # to signal the "child" to go on.
400     undef $rw;
401     die if $ww; # uh-oh
402    
403     syswrite $fh, "\n";
404     $cb->($fh);
405     }
406     } else {
407     # error
408     ($ww, $rw) = (); $cb->(undef);
409     }
410     };
411 root 1.1 });
412     }
413    
414     =back
415    
416     =head1 SEE ALSO
417    
418     L<AnyEvent::Fork>, the same as this module, for local processes.
419    
420     L<AnyEvent::Fork::RPC>, to talk to the created processes.
421    
422     L<AnyEvent::Fork::Pool>, to manage whole pools of processes.
423    
424     =head1 AUTHOR AND CONTACT INFORMATION
425    
426     Marc Lehmann <schmorp@schmorp.de>
427     http://software.schmorp.de/pkg/AnyEvent-Fork-Remote
428    
429     =cut
430    
431     1
432