ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-Fork-Remote/Remote.pm
(Generate patch)

Comparing AnyEvent-Fork-Remote/Remote.pm (file contents):
Revision 1.1 by root, Sat Apr 27 01:44:55 2013 UTC vs.
Revision 1.7 by root, Sat May 21 07:37:04 2016 UTC

8 8
9 use AnyEvent; 9 use AnyEvent;
10 use AnyEvent::Fork::Remote; 10 use AnyEvent::Fork::Remote;
11 11
12 my $rpc = AnyEvent::Fork::Remote 12 my $rpc = AnyEvent::Fork::Remote
13 ->new 13 ->new_execp ("ssh", "ssh", "othermachine", "perl")
14 ->require ("MyModule") 14 ->require ("MyModule")
15 ->run ("MyModule::run", my $cv = AE::cv); 15 ->run ("MyModule::run", my $cv = AE::cv);
16 16
17 my $fh = $cv->recv; 17 my $fh = $cv->recv;
18 18
19=head1 DESCRIPTION 19=head1 DESCRIPTION
20 20
21Despite what the name of this module might suggest, it doesn't actualyl 21Despite what the name of this module might suggest, it doesn't actually
22create remote processes for you. But it does make it easy to use them, 22create remote processes for you. But it does make it easy to use them,
23once you have started them. 23once you have started them.
24 24
25This module implements a very similar API as L<AnyEvent::Fork>. In fact, 25This module implements a very similar API as L<AnyEvent::Fork>. In fact,
26similar enough to require at most minor modifications to support both 26similar enough to require at most minor modifications to support both
43 43
44=item * C<fork> does not actually fork, but will create a new process 44=item * C<fork> does not actually fork, but will create a new process
45 45
46=back 46=back
47 47
48=head1 EXAMPLES 48=head1 EXAMPLE
49
50This example uses a local perl (because that is likely going to work
51without further setup) and the L<AnyEvent::Fork::RPC> to create simple
52worker process.
53
54First load the modules we are going to use:
55
56 use AnyEvent;
57 use AnyEvent::Fork::Remote;
58 use AnyEvent::Fork::RPC;
59
60Then create, configure and run the process:
61
62 my $rpc = AnyEvent::Fork::Remote
63 ->new_execp ("perl", "perl")
64 ->eval ('
65 sub myrun {
66 "this is process $$, and you passed <@_>"
67 }
68 ')
69 ->AnyEvent::Fork::RPC::run ("myrun");
70
71We use C<new_execp> to execute the first F<perl> found in the PATH. You'll
72have to make sure there is one for this to work. The perl does not
73actually have to be the same perl as the one running the example, and it
74doesn't need to have any modules installed.
75
76The reason we have to specify C<perl> twice is that the first argument to
77C<new_execp> (and also C<new_exec>) is the program name or path, while
78the remaining ones are the arguments, and the first argument passed to a
79program is the program name, so it has to be specified twice.
80
81Finally, the standard example, send some numbers to the remote function,
82and print whatever it returns:
83
84 my $cv = AE::cv;
85
86 for (1..10) {
87 $cv->begin;
88 $rpc->($_, sub {
89 print "remote function returned: $_[0]\n";
90 $cv->end;
91 });
92 }
93
94 $cv->recv;
95
96Now, executing F<perl> in the PATH isn't very interesting - you could have
97done the same with L<AnyEvent::Fork>, and it might even be more efficient.
98
99The power of this module is that the F<perl> doesn't need to run on the
100local box, you could simply substitute another command, such as F<ssh
101remotebox perl>:
102
103 my $rpc = AnyEvent::Fork::Remote
104 ->new_execp ("ssh", "ssh", "remotebox", "perl")
105
106And if you want to use a specific path for ssh, use C<new_exec>:
107
108 my $rpc = AnyEvent::Fork::Remote
109 ->new_exec ("/usr/bin/ssh", "ssh", "remotebox", "perl")
110
111Of course, it doesn't really matter to this module how you construct your
112perl processes, what matters is that somehow, you give it a file handle
113connected to the new perls STDIN and STDOUT.
49 114
50=head1 PARENT PROCESS USAGE 115=head1 PARENT PROCESS USAGE
51 116
52=over 4 117=over 4
53 118
55 120
56package AnyEvent::Fork::Remote; 121package AnyEvent::Fork::Remote;
57 122
58use common::sense; 123use common::sense;
59 124
125use Carp ();
60use Errno (); 126use Errno ();
61 127
62use AnyEvent (); 128use AnyEvent ();
63use AnyEvent::Util ();
64 129
65our $VERSION = 0.1; 130our $VERSION = '1.0';
66 131
67# xored together must start and and with \n 132# xored together must start and and with \n
68my $magic0 = "Pdk{6y[_zZ"; 133my $magic0 = "Pdk{6y[_zZ";
69my $magic1 = "Z^yZ7~i=oP"; 134my $magic1 = "Z^yZ7~i=oP";
70 135
76 141
77Each time a new process is needed, it executes C<$path> with the given 142Each time a new process is needed, it executes C<$path> with the given
78arguments (the first array member must be the program name, as with 143arguments (the first array member must be the program name, as with
79the C<exec> function with explicit PROGRAM argument) and both C<STDIN> 144the C<exec> function with explicit PROGRAM argument) and both C<STDIN>
80and C<STDOUT> connected to a communications socket. No input must be 145and C<STDOUT> connected to a communications socket. No input must be
81consumed by the comamnd before F<perl> is started, and no output should be 146consumed by the command before F<perl> is started, and no output should be
82generated. 147generated.
83 148
84The program I<must> invoke F<perl> somehow, with STDIN and STDOUT intact, 149The program I<must> invoke F<perl> somehow, with STDIN and STDOUT intact,
85without specifying anything to execute (no script file name, no C<-e> 150without specifying anything to execute (no script file name, no C<-e>
86switch etc.). 151switch etc.).
102 -oCheckHostIP=no -oTCPKeepAlive=yes -oStrictHostKeyChecking=no 167 -oCheckHostIP=no -oTCPKeepAlive=yes -oStrictHostKeyChecking=no
103 -oGlobalKnownHostsFile=/dev/null -oUserKnownHostsFile=/dev/null 168 -oGlobalKnownHostsFile=/dev/null -oUserKnownHostsFile=/dev/null
104 otherhost 169 otherhost
105 exec perl); 170 exec perl);
106 171
172=item my $proc = new_execp AnyEvent::Fork::Remote $file, @args...
173
174Just like C<new_exec>, except that the program is searched in the
175C<$ENV{PATH}> first, similarly to how the shell does it. This makes it easier
176to find e.g. C<ssh>:
177
178 $proc = new_execp AnyEvent::Fork::Remote "ssh", "ssh", "otherhost", "perl";
179
107=item my $proc = new AnyEvent::Fork::Remote $create_callback 180=item my $proc = new AnyEvent::Fork::Remote $create_callback
108 181
109Basically the same as C<new_exec>, but instead of a hardcoded command 182Basically the same as C<new_exec>, but instead of a command to execute,
110path, it expects a callback which is invoked each time a process needs to 183it expects a callback which is invoked each time a process needs to be
111be created. 184created.
112 185
113The C<$create_callback> is called with another callback as argument, 186The C<$create_callback> is called with another callback as argument,
114and should call this callback with the file handle that is connected 187and should call this callback with the file handle that is connected
115to a F<perl> process. This callback can be invoked even after the 188to a F<perl> process. This callback can be invoked even after the
116C<$create_callback> returns. 189C<$create_callback> returns.
138 open STDOUT, ">&" . fileno $oldout; 211 open STDOUT, ">&" . fileno $oldout;
139 212
140 $done->($a); 213 $done->($a);
141 }; 214 };
142 215
143=cut 216=item my $proc = new_from_fh $fh
144 217
145sub new_exec { 218Creates an C<AnyEvent::Fork::Remote> object from a file handle. This file
146 my ($class, $program, @argv) = @_; 219handle must be connected to both STDIN and STDOUT of a F<perl> process.
147 220
148 require AnyEvent::Util; 221This form might be more convenient than C<new> or C<new_exec> when
149 require Proc::FastSpawn; 222creating an C<AnyEvent::Fork::Remote> object, but the resulting object
223does not support C<fork>.
150 224
151 $class->new (sub { 225=cut
152 my $done = shift;
153
154 my ($a, $b) = AnyEvent::Util::portable_socketpair ()
155 or die;
156
157 open my $oldin , "<&0" or die;
158 open my $oldout, ">&1" or die;
159
160 open STDIN , "<&" . fileno $b or die;
161 open STDOUT, ">&" . fileno $b or die;
162
163 Proc::FastSpawn::spawn ($program, \@argv);
164
165 open STDIN , "<&" . fileno $oldin ;
166 open STDOUT, ">&" . fileno $oldout;
167
168 $done->($a);
169 })
170}
171 226
172sub new { 227sub new {
173 my ($class, $create) = @_; 228 my ($class, $create) = @_;
174 229
175 bless [ 230 bless [
177 "", 232 "",
178 [], 233 [],
179 ], $class 234 ], $class
180} 235}
181 236
237sub new_from_fh {
238 my ($class, @fh) = @_;
239
240 $class->new (sub {
241 my $fh = shift @fh
242 or Carp::croak "AnyEvent::Fork::Remote::new_from_fh does not support fork";
243
244 $_[0]($fh);
245 });
246}
247
248sub _new_exec {
249 my $p = pop;
250
251 my ($class, $program, @argv) = @_;
252
253 require AnyEvent::Util;
254 require Proc::FastSpawn;
255
256 $class->new (sub {
257 my $done = shift;
258
259 my ($a, $b) = AnyEvent::Util::portable_socketpair ()
260 or die;
261
262 open my $oldin , "<&0" or die;
263 open my $oldout, ">&1" or die;
264
265 open STDIN , "<&" . fileno $b or die;
266 open STDOUT, ">&" . fileno $b or die;
267
268 $p ? Proc::FastSpawn::spawnp ($program, \@argv)
269 : Proc::FastSpawn::spawn ($program, \@argv);
270
271 open STDIN , "<&" . fileno $oldin ;
272 open STDOUT, ">&" . fileno $oldout;
273
274 $done->($a);
275 })
276}
277
278sub new_exec {
279 push @_, 0;
280 &_new_exec
281}
282
283sub new_execp {
284 push @_, 1;
285 &_new_exec
286}
287
182=item $new_proc = $proc->fork 288=item $new_proc = $proc->fork
183 289
184Quite the same as the same method of L<AnyEvent::Fork>, except that it 290Quite the same as the same method of L<AnyEvent::Fork>, except that it
185simply clones the object without creating an actual process. 291simply clones the object without creating an actual process.
186 292
212Not supported and always croaks. 318Not supported and always croaks.
213 319
214=cut 320=cut
215 321
216sub send_fh { 322sub send_fh {
217 require Carp;
218 Carp::croak ("send_fh is not supported on AnyEvent::Fork::Remote objects"); 323 Carp::croak "send_fh is not supported on AnyEvent::Fork::Remote objects";
219} 324}
220 325
221=item $proc = $proc->eval ($perlcode, @args) 326=item $proc = $proc->eval ($perlcode, @args)
222 327
223Quite the same as the same method of L<AnyEvent::Fork>. 328Quite the same as the same method of L<AnyEvent::Fork>.
241} 346}
242 347
243sub eval { 348sub eval {
244 my ($self, $perlcode, @args) = @_; 349 my ($self, $perlcode, @args) = @_;
245 350
351 my $linecode = $perlcode;
352 $linecode =~ s/\s+/ /g; # takes care of \n
353 $linecode =~ s/"/''/g;
354 substr $linecode, 70, length $linecode, "..." if length $linecode > 70;
355
246 $self->[1] .= '{ local @_ = ' . (aq @args) . "; $perlcode }\n"; 356 $self->[1] .= '{ local @_ = ' . (aq @args) . ";\n#line 1 \"'$linecode'\"\n$perlcode;\n}\n";
357
358 $self
247} 359}
248 360
249=item $proc = $proc->require ($module, ...) 361=item $proc = $proc->require ($module, ...)
250 362
251Quite the same as the same method of L<AnyEvent::Fork>. 363Quite the same as the same method of L<AnyEvent::Fork>.
253=cut 365=cut
254 366
255sub require { 367sub require {
256 my ($self, @modules) = @_; 368 my ($self, @modules) = @_;
257 369
370 $self->eval ("require $_")
258 s%::%/%g for @modules; 371 for @modules;
259 $self->eval ('require "$_.pm" for @_', @modules);
260 372
261 $self 373 $self
262} 374}
263 375
264=item $proc = $proc->send_arg ($string, ...) 376=item $proc = $proc->send_arg ($string, ...)
277 389
278=item $proc->run ($func, $cb->($fh)) 390=item $proc->run ($func, $cb->($fh))
279 391
280Very similar to the run method of L<AnyEvent::Fork>. 392Very similar to the run method of L<AnyEvent::Fork>.
281 393
282On the parent side, the API is identical. On the child side, the 394On the parent side, the API is identical, except that a C<$cb> argument of
283"communications socket" is in fact just C<*STDIN>, and typically can only 395C<undef> instead of a valid file handle signals an error.
284be read from.
285 396
397On the child side, the "communications socket" is in fact just C<*STDIN>,
398and typically can only be read from (this highly depends on how the
399program is created - if you just run F<perl> locally, it will work for
400both reading and writing, but commands such as F<rsh> or F<ssh> typically
401only provide read-only handles for STDIN).
402
286If the run function wants to read data that is written to C<$fh> in the 403To be portable, if the run function wants to read data that is written to
287parent, then it should read from STDIN. If the run function wants to 404C<$fh> in the parent, then it should read from STDIN. If the run function
288provide data that can later be read from C<$fh>, then it should write them 405wants to provide data that can later be read from C<$fh>, then it should
289to STDOUT. 406write them to STDOUT.
290 407
291You can write a run function that works with both L<AnyEvent::Fork> and 408You can write a run function that works with both L<AnyEvent::Fork>
292this module by checking C<fileno $fh> in on the passed callback in the run 409and this module by checking C<fileno $fh>. If it is C<0> (meaning
293function: 410it is STDIN), then you should use it for reading, and STDOUT for
411writing. Otherwise, you should use the file handle for both:
294 412
295 sub run { 413 sub run {
296 my ($rfh, ...) = @_; 414 my ($rfh, ...) = @_;
297 my $wfh = fileno $rfh ? $rfh : *STDOUT; 415 my $wfh = fileno $rfh ? $rfh : *STDOUT;
298 416
302=cut 420=cut
303 421
304sub run { 422sub run {
305 my ($self, $func, $cb) = @_; 423 my ($self, $func, $cb) = @_;
306 424
307 my $code = 'BEGIN {' . $self->[1] . '}'
308 . 'syswrite STDOUT, ' . (sq $magic0) . '^' . (sq $magic1) . ';'
309 . $func . (aq @{ $self->[2] }) . ';'
310 . "\n__END__\n";
311
312 $self->[0](sub { 425 $self->[0](sub {
313 my $fh = shift 426 my $fh = shift
314 or die "AnyEvent::Fork::Remote: create callback failed"; 427 or die "AnyEvent::Fork::Remote: create callback failed";
315 428
429 my $owner = length $ENV{HOSTNAME} ? "$ENV{HOSTNAME}:$$" : "*:$$";
316 430
431 my $code = 'BEGIN { $0 = ' . (sq "$owner $func") . '; ' . $self->[1] . "}\n"
432 . 'syswrite STDOUT, ' . (sq $magic0) . '^' . (sq $magic1) . ';'
433 . '{ sysread STDIN, my $dummy, 1 }'
434 . "\n$func*STDIN," . (aq @{ $self->[2] }) . ';'
435 . "\n__END__\n";
317 436
437 AnyEvent::Util::fh_nonblocking $fh, 1;
438
439 my ($rw, $ww);
440
441 my $ofs;
442
443 $ww = AE::io $fh, 1, sub {
444 my $len = syswrite $fh, $code, 1<<20, $ofs;
445
446 if ($len || $! == Errno::EAGAIN || $! == Errno::EWOULDBLOCK) {
447 $ofs += $len;
448 undef $ww if $ofs >= length $code;
449 } else {
450 # error
451 ($ww, $rw) = (); $cb->(undef);
452 }
453 };
454
455 my $rbuf;
456
457 $rw = AE::io $fh, 0, sub {
458 my $len = sysread $fh, $rbuf, 1<<10;
459
460 if ($len || $! == Errno::EAGAIN || $! == Errno::EWOULDBLOCK) {
461 $rbuf = substr $rbuf, -length $magic0 if length $rbuf > length $magic0;
462
463 if ($rbuf eq ($magic0 ^ $magic1)) {
464 # all data was sent, magic was received - both
465 # directions should be "empty", and therefore
466 # the socket must accept at least a single octet,
467 # to signal the "child" to go on.
468 undef $rw;
469 die if $ww; # uh-oh
470
471 syswrite $fh, "\n";
472 $cb->($fh);
473 }
474 } else {
475 # error
476 ($ww, $rw) = (); $cb->(undef);
477 }
478 };
318 }); 479 });
319} 480}
320
321my $x = new_exec AnyEvent::Fork::Remote "/usr/bin/rsh", "rsh", "rain", "exec perl";#d#
322$x->require ("Carp", "Storable");#d#
323$x->send_arg (1, 2, 3);#d#
324$x->eval ('sub run { die }');#d#
325$x->run ("run", sub {
326 });
327
328
329=item my $proc = new_from_stdio $fh
330
331Creates an C<AnyEvent::Fork::Remote> object from a file handle. This file
332handle must be connected to both STDIN and STDOUT of a F<perl> process.
333
334This form might be more convenient than C<new> or C<new_exec> when
335creating an C<AnyEvent::Fork::Remote> object, but the resulting object
336does not support C<fork>.
337
338#TODO: really implement?
339 481
340=back 482=back
341 483
342=head1 SEE ALSO 484=head1 SEE ALSO
343 485

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines