/cvs/AnyEvent-Fork-Remote/Remote.pm
Revision: 1.7
Committed: Sat May 21 07:37:04 2016 UTC (7 years, 11 months ago) by root
Branch: MAIN
CVS Tags: rel-1_0, HEAD
Changes since 1.6: +2 -2 lines
Log Message:
*** empty log message ***

=head1 NAME

AnyEvent::Fork::Remote - remote processes with AnyEvent::Fork interface

THE API IS NOT FINISHED, CONSIDER THIS A BETA RELEASE

=head1 SYNOPSIS

   use AnyEvent;
   use AnyEvent::Fork::Remote;

   my $rpc = AnyEvent::Fork::Remote
      ->new_execp ("ssh", "ssh", "othermachine", "perl")
      ->require ("MyModule")
      ->run ("MyModule::run", my $cv = AE::cv);

   my $fh = $cv->recv;

=head1 DESCRIPTION

Despite what the name of this module might suggest, it doesn't actually
create remote processes for you. But it does make it easy to use them,
once you have started them.

This module implements an API very similar to that of L<AnyEvent::Fork>.
In fact, the two are similar enough that code needs at most minor
modifications to support both at the same time. For example, it works
with L<AnyEvent::Fork::RPC> and L<AnyEvent::Fork::Pool>.

The documentation for this module will therefore only document the parts
of the API that differ between the two modules.

=head2 SUMMARY OF DIFFERENCES

Here is a short summary of the main differences between L<AnyEvent::Fork>
and this module:

=over 4

=item * C<send_fh> is not implemented and will fail

=item * the child-side C<run> function must read from STDIN and write to STDOUT

=item * C<fork> does not actually fork, but will create a new process

=back

=head1 EXAMPLE

This example uses a local perl (because that is likely going to work
without further setup) and L<AnyEvent::Fork::RPC> to create a simple
worker process.

First load the modules we are going to use:

   use AnyEvent;
   use AnyEvent::Fork::Remote;
   use AnyEvent::Fork::RPC;

Then create, configure and run the process:

   my $rpc = AnyEvent::Fork::Remote
      ->new_execp ("perl", "perl")
      ->eval ('
           sub myrun {
              "this is process $$, and you passed <@_>"
           }
        ')
      ->AnyEvent::Fork::RPC::run ("myrun");

We use C<new_execp> to execute the first F<perl> found in the PATH. You'll
have to make sure there is one for this to work. The perl does not
actually have to be the same perl as the one running the example, and it
doesn't need to have any modules installed.

The reason we have to specify C<perl> twice is that the first argument to
C<new_execp> (and also C<new_exec>) is the program name or path, while
the remaining ones are the arguments, and the first argument passed to a
program is the program name, so it has to be specified twice.

Finally, the standard example: send some numbers to the remote function,
and print whatever it returns:

   my $cv = AE::cv;

   for (1..10) {
      $cv->begin;
      $rpc->($_, sub {
         print "remote function returned: $_[0]\n";
         $cv->end;
      });
   }

   $cv->recv;

Now, executing F<perl> in the PATH isn't very interesting - you could have
done the same with L<AnyEvent::Fork>, and it might even be more efficient.

The power of this module is that the F<perl> doesn't need to run on the
local box; you could simply substitute another command, such as F<ssh
remotebox perl>:

   my $rpc = AnyEvent::Fork::Remote
      ->new_execp ("ssh", "ssh", "remotebox", "perl")

And if you want to use a specific path for ssh, use C<new_exec>:

   my $rpc = AnyEvent::Fork::Remote
      ->new_exec ("/usr/bin/ssh", "ssh", "remotebox", "perl")

Of course, it doesn't really matter to this module how you construct your
perl processes; what matters is that, somehow, you give it a file handle
connected to the new perl's STDIN and STDOUT.

=head1 PARENT PROCESS USAGE

=over 4

=cut

package AnyEvent::Fork::Remote;

use common::sense;

use Carp ();
use Errno ();

use AnyEvent ();

our $VERSION = '1.0';

# xored together must start and end with \n
my $magic0 = "Pdk{6y[_zZ";
my $magic1 = "Z^yZ7~i=oP";

=item my $proc = new_exec AnyEvent::Fork::Remote $path, @args...

Creates a new C<AnyEvent::Fork::Remote> object. Unlike L<AnyEvent::Fork>,
processes are only created when C<run> is called; every other method call
is simply recorded until then.

Each time a new process is needed, it executes C<$path> with the given
arguments (the first array member must be the program name, as with
the C<exec> function with explicit PROGRAM argument) and both C<STDIN>
and C<STDOUT> connected to a communications socket. No input must be
consumed by the command before F<perl> is started, and no output should be
generated.

The program I<must> invoke F<perl> somehow, with STDIN and STDOUT intact,
without specifying anything to execute (no script file name, no C<-e>
switch etc.).

Here are some examples to give you an idea:

   # just "perl"
   $proc = new_exec AnyEvent::Fork::Remote
      "/usr/bin/perl", "perl";

   # rsh othernode exec perl
   $proc = new_exec AnyEvent::Fork::Remote
      "/usr/bin/rsh", "rsh", "othernode", "exec perl";

   # a complicated ssh command
   $proc = new_exec AnyEvent::Fork::Remote
      "/usr/bin/ssh",
      qw(ssh -q
         -oCheckHostIP=no -oTCPKeepAlive=yes -oStrictHostKeyChecking=no
         -oGlobalKnownHostsFile=/dev/null -oUserKnownHostsFile=/dev/null
         otherhost
         exec perl);

=item my $proc = new_execp AnyEvent::Fork::Remote $file, @args...

Just like C<new_exec>, except that the program is first searched for in
C<$ENV{PATH}>, similarly to how the shell does it. This makes it easier
to find e.g. C<ssh>:

   $proc = new_execp AnyEvent::Fork::Remote "ssh", "ssh", "otherhost", "perl";

=item my $proc = new AnyEvent::Fork::Remote $create_callback

Basically the same as C<new_exec>, but instead of a command to execute,
it expects a callback which is invoked each time a process needs to be
created.

The C<$create_callback> is called with another callback as argument,
and should call this callback with the file handle that is connected
to a F<perl> process. This callback can be invoked even after the
C<$create_callback> returns.

Example: emulate C<new_exec> using C<new>.

   use AnyEvent::Util;
   use Proc::FastSpawn;

   $proc = new AnyEvent::Fork::Remote sub {
      my $done = shift;

      my ($a, $b) = AnyEvent::Util::portable_socketpair
         or die;

      open my $oldin , "<&0" or die;
      open my $oldout, ">&1" or die;

      open STDIN , "<&" . fileno $b or die;
      open STDOUT, ">&" . fileno $b or die;

      spawn "/usr/bin/rsh", ["rsh", "othernode", "perl"];

      open STDIN , "<&" . fileno $oldin ;
      open STDOUT, ">&" . fileno $oldout;

      $done->($a);
   };

=item my $proc = new_from_fh $fh

Creates an C<AnyEvent::Fork::Remote> object from a file handle. This file
handle must be connected to both STDIN and STDOUT of a F<perl> process.

This form might be more convenient than C<new> or C<new_exec> when
creating an C<AnyEvent::Fork::Remote> object, but the resulting object
does not support C<fork>.
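
A minimal sketch of C<new_from_fh>, assuming a local F<perl> in the PATH
and using only core Perl to start it (the variable names are made up for
illustration and are not part of this module):

   use Socket;
   use AnyEvent::Fork::Remote;

   # one bidirectional socket pair: we keep $parent, the child gets $child
   socketpair my $parent, my $child, AF_UNIX, SOCK_STREAM, PF_UNSPEC
      or die "socketpair: $!";

   my $pid = fork;
   defined $pid
      or die "fork: $!";

   unless ($pid) {
      # child: connect both STDIN and STDOUT to our end, then become perl
      close $parent;
      open STDIN , "<&", $child or die "dup STDIN: $!";
      open STDOUT, ">&", $child or die "dup STDOUT: $!";
      exec "perl"
         or die "exec perl: $!";
   }

   close $child;

   my $proc = AnyEvent::Fork::Remote->new_from_fh ($parent);

The file handle is used for the first process only, so a second attempt
to create a process from this object croaks.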

=cut

sub new {
   my ($class, $create) = @_;

   bless [
      $create, # callback that creates a process
      "",      # perl code recorded by eval/require
      [],      # arguments recorded by send_arg
   ], $class
}

sub new_from_fh {
   my ($class, @fh) = @_;

   $class->new (sub {
      my $fh = shift @fh
         or Carp::croak "AnyEvent::Fork::Remote::new_from_fh does not support fork";

      $_[0]($fh);
   });
}

sub _new_exec {
   my $p = pop; # true: search PATH (spawnp), false: use path as is (spawn)

   my ($class, $program, @argv) = @_;

   require AnyEvent::Util;
   require Proc::FastSpawn;

   $class->new (sub {
      my $done = shift;

      my ($a, $b) = AnyEvent::Util::portable_socketpair ()
         or die;

      # save our own STDIN/STDOUT, so we can restore them after spawning
      open my $oldin , "<&0" or die;
      open my $oldout, ">&1" or die;

      open STDIN , "<&" . fileno $b or die;
      open STDOUT, ">&" . fileno $b or die;

      $p ? Proc::FastSpawn::spawnp ($program, \@argv)
         : Proc::FastSpawn::spawn  ($program, \@argv);

      open STDIN , "<&" . fileno $oldin ;
      open STDOUT, ">&" . fileno $oldout;

      $done->($a);
   })
}

sub new_exec {
   push @_, 0;
   &_new_exec
}

sub new_execp {
   push @_, 1;
   &_new_exec
}

=item $new_proc = $proc->fork

Works much like the method of the same name in L<AnyEvent::Fork>, except
that it simply clones the object without creating an actual process.
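
Because C<fork> only copies the recorded state, it can be used to set up
a template object once and derive independent objects from it. A minimal
sketch, assuming a local F<perl> in the PATH (the variable names and the
arguments are made up for illustration):

   use AnyEvent::Fork::Remote;

   # nothing is executed here, the calls are only recorded
   my $template = AnyEvent::Fork::Remote
      ->new_execp ("perl", "perl")
      ->require ("strict");

   # each clone gets its own copy of the argument list
   my $worker1 = $template->fork->send_arg ("one");
   my $worker2 = $template->fork->send_arg ("two");

A process is only created for each clone once its C<run> method is called.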

=cut

sub fork {
   my $self = shift;

   bless [
      $self->[0],
      $self->[1],
      [@{ $self->[2] }],
   ], ref $self
}

=item undef = $proc->pid

The C<pid> method always returns C<undef> and only exists for
compatibility with L<AnyEvent::Fork>.

=cut

sub pid {
   undef
}

=item $proc = $proc->send_fh (...)

Not supported and always croaks.

=cut

sub send_fh {
   Carp::croak "send_fh is not supported on AnyEvent::Fork::Remote objects";
}

=item $proc = $proc->eval ($perlcode, @args)

Works much like the method of the same name in L<AnyEvent::Fork>.
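
As a sketch of how the extra arguments are used: they are available in
C<@_> when the code runs in the new process. The example assumes a local
F<perl> in the PATH, and the variable C<$My::greeting> is made up for
illustration:

   use AnyEvent::Fork::Remote;

   my $proc = AnyEvent::Fork::Remote
      ->new_execp ("perl", "perl")
      ->eval ('$My::greeting = "hello, $_[0]";', "world");

The code is only recorded at this point and runs once C<run> has started
the process.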

=cut

# quote a binary string as a perl scalar
sub sq($) {
   my $s = shift;

   $s =~ /'/
      or return "'$s'";

   $s =~ s/(\x10+)/\x10.'$1'.q\x10/g;
   "q\x10$s\x10"
}

# quote a list of strings
sub aq(@) {
   "(" . (join ",", map sq $_, @_) . ")"
}

sub eval {
   my ($self, $perlcode, @args) = @_;

   my $linecode = $perlcode;
   $linecode =~ s/\s+/ /g; # takes care of \n
   $linecode =~ s/"/''/g;
   substr $linecode, 70, length $linecode, "..." if length $linecode > 70;

   $self->[1] .= '{ local @_ = ' . (aq @args) . ";\n#line 1 \"'$linecode'\"\n$perlcode;\n}\n";

   $self
}

=item $proc = $proc->require ($module, ...)

Works much like the method of the same name in L<AnyEvent::Fork>.

=cut

sub require {
   my ($self, @modules) = @_;

   $self->eval ("require $_")
      for @modules;

   $self
}

=item $proc = $proc->send_arg ($string, ...)

Works much like the method of the same name in L<AnyEvent::Fork>.

=cut

sub send_arg {
   my ($self, @arg) = @_;

   push @{ $self->[2] }, @arg;

   $self
}

=item $proc->run ($func, $cb->($fh))

Very similar to the C<run> method of L<AnyEvent::Fork>.

On the parent side, the API is identical, except that C<$cb> is called
with C<undef> instead of a valid file handle to signal an error.
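
A minimal parent-side sketch of this check, assuming a local F<perl> in
the PATH and a do-nothing run function named C<myrun> (both chosen only
for illustration):

   use AnyEvent;
   use AnyEvent::Fork::Remote;

   my $cv = AE::cv;

   AnyEvent::Fork::Remote
      ->new_execp ("perl", "perl")
      ->eval ('sub myrun { }')
      ->run ("myrun", sub {
         my ($fh) = @_;

         # undef instead of a file handle signals an error
         defined $fh
            or return $cv->croak ("unable to set up the remote process");

         $cv->send ($fh);
      });

   # returns the file handle, or dies with the message passed to croak
   my $fh = $cv->recv;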

On the child side, the "communications socket" is in fact just C<*STDIN>,
and typically can only be read from (this highly depends on how the
program is created - if you just run F<perl> locally, it will work for
both reading and writing, but commands such as F<rsh> or F<ssh> typically
only provide read-only handles for STDIN).

To be portable, if the run function wants to read data that is written to
C<$fh> in the parent, then it should read from STDIN. If the run function
wants to provide data that can later be read from C<$fh>, then it should
write it to STDOUT.

You can write a run function that works with both L<AnyEvent::Fork>
and this module by checking C<fileno $fh>. If it is C<0> (meaning
it is STDIN), then you should use it for reading, and STDOUT for
writing. Otherwise, you should use the file handle for both:

   sub run {
      my ($rfh, @args) = @_;
      my $wfh = fileno $rfh ? $rfh : *STDOUT;

      # now use $rfh for reading and $wfh for writing
   }
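
A slightly fuller sketch of such a portable run function follows. The
name C<upcase_run> and its line-based protocol are made up for
illustration; it answers every input line with its upper-case version:

   use IO::Handle;

   sub upcase_run {
      my ($rfh, @args) = @_;

      # AnyEvent::Fork passes a socket, this module passes STDIN (fileno 0)
      my $wfh = fileno $rfh ? $rfh : \*STDOUT;

      # send each reply immediately instead of waiting for a full buffer
      $wfh->autoflush (1);

      # answer every line until the other side stops sending
      while (defined (my $line = <$rfh>)) {
         print {$wfh} uc $line;
      }
   }

The function returns when it reads end-of-file, that is, once the parent
has closed or shut down its side of the connection.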

=cut

sub run {
   my ($self, $func, $cb) = @_;

   $self->[0](sub {
      my $fh = shift
         or die "AnyEvent::Fork::Remote: create callback failed";

      my $owner = length $ENV{HOSTNAME} ? "$ENV{HOSTNAME}:$$" : "*:$$";

      my $code = 'BEGIN { $0 = ' . (sq "$owner $func") . '; ' . $self->[1] . "}\n"
               . 'syswrite STDOUT, ' . (sq $magic0) . '^' . (sq $magic1) . ';'
               . '{ sysread STDIN, my $dummy, 1 }'
               . "\n$func*STDIN," . (aq @{ $self->[2] }) . ';'
               . "\n__END__\n";

      AnyEvent::Util::fh_nonblocking $fh, 1;

      my ($rw, $ww);

      my $ofs;

      $ww = AE::io $fh, 1, sub {
         my $len = syswrite $fh, $code, 1<<20, $ofs;

         if ($len || $! == Errno::EAGAIN || $! == Errno::EWOULDBLOCK) {
            $ofs += $len;
            undef $ww if $ofs >= length $code;
         } else {
            # error
            ($ww, $rw) = (); $cb->(undef);
         }
      };

      my $rbuf;

      $rw = AE::io $fh, 0, sub {
         my $len = sysread $fh, $rbuf, 1<<10;

         if ($len || $! == Errno::EAGAIN || $! == Errno::EWOULDBLOCK) {
            $rbuf = substr $rbuf, -length $magic0 if length $rbuf > length $magic0;

            if ($rbuf eq ($magic0 ^ $magic1)) {
               # all data was sent, magic was received - both
               # directions should be "empty", and therefore
               # the socket must accept at least a single octet,
               # to signal the "child" to go on.
               undef $rw;
               die if $ww; # uh-oh

               syswrite $fh, "\n";
               $cb->($fh);
            }
         } else {
            # error
            ($ww, $rw) = (); $cb->(undef);
         }
      };
   });
}

=back

=head1 SEE ALSO

L<AnyEvent::Fork>, the same as this module, for local processes.

L<AnyEvent::Fork::RPC>, to talk to the created processes.

L<AnyEvent::Fork::Pool>, to manage whole pools of processes.

=head1 AUTHOR AND CONTACT INFORMATION

 Marc Lehmann <schmorp@schmorp.de>
 http://software.schmorp.de/pkg/AnyEvent-Fork-Remote

=cut

1