/cvs/AnyEvent-Fork-Remote/Remote.pm
Revision: 1.5
Committed: Sun Apr 28 15:36:30 2013 UTC (11 years, 5 months ago) by root
Branch: MAIN
CVS Tags: rel-0_2
Changes since 1.4: +78 -12 lines
Log Message:
0.2

File Contents

# User Rev Content
1 root 1.1 =head1 NAME
2    
3     AnyEvent::Fork::Remote - remote processes with AnyEvent::Fork interface
4    
5     THE API IS NOT FINISHED, CONSIDER THIS A BETA RELEASE
6    
7     =head1 SYNOPSIS
8    
9     use AnyEvent;
10     use AnyEvent::Fork::Remote;
11    
12     my $rpc = AnyEvent::Fork::Remote
13 root 1.5 ->new_execp ("ssh", "ssh", "othermachine", "perl")
14 root 1.1 ->require ("MyModule")
15     ->run ("MyModule::run", my $cv = AE::cv);
16    
17     my $fh = $cv->recv;
18    
19     =head1 DESCRIPTION
20    
21 root 1.3 Despite what the name of this module might suggest, it doesn't actually
22 root 1.1 create remote processes for you. But it does make it easy to use them,
23     once you have started them.
24    
25     This module implements an API very similar to that of L<AnyEvent::Fork>. In fact,
26     it is similar enough to require at most minor modifications to support both
27     at the same time. For example, it works with L<AnyEvent::Fork::RPC> and
28     L<AnyEvent::Fork::Pool>.
29    
30     The documentation for this module will therefore only document the parts
31     of the API that differ between the two modules.
32    
33     =head2 SUMMARY OF DIFFERENCES
34    
35     Here is a short summary of the main differences between L<AnyEvent::Fork>
36     and this module:
37    
38     =over 4
39    
40     =item * C<send_fh> is not implemented and will fail
41    
42     =item * the child-side C<run> function must read from STDIN and write to STDOUT
43    
44     =item * C<fork> does not actually fork, but will create a new process
45    
46     =back
47    
48 root 1.5 =head1 EXAMPLE
49    
50     This example uses a local perl (because that is likely going to work
51     without further setup) and L<AnyEvent::Fork::RPC> to create a simple
52     worker process.
53    
54     First load the modules we are going to use:
55    
56     use AnyEvent;
57     use AnyEvent::Fork::Remote;
58     use AnyEvent::Fork::RPC;
59    
60     Then create, configure and run the process:
61    
62     my $rpc = AnyEvent::Fork::Remote
63     ->new_execp ("perl", "perl")
64     ->eval ('
65     sub myrun {
66     "this is process $$, and you passed <@_>"
67     }
68     ')
69     ->AnyEvent::Fork::RPC::run ("myrun");
70    
71     We use C<new_execp> to execute the first F<perl> found in the PATH. You'll
72     have to make sure there is one for this to work. The perl does not
73     actually have to be the same perl as the one running the example, and it
74     doesn't need to have any modules installed.
75    
76     The reason we have to specify C<perl> twice is that the first argument to
77     C<new_execp> (and also C<new_exec>) is the program name or path, while
78     the remaining ones are the arguments, and the first argument passed to a
79     program is the program name, so it has to be specified twice.
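
The split between program path and argument vector can be tried out with nothing but core Perl. The following is a minimal sketch, not code from this module: C<run_with_name> is a hypothetical helper that uses the block form of C<system>, which takes the file to execute separately from the argument list, just as C<new_exec> does.

```perl
use strict;
use warnings;

# Run $path with an explicit argument vector. The first element of
# @argv is the name the program sees, not the file that is executed.
sub run_with_name {
    my ($path, @argv) = @_;

    # the block form keeps PROGRAM and argv[0] separate and bypasses the shell
    system { $path } @argv;

    return $? >> 8; # exit status of the child
}

# $^X is the path of the running perl; "perl" is merely its name
my $status = run_with_name ($^X, "perl", "-e", "exit 7");
print "child exited with status $status\n";
```

Here C<$^X> plays the role of the path and the first C<"perl"> the role of the program name, which is why the name shows up twice in the C<new_execp> call above.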
80    
81     Finally, the standard example: send some numbers to the remote function,
82     and print whatever it returns:
83    
84     my $cv = AE::cv;
85    
86     for (1..10) {
87     $cv->begin;
88     $rpc->($_, sub {
89     print "remote function returned: $_[0]\n";
90     $cv->end;
91     });
92     }
93    
94     $cv->recv;
95    
96     Now, executing F<perl> in the PATH isn't very interesting - you could have
97     done the same with L<AnyEvent::Fork>, and it might even be more efficient.
98    
99     The power of this module is that the F<perl> doesn't need to run on the
100     local box, you could simply substitute another command, such as F<ssh
101     remotebox perl>:
102    
103     my $rpc = AnyEvent::Fork::Remote
104     ->new_execp ("ssh", "ssh", "remotebox", "perl")
105    
106     And if you want to use a specific path for ssh, use C<new_exec>:
107    
108     my $rpc = AnyEvent::Fork::Remote
109     ->new_exec ("/usr/bin/ssh", "ssh", "remotebox", "perl")
110    
111     Of course, it doesn't really matter to this module how you construct your
112     perl processes, what matters is that somehow, you give it a file handle
113     connected to the new perl's STDIN and STDOUT.
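
To illustrate what "a file handle connected to STDIN and STDOUT" means, here is a minimal sketch using only core modules. It is an assumption-laden illustration, not how this module creates processes (the module uses C<Proc::FastSpawn> and C<AnyEvent::Util::portable_socketpair>): the hypothetical C<spawn_perl> helper uses plain C<fork> and C<exec>, and feeds a tiny program to a perl that was started without a script.

```perl
use strict;
use warnings;
use Socket;
use IO::Handle;

# Start a perl with STDIN and STDOUT connected to one end of a socket
# pair, and return the other end together with the child pid.
sub spawn_perl {
    socketpair (my $parent, my $child, AF_UNIX, SOCK_STREAM, PF_UNSPEC)
        or die "socketpair: $!";

    my $pid = fork;
    defined $pid or die "fork: $!";

    unless ($pid) {
        # child: make the socket STDIN and STDOUT, then become perl
        close $parent;
        open STDIN , "<&", $child or die "dup STDIN: $!";
        open STDOUT, ">&", $child or die "dup STDOUT: $!";
        exec { $^X } "perl"
            or die "exec: $!";
    }

    close $child;
    $parent->autoflush (1);

    ($parent, $pid)
}

my ($fh, $pid) = spawn_perl ();

# a perl started without a script reads its program from STDIN, up to __END__
print $fh 'print "hello from child $$\n";', "\n__END__\n";

my $reply = <$fh>;
waitpid $pid, 0;
print $reply;
```

A handle such as C<$fh> here is the kind of handle that C<new_from_fh>, or the callback passed to C<new>, is expected to provide.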
114 root 1.1
115     =head1 PARENT PROCESS USAGE
116    
117     =over 4
118    
119     =cut
120    
121     package AnyEvent::Fork::Remote;
122    
123     use common::sense;
124    
125 root 1.2 use Carp ();
126 root 1.1 use Errno ();
127    
128     use AnyEvent ();
129    
130 root 1.5 our $VERSION = 0.2;
131 root 1.1
132     # xored together must start and end with \n
133     my $magic0 = "Pdk{6y[_zZ";
134     my $magic1 = "Z^yZ7~i=oP";
135    
136     =item my $proc = new_exec AnyEvent::Fork::Remote $path, @args...
137    
138     Creates a new C<AnyEvent::Fork::Remote> object. Unlike L<AnyEvent::Fork>,
139     processes are only created when C<run> is called; every other method call
140     is simply recorded until then.
141    
142     Each time a new process is needed, it executes C<$path> with the given
143     arguments (the first array member must be the program name, as with
144     the C<exec> function with explicit PROGRAM argument) and both C<STDIN>
145     and C<STDOUT> connected to a communications socket. No input must be
146 root 1.5 consumed by the command before F<perl> is started, and no output should be
147 root 1.1 generated.
148    
149     The program I<must> invoke F<perl> somehow, with STDIN and STDOUT intact,
150     without specifying anything to execute (no script file name, no C<-e>
151     switch etc.).
152    
153     Here are some examples to give you an idea:
154    
155     # just "perl"
156     $proc = new_exec AnyEvent::Fork::Remote
157     "/usr/bin/perl", "perl";
158    
159     # rsh othernode exec perl
160     $proc = new_exec AnyEvent::Fork::Remote
161     "/usr/bin/rsh", "rsh", "othernode", "exec perl";
162    
163     # a complicated ssh command
164     $proc = new_exec AnyEvent::Fork::Remote
165     "/usr/bin/ssh",
166     qw(ssh -q
167     -oCheckHostIP=no -oTCPKeepAlive=yes -oStrictHostKeyChecking=no
168     -oGlobalKnownHostsFile=/dev/null -oUserKnownHostsFile=/dev/null
169     otherhost
170     exec perl);
171    
172 root 1.3 =item my $proc = new_execp AnyEvent::Fork::Remote $file, @args...
173    
174     Just like C<new_exec>, except that the program is searched for in
175     C<$ENV{PATH}> first, similarly to how the shell does it. This makes it easier
176     to find e.g. C<ssh>:
177    
178     $proc = new_execp AnyEvent::Fork::Remote "ssh", "ssh", "otherhost", "perl";
179    
180 root 1.1 =item my $proc = new AnyEvent::Fork::Remote $create_callback
181    
182 root 1.5 Basically the same as C<new_exec>, but instead of a command to execute,
183     it expects a callback which is invoked each time a process needs to be
184     created.
185 root 1.1
186     The C<$create_callback> is called with another callback as argument,
187     and should call this callback with the file handle that is connected
188     to a F<perl> process. This callback can be invoked even after the
189     C<$create_callback> returns.
190    
191     Example: emulate C<new_exec> using C<new>.
192    
193     use AnyEvent::Util;
194     use Proc::FastSpawn;
195    
196     $proc = new AnyEvent::Fork::Remote sub {
197     my $done = shift;
198    
199     my ($a, $b) = AnyEvent::Util::portable_socketpair
200     or die;
201    
202     open my $oldin , "<&0" or die;
203     open my $oldout, ">&1" or die;
204    
205     open STDIN , "<&" . fileno $b or die;
206     open STDOUT, ">&" . fileno $b or die;
207    
208     spawn "/usr/bin/rsh", ["rsh", "othernode", "perl"];
209    
210     open STDIN , "<&" . fileno $oldin ;
211     open STDOUT, ">&" . fileno $oldout;
212    
213     $done->($a);
214     };
215    
216 root 1.2 =item my $proc = new_from_fh $fh
217    
218     Creates an C<AnyEvent::Fork::Remote> object from a file handle. This file
219     handle must be connected to both STDIN and STDOUT of a F<perl> process.
220    
221     This form might be more convenient than C<new> or C<new_exec> when
222     creating an C<AnyEvent::Fork::Remote> object, but the resulting object
223     does not support C<fork>.
224    
225 root 1.1 =cut
226    
227 root 1.2 sub new {
228     my ($class, $create) = @_;
229    
230     bless [
231     $create,
232     "",
233     [],
234     ], $class
235     }
236    
237     sub new_from_fh {
238     my ($class, @fh) = @_;
239    
240     $class->new (sub {
241     shift @fh
242     or Carp::croak "AnyEvent::Fork::Remote::new_from_fh does not support fork";
243     });
244     }
245    
246 root 1.3 sub _new_exec {
247     my $p = pop;
248    
249 root 1.1 my ($class, $program, @argv) = @_;
250    
251     require AnyEvent::Util;
252     require Proc::FastSpawn;
253    
254     $class->new (sub {
255     my $done = shift;
256    
257     my ($a, $b) = AnyEvent::Util::portable_socketpair ()
258     or die;
259    
260     open my $oldin , "<&0" or die;
261     open my $oldout, ">&1" or die;
262    
263     open STDIN , "<&" . fileno $b or die;
264     open STDOUT, ">&" . fileno $b or die;
265    
266 root 1.3 $p ? Proc::FastSpawn::spawnp ($program, \@argv)
267     : Proc::FastSpawn::spawn ($program, \@argv);
268 root 1.1
269     open STDIN , "<&" . fileno $oldin ;
270     open STDOUT, ">&" . fileno $oldout;
271    
272     $done->($a);
273     })
274     }
275    
276 root 1.3 sub new_exec {
277     push @_, 0;
278     &_new_exec
279     }
280    
281     sub new_execp {
282     push @_, 1;
283     &_new_exec
284     }
285    
286 root 1.1 =item $new_proc = $proc->fork
287    
288     Quite the same as the same method of L<AnyEvent::Fork>, except that it
289     simply clones the object without creating an actual process.
290    
291     =cut
292    
293     sub fork {
294     my $self = shift;
295    
296     bless [
297     $self->[0],
298     $self->[1],
299     [@{ $self->[2] }],
300     ], ref $self
301     }
302    
303     =item undef = $proc->pid
304    
305     The C<pid> method always returns C<undef> and only exists for
306     compatibility with L<AnyEvent::Fork>.
307    
308     =cut
309    
310     sub pid {
311     undef
312     }
313    
314     =item $proc = $proc->send_fh (...)
315    
316     Not supported and always croaks.
317    
318     =cut
319    
320     sub send_fh {
321 root 1.2 Carp::croak "send_fh is not supported on AnyEvent::Fork::Remote objects";
322 root 1.1 }
323    
324     =item $proc = $proc->eval ($perlcode, @args)
325    
326     Quite the same as the same method of L<AnyEvent::Fork>.
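
Judging from the implementation below, C<eval> does not run anything immediately: it appends the code, wrapped in a block that sets up C<@_> from the arguments, to a string that is sent to the child later. The following core-Perl sketch imitates that wrapping locally. The names C<quote_list> and C<wrap_code> are hypothetical, and the quoting is a simplified stand-in for the module's own helpers.

```perl
use strict;
use warnings;

our $seen; # set by the snippet below, purely for demonstration

# Quote a list of strings as a Perl list literal (simplified quoting).
sub quote_list {
    "(" . (join ",", map { my $s = $_; $s =~ s/([\\'])/\\$1/g; "'$s'" } @_) . ")"
}

# Wrap a code string so that @args are visible to it as @_.
sub wrap_code {
    my ($perlcode, @args) = @_;

    '{ local @_ = ' . quote_list (@args) . ";\n$perlcode;\n}\n"
}

my $code = wrap_code ('$main::seen = join "-", @_', "a", "it's");

# the module would send $code to the child; here it is simply evaluated
eval $code;
die $@ if $@;

print "$seen\n";
```

Because the arguments end up as string literals inside the generated code, only plain strings survive the trip; references and objects would not.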
327    
328     =cut
329    
330     # quote a binary string as a perl scalar
331     sub sq($) {
332     my $s = shift;
333    
334     $s =~ /'/
335     or return "'$s'";
336    
337     $s =~ s/(\x10+)/\x10.'$1'.q\x10/g;
338     "q\x10$s\x10"
339     }
340    
341     # quote a list of strings
342     sub aq(@) {
343     "(" . (join ",", map sq $_, @_) . ")"
344     }
345    
346     sub eval {
347     my ($self, $perlcode, @args) = @_;
348    
349 root 1.2 my $linecode = $perlcode;
350     $linecode =~ s/\s+/ /g; # takes care of \n
351     $linecode =~ s/"/''/g;
352     substr $linecode, 70, length $linecode, "..." if length $linecode > 70;
353    
354     $self->[1] .= '{ local @_ = ' . (aq @args) . ";\n#line 1 \"'$linecode'\"\n$perlcode;\n}\n";
355 root 1.5
356     $self
357 root 1.1 }
358    
359     =item $proc = $proc->require ($module, ...)
360    
361     Quite the same as the same method of L<AnyEvent::Fork>.
362    
363     =cut
364    
365     sub require {
366     my ($self, @modules) = @_;
367    
368 root 1.2 $self->eval ("require $_")
369     for @modules;
370 root 1.1
371     $self
372     }
373    
374     =item $proc = $proc->send_arg ($string, ...)
375    
376     Quite the same as the same method of L<AnyEvent::Fork>.
377    
378     =cut
379    
380     sub send_arg {
381     my ($self, @arg) = @_;
382    
383     push @{ $self->[2] }, @arg;
384    
385     $self
386     }
387    
388     =item $proc->run ($func, $cb->($fh))
389    
390     Very similar to the run method of L<AnyEvent::Fork>.
391    
392 root 1.2 On the parent side, the API is identical, except that C<$cb> is called with
393 root 1.5 C<undef> instead of a valid file handle to signal an error.
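
Before the parent calls C<$cb> with the handle, the implementation below waits until the child has printed a marker, which is the string XOR of two magic values chosen so that the result starts and ends with a newline. The sketch below shows the idea with core Perl only. The helper C<saw_marker> is hypothetical, and unlike the module's read callback it accumulates output across several reads, which is a simplification of my own.

```perl
use strict;
use warnings;

# the two magic strings from the module source
my $magic0 = "Pdk{6y[_zZ";
my $magic1 = "Z^yZ7~i=oP";

# the child prints the bitwise string XOR of the two
my $marker = $magic0 ^ $magic1;

# Feed a chunk of child output into the buffer, keep only the trailing
# bytes and report whether the buffer now ends with the marker.
sub saw_marker {
    my ($rbuf, $chunk) = @_; # $rbuf is a reference to the buffer

    $$rbuf .= $chunk;
    $$rbuf = substr ($$rbuf, -length ($marker))
        if length ($$rbuf) > length ($marker);

    $$rbuf eq $marker
}

my $rbuf = "";
my $first  = saw_marker (\$rbuf, "login banner" . substr ($marker, 0, 4));
my $second = saw_marker (\$rbuf, substr ($marker, 4));

printf "after first chunk: %s, after second chunk: %s\n",
    $first  ? "found" : "not yet",
    $second ? "found" : "not yet";
```

Anything a remote shell prints before the marker, such as a login banner, is discarded this way, so the handle passed to C<$cb> starts out clean.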
394 root 1.2
395     On the child side, the "communications socket" is in fact just C<*STDIN>,
396     and typically can only be read from (this highly depends on how the
397     program is created - if you just run F<perl> locally, it will work for
398     both reading and writing, but commands such as F<rsh> or F<ssh> typically
399     only provide read-only handles for STDIN).
400    
401     To be portable, if the run function wants to read data that is written to
402     C<$fh> in the parent, then it should read from STDIN. If the run function
403     wants to provide data that can later be read from C<$fh>, then it should
404     write them to STDOUT.
405    
406     You can write a run function that works with both L<AnyEvent::Fork>
407     and this module by checking C<fileno $fh>. If it is C<0> (meaning
408     it is STDIN), then you should use it for reading, and STDOUT for
409     writing. Otherwise, you should use the file handle for both:
410 root 1.1
411     sub run {
412     my ($rfh, @args) = @_;
413     my $wfh = fileno $rfh ? $rfh : *STDOUT;
414    
415     # now use $rfh for reading and $wfh for writing
416     }
417    
418     =cut
419    
420     sub run {
421     my ($self, $func, $cb) = @_;
422    
423     $self->[0](sub {
424     my $fh = shift
425     or die "AnyEvent::Fork::Remote: create callback failed";
426    
427 root 1.5 my $owner = length $ENV{HOSTNAME} ? "$ENV{HOSTNAME}:$$" : "*:$$";
428    
429     my $code = 'BEGIN { $0 = ' . (sq "$func of $owner") . '; ' . $self->[1] . "}\n"
430 root 1.2 . 'syswrite STDOUT, ' . (sq $magic0) . '^' . (sq $magic1) . ';'
431     . '{ sysread STDIN, my $dummy, 1 }'
432     . "\n$func*STDIN," . (aq @{ $self->[2] }) . ';'
433     . "\n__END__\n";
434    
435     AnyEvent::Util::fh_nonblocking $fh, 1;
436    
437     my ($rw, $ww);
438    
439     my $ofs;
440    
441     $ww = AE::io $fh, 1, sub {
442     my $len = syswrite $fh, $code, 1<<20, $ofs;
443    
444     if ($len || $! == Errno::EAGAIN || $! == Errno::EWOULDBLOCK) {
445     $ofs += $len;
446     undef $ww if $ofs >= length $code;
447     } else {
448     # error
449     ($ww, $rw) = (); $cb->(undef);
450     }
451     };
452    
453     my $rbuf;
454    
455     $rw = AE::io $fh, 0, sub {
456     my $len = sysread $fh, $rbuf, 1<<10;
457    
458     if ($len || $! == Errno::EAGAIN || $! == Errno::EWOULDBLOCK) {
459     $rbuf = substr $rbuf, -length $magic0 if length $rbuf > length $magic0;
460    
461     if ($rbuf eq ($magic0 ^ $magic1)) {
462     # all data was sent, magic was received - both
463     # directions should be "empty", and therefore
464     # the socket must accept at least a single octet,
465     # to signal the "child" to go on.
466     undef $rw;
467     die if $ww; # uh-oh
468    
469     syswrite $fh, "\n";
470     $cb->($fh);
471     }
472     } else {
473     # error
474     ($ww, $rw) = (); $cb->(undef);
475     }
476     };
477 root 1.1 });
478     }
479    
480     =back
481    
482     =head1 SEE ALSO
483    
484     L<AnyEvent::Fork>, the same as this module, for local processes.
485    
486     L<AnyEvent::Fork::RPC>, to talk to the created processes.
487    
488     L<AnyEvent::Fork::Pool>, to manage whole pools of processes.
489    
490     =head1 AUTHOR AND CONTACT INFORMATION
491    
492     Marc Lehmann <schmorp@schmorp.de>
493     http://software.schmorp.de/pkg/AnyEvent-Fork-Remote
494    
495     =cut
496    
497     1
498