/cvs/AnyEvent-Fork-Remote/Remote.pm
Revision: 1.6
Committed: Sun Jul 13 00:14:58 2014 UTC (10 years, 2 months ago) by root
Branch: MAIN
Changes since 1.5: +4 -2 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.1 =head1 NAME
2    
3     AnyEvent::Fork::Remote - remote processes with AnyEvent::Fork interface
4    
5     THE API IS NOT FINISHED, CONSIDER THIS A BETA RELEASE
6    
7     =head1 SYNOPSIS
8    
9     use AnyEvent;
10     use AnyEvent::Fork::Remote;
11    
12     my $rpc = AnyEvent::Fork::Remote
13 root 1.5 ->new_execp ("ssh", "ssh", "othermachine", "perl")
14 root 1.1 ->require ("MyModule")
15     ->run ("MyModule::run", my $cv = AE::cv);
16    
17     my $fh = $cv->recv;
18    
19     =head1 DESCRIPTION
20    
21 root 1.3 Despite what the name of this module might suggest, it doesn't actually
22 root 1.1 create remote processes for you. But it does make it easy to use them,
23     once you have started them.
24    
25     This module implements an API very similar to that of L<AnyEvent::Fork>,
26     similar enough that supporting both at the same time requires at most
27     minor modifications. For example, it works with L<AnyEvent::Fork::RPC> and
28     L<AnyEvent::Fork::Pool>.
29    
30     The documentation for this module will therefore only document the parts
31     of the API that differ between the two modules.
32    
33     =head2 SUMMARY OF DIFFERENCES
34    
35     Here is a short summary of the main differences between L<AnyEvent::Fork>
36     and this module:
37    
38     =over 4
39    
40     =item * C<send_fh> is not implemented and will fail
41    
42     =item * the child-side C<run> function must read from STDIN and write to STDOUT
43    
44     =item * C<fork> does not actually fork, but will create a new process
45    
46     =back
47    
48 root 1.5 =head1 EXAMPLE
49    
50     This example uses a local perl (because that is likely going to work
51     without further setup) and L<AnyEvent::Fork::RPC> to create a simple
52     worker process.
53    
54     First load the modules we are going to use:
55    
56     use AnyEvent;
57     use AnyEvent::Fork::Remote;
58     use AnyEvent::Fork::RPC;
59    
60     Then create, configure and run the process:
61    
62     my $rpc = AnyEvent::Fork::Remote
63     ->new_execp ("perl", "perl")
64     ->eval ('
65     sub myrun {
66     "this is process $$, and you passed <@_>"
67     }
68     ')
69     ->AnyEvent::Fork::RPC::run ("myrun");
70    
71     We use C<new_execp> to execute the first F<perl> found in the PATH. You'll
72     have to make sure there is one for this to work. The perl does not
73     actually have to be the same perl as the one running the example, and it
74     doesn't need to have any modules installed.
75    
76 root 1.6 The reason we have to specify C<perl> twice is that the first argument to
77 root 1.5 C<new_execp> (and also C<new_exec>) is the program name or path, while
78     the remaining ones are the arguments, and the first argument passed to a
79     program is the program name, so it has to be specified twice.
80    
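The split between the program to run and its argument list mirrors
Perl's own C<exec { $program } @argv> form. A minimal sketch using only
core Perl, with the running perl (C<$^X>) standing in for the program and
C<run_with_argv> being a hypothetical helper name:

```perl
use strict;
use warnings;

# Run $program with @argv as its complete argument vector (including
# argv[0]) and return whatever the program prints on STDOUT.
sub run_with_argv {
    my ($program, @argv) = @_;

    # open with "-|" forks; the child's STDOUT is connected to $fh
    my $pid = open my $fh, "-|";
    defined $pid or die "fork failed: $!";

    unless ($pid) {
        # child: the block names the file to execute, the list is argv
        exec { $program } @argv;
        die "exec $program failed: $!";
    }

    my $out = do { local $/; <$fh> };   # read everything the child wrote
    close $fh;
    $out
}

# "perl" fills the argv[0] slot, so "-e" is the first real argument
my $out = run_with_argv $^X, "perl", "-e", 'print "hello from the child\n"';
print $out;
```

If the second C<"perl"> were left out, C<-e> would end up as the program
name and the code string would be treated as a script file name.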
81     Finally, the standard example: send some numbers to the remote function
82     and print whatever it returns:
83    
84     my $cv = AE::cv;
85    
86     for (1..10) {
87     $cv->begin;
88     $rpc->($_, sub {
89     print "remote function returned: $_[0]\n";
90     $cv->end;
91     });
92     }
93    
94     $cv->recv;
95    
96     Now, executing F<perl> in the PATH isn't very interesting - you could have
97     done the same with L<AnyEvent::Fork>, and it might even be more efficient.
98    
99     The power of this module is that the F<perl> doesn't need to run on the
100     local box: you can simply substitute another command, such as F<ssh
101     remotebox perl>:
102    
103     my $rpc = AnyEvent::Fork::Remote
104     ->new_execp ("ssh", "ssh", "remotebox", "perl")
105    
106     And if you want to use a specific path for ssh, use C<new_exec>:
107    
108     my $rpc = AnyEvent::Fork::Remote
109     ->new_exec ("/usr/bin/ssh", "ssh", "remotebox", "perl")
110    
111     Of course, it doesn't really matter to this module how you construct your
112     perl processes, what matters is that somehow, you give it a file handle
113     connected to the new perl's STDIN and STDOUT.
114 root 1.1
115     =head1 PARENT PROCESS USAGE
116    
117     =over 4
118    
119     =cut
120    
121     package AnyEvent::Fork::Remote;
122    
123     use common::sense;
124    
125 root 1.2 use Carp ();
126 root 1.1 use Errno ();
127    
128     use AnyEvent ();
129    
130 root 1.5 our $VERSION = 0.2;
131 root 1.1
132     # xored together must start and end with \n
133     my $magic0 = "Pdk{6y[_zZ";
134     my $magic1 = "Z^yZ7~i=oP";
135    
136     =item my $proc = new_exec AnyEvent::Fork::Remote $path, @args...
137    
138     Creates a new C<AnyEvent::Fork::Remote> object. Unlike L<AnyEvent::Fork>,
139     processes are only created when C<run> is called; every other method call
140     is simply recorded until then.
141    
142     Each time a new process is needed, it executes C<$path> with the given
143     arguments (the first array member must be the program name, as with
144     the C<exec> function with explicit PROGRAM argument) and both C<STDIN>
145     and C<STDOUT> connected to a communications socket. No input must be
146 root 1.5 consumed by the command before F<perl> is started, and no output should be
147 root 1.1 generated.
148    
149     The program I<must> invoke F<perl> somehow, with STDIN and STDOUT intact,
150     without specifying anything to execute (no script file name, no C<-e>
151     switch etc.).
152    
153     Here are some examples to give you an idea:
154    
155     # just "perl"
156     $proc = new_exec AnyEvent::Fork::Remote
157     "/usr/bin/perl", "perl";
158    
159     # rsh othernode exec perl
160     $proc = new_exec AnyEvent::Fork::Remote
161     "/usr/bin/rsh", "rsh", "othernode", "exec perl";
162    
163     # a complicated ssh command
164     $proc = new_exec AnyEvent::Fork::Remote
165     "/usr/bin/ssh",
166     qw(ssh -q
167     -oCheckHostIP=no -oTCPKeepAlive=yes -oStrictHostKeyChecking=no
168     -oGlobalKnownHostsFile=/dev/null -oUserKnownHostsFile=/dev/null
169     otherhost
170     exec perl);
171    
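The reason F<perl> must be started without a script name or C<-e> switch
is that such a perl reads its program from STDIN, which is how code
reaches the new process. A minimal sketch of that idea using only core
modules, with the running perl (C<$^X>) standing in for a remote one. It
illustrates the mechanism and is not the code this module uses:

```perl
use strict;
use warnings;
use Socket;
use IO::Handle;

# one bidirectional socket serves as both STDIN and STDOUT of the child
socketpair(my $parent, my $child, AF_UNIX, SOCK_STREAM, PF_UNSPEC)
    or die "socketpair: $!";

my $pid = fork;
defined $pid or die "fork: $!";

unless ($pid) {
    close $parent;
    open STDIN , "<&", $child or die "dup STDIN: $!";
    open STDOUT, ">&", $child or die "dup STDOUT: $!";
    close $child;

    # no script name and no -e: perl reads its program from STDIN
    exec { $^X } "perl";
    die "exec: $!";
}

close $child;

# send the program text; __END__ tells perl where the program stops
print $parent qq{print "child \$\$ is alive\\n";\n__END__\n};
$parent->flush;
shutdown $parent, 1;   # we have nothing more to write

my $reply = <$parent>;   # the child's output arrives on the same socket
waitpid $pid, 0;
print "got: $reply";
```

Anything the command prints before F<perl> starts would arrive on the same
socket ahead of the reply, which is why no output should be generated.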
172 root 1.3 =item my $proc = new_execp AnyEvent::Fork::Remote $file, @args...
173    
174     Just like C<new_exec>, except that the program is first searched for in
175     C<$ENV{PATH}>, similar to how the shell does it. This makes it easier
176     to find e.g. C<ssh>:
177    
178     $proc = new_execp AnyEvent::Fork::Remote "ssh", "ssh", "otherhost", "perl";
179    
180 root 1.1 =item my $proc = new AnyEvent::Fork::Remote $create_callback
181    
182 root 1.5 Basically the same as C<new_exec>, but instead of a command to execute,
183     it expects a callback which is invoked each time a process needs to be
184     created.
185 root 1.1
186     The C<$create_callback> is called with another callback as argument,
187     and should call this callback with the file handle that is connected
188     to a F<perl> process. This callback can be invoked even after the
189     C<$create_callback> returns.
190    
191     Example: emulate C<new_exec> using C<new>.
192    
193     use AnyEvent::Util;
194     use Proc::FastSpawn;
195    
196     $proc = new AnyEvent::Fork::Remote sub {
197     my $done = shift;
198    
199     my ($a, $b) = AnyEvent::Util::portable_socketpair
200     or die;
201    
202     open my $oldin , "<&0" or die;
203     open my $oldout, ">&1" or die;
204    
205     open STDIN , "<&" . fileno $b or die;
206     open STDOUT, ">&" . fileno $b or die;
207    
208     spawn "/usr/bin/rsh", ["rsh", "othernode", "perl"];
209    
210     open STDIN , "<&" . fileno $oldin ;
211     open STDOUT, ">&" . fileno $oldout;
212    
213     $done->($a);
214     };
215    
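The example above saves STDIN and STDOUT, points them at the socket,
spawns, and then restores them. This works because a spawned process
inherits file descriptors 0 and 1. A small sketch of the same
save/redirect/restore sequence for STDOUT only, using core Perl, with an
anonymous temporary file instead of a socket and C<system> instead of
L<Proc::FastSpawn> (both substitutions are assumptions made to keep the
sketch self-contained):

```perl
use strict;
use warnings;

# anonymous temporary file, stands in for the socket
open my $tmp, "+>", undef or die "tempfile: $!";

# save the current STDOUT so it can be restored afterwards
open my $oldout, ">&", \*STDOUT or die "save STDOUT: $!";

# point file descriptor 1 at the temporary file
open STDOUT, ">&", $tmp or die "redirect STDOUT: $!";

# the child inherits fd 1 and therefore writes into the file
system { $^X } "perl", "-e", 'print "written by the child\n"';

# restore the original STDOUT in the parent
open STDOUT, ">&", $oldout or die "restore STDOUT: $!";

# the duplicated descriptor shares its file offset with $tmp, so rewind
seek $tmp, 0, 0;
my $captured = do { local $/; <$tmp> };
print "captured: $captured";
```
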
216 root 1.2 =item my $proc = new_from_fh AnyEvent::Fork::Remote $fh
217    
218     Creates an C<AnyEvent::Fork::Remote> object from a file handle. This file
219     handle must be connected to both STDIN and STDOUT of a F<perl> process.
220    
221     This form might be more convenient than C<new> or C<new_exec> when
222     creating an C<AnyEvent::Fork::Remote> object, but the resulting object
223     does not support C<fork>.
224    
225 root 1.1 =cut
226    
227 root 1.2 sub new {
228     my ($class, $create) = @_;
229    
230     bless [
231     $create,
232     "",
233     [],
234     ], $class
235     }
236    
237     sub new_from_fh {
238     my ($class, @fh) = @_;
239    
240     $class->new (sub {
241 root 1.6 my $fh = shift @fh
242 root 1.2 or Carp::croak "AnyEvent::Fork::Remote::new_from_fh does not support fork";
243 root 1.6
244     $_[0]($fh);
245 root 1.2 });
246     }
247    
248 root 1.3 sub _new_exec {
249     my $p = pop;
250    
251 root 1.1 my ($class, $program, @argv) = @_;
252    
253     require AnyEvent::Util;
254     require Proc::FastSpawn;
255    
256     $class->new (sub {
257     my $done = shift;
258    
259     my ($a, $b) = AnyEvent::Util::portable_socketpair ()
260     or die;
261    
262     open my $oldin , "<&0" or die;
263     open my $oldout, ">&1" or die;
264    
265     open STDIN , "<&" . fileno $b or die;
266     open STDOUT, ">&" . fileno $b or die;
267    
268 root 1.3 $p ? Proc::FastSpawn::spawnp ($program, \@argv)
269     : Proc::FastSpawn::spawn ($program, \@argv);
270 root 1.1
271     open STDIN , "<&" . fileno $oldin ;
272     open STDOUT, ">&" . fileno $oldout;
273    
274     $done->($a);
275     })
276     }
277    
278 root 1.3 sub new_exec {
279     push @_, 0;
280     &_new_exec
281     }
282    
283     sub new_execp {
284     push @_, 1;
285     &_new_exec
286     }
287    
288 root 1.1 =item $new_proc = $proc->fork
289    
290     Works much like the method of the same name in L<AnyEvent::Fork>, except
291     that it simply clones the object without creating an actual process.
292    
293     =cut
294    
295     sub fork {
296     my $self = shift;
297    
298     bless [
299     $self->[0],
300     $self->[1],
301     [@{ $self->[2] }],
302     ], ref $self
303     }
304    
305     =item undef = $proc->pid
306    
307     The C<pid> method always returns C<undef> and only exists for
308     compatibility with L<AnyEvent::Fork>.
309    
310     =cut
311    
312     sub pid {
313     undef
314     }
315    
316     =item $proc = $proc->send_fh (...)
317    
318     Not supported and always croaks.
319    
320     =cut
321    
322     sub send_fh {
323 root 1.2 Carp::croak "send_fh is not supported on AnyEvent::Fork::Remote objects";
324 root 1.1 }
325    
326     =item $proc = $proc->eval ($perlcode, @args)
327    
328     Works much like the method of the same name in L<AnyEvent::Fork>.
329    
330     =cut
331    
332     # quote a binary string as a perl scalar
333     sub sq($) {
334     my $s = shift;
335    
336     $s =~ /'/
337     or return "'$s'";
338    
339     $s =~ s/(\x10+)/\x10.'$1'.q\x10/g;
340     "q\x10$s\x10"
341     }
342    
343     # quote a list of strings
344     sub aq(@) {
345     "(" . (join ",", map sq $_, @_) . ")"
346     }
347    
348     sub eval {
349     my ($self, $perlcode, @args) = @_;
350    
351 root 1.2 my $linecode = $perlcode;
352     $linecode =~ s/\s+/ /g; # takes care of \n
353     $linecode =~ s/"/''/g;
354     substr $linecode, 70, length $linecode, "..." if length $linecode > 70;
355    
356     $self->[1] .= '{ local @_ = ' . (aq @args) . ";\n#line 1 \"'$linecode'\"\n$perlcode;\n}\n";
357 root 1.5
358     $self
359 root 1.1 }
360    
361     =item $proc = $proc->require ($module, ...)
362    
363     Works much like the method of the same name in L<AnyEvent::Fork>.
364    
365     =cut
366    
367     sub require {
368     my ($self, @modules) = @_;
369    
370 root 1.2 $self->eval ("require $_")
371     for @modules;
372 root 1.1
373     $self
374     }
375    
376     =item $proc = $proc->send_arg ($string, ...)
377    
378     Works much like the method of the same name in L<AnyEvent::Fork>.
379    
380     =cut
381    
382     sub send_arg {
383     my ($self, @arg) = @_;
384    
385     push @{ $self->[2] }, @arg;
386    
387     $self
388     }
389    
390     =item $proc->run ($func, $cb->($fh))
391    
392     Very similar to the run method of L<AnyEvent::Fork>.
393    
394 root 1.2 On the parent side, the API is identical, except that C<$cb> is called
395 root 1.5 with C<undef> instead of a valid file handle to signal an error.
396 root 1.2
397     On the child side, the "communications socket" is in fact just C<*STDIN>,
398     and typically can only be read from (this highly depends on how the
399     program is created - if you just run F<perl> locally, it will work for
400     both reading and writing, but commands such as F<rsh> or F<ssh> typically
401     only provide read-only handles for STDIN).
402    
403     To be portable, if the run function wants to read data that is written to
404     C<$fh> in the parent, then it should read from STDIN. If the run function
405     wants to provide data that can later be read from C<$fh>, then it should
406     write them to STDOUT.
407    
408     You can write a run function that works with both L<AnyEvent::Fork>
409     and this module by checking C<fileno $fh>. If it is C<0> (meaning
410     it is STDIN), then you should use it for reading, and STDOUT for
411     writing. Otherwise, you should use the file handle for both:
412 root 1.1
413     sub run {
414     my ($rfh, @args) = @_;
415     my $wfh = fileno $rfh ? $rfh : *STDOUT;
416    
417     # now use $rfh for reading and $wfh for writing
418     }
419    
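The C<fileno> check can be tried out without either module. A small
sketch with core Perl only, where C<pick_write_handle> is a hypothetical
helper name and one end of a local socket pair stands in for the handle
that L<AnyEvent::Fork> would pass:

```perl
use strict;
use warnings;
use Socket;

# hypothetical helper: choose the handle to write replies to
sub pick_write_handle {
    my ($rfh) = @_;

    # fileno 0 means we were handed STDIN, which may be read-only
    fileno $rfh ? $rfh : \*STDOUT
}

socketpair(my $a_end, my $b_end, AF_UNIX, SOCK_STREAM, PF_UNSPEC)
    or die "socketpair: $!";

my $remote_style = pick_write_handle \*STDIN;   # as with this module
my $local_style  = pick_write_handle $a_end;    # as with AnyEvent::Fork

printf "STDIN      -> write to fd %d\n", fileno $remote_style;
printf "socket end -> write to fd %d\n", fileno $local_style;
```
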
420     =cut
421    
422     sub run {
423     my ($self, $func, $cb) = @_;
424    
425     $self->[0](sub {
426     my $fh = shift
427     or die "AnyEvent::Fork::Remote: create callback failed";
428    
429 root 1.5 my $owner = length $ENV{HOSTNAME} ? "$ENV{HOSTNAME}:$$" : "*:$$";
430    
431     my $code = 'BEGIN { $0 = ' . (sq "$func of $owner") . '; ' . $self->[1] . "}\n"
432 root 1.2 . 'syswrite STDOUT, ' . (sq $magic0) . '^' . (sq $magic1) . ';'
433     . '{ sysread STDIN, my $dummy, 1 }'
434     . "\n$func*STDIN," . (aq @{ $self->[2] }) . ';'
435     . "\n__END__\n";
436    
437     AnyEvent::Util::fh_nonblocking $fh, 1;
438    
439     my ($rw, $ww);
440    
441     my $ofs;
442    
443     $ww = AE::io $fh, 1, sub {
444     my $len = syswrite $fh, $code, 1<<20, $ofs;
445    
446     if ($len || $! == Errno::EAGAIN || $! == Errno::EWOULDBLOCK) {
447     $ofs += $len;
448     undef $ww if $ofs >= length $code;
449     } else {
450     # error
451     ($ww, $rw) = (); $cb->(undef);
452     }
453     };
454    
455     my $rbuf;
456    
457     $rw = AE::io $fh, 0, sub {
458     my $len = sysread $fh, $rbuf, 1<<10;
459    
460     if ($len || $! == Errno::EAGAIN || $! == Errno::EWOULDBLOCK) {
461     $rbuf = substr $rbuf, -length $magic0 if length $rbuf > length $magic0;
462    
463     if ($rbuf eq ($magic0 ^ $magic1)) {
464     # all data was sent, magic was received - both
465     # directions should be "empty", and therefore
466     # the socket must accept at least a single octet,
467     # to signal the "child" to go on.
468     undef $rw;
469     die if $ww; # uh-oh
470    
471     syswrite $fh, "\n";
472     $cb->($fh);
473     }
474     } else {
475     # error
476     ($ww, $rw) = (); $cb->(undef);
477     }
478     };
479 root 1.1 });
480     }
481    
482     =back
483    
484     =head1 SEE ALSO
485    
486     L<AnyEvent::Fork>, the same as this module, for local processes.
487    
488     L<AnyEvent::Fork::RPC>, to talk to the created processes.
489    
490     L<AnyEvent::Fork::Pool>, to manage whole pools of processes.
491    
492     =head1 AUTHOR AND CONTACT INFORMATION
493    
494     Marc Lehmann <schmorp@schmorp.de>
495     http://software.schmorp.de/pkg/AnyEvent-Fork-Remote
496    
497     =cut
498    
499     1
500