ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-Fork-Remote/Remote.pm
Revision: 1.3
Committed: Sun Apr 28 01:04:00 2013 UTC (11 years, 5 months ago) by root
Branch: MAIN
Changes since 1.2: +24 -3 lines
Log Message:
*** empty log message ***

File Contents

# Content
1 =head1 NAME
2
3 AnyEvent::Fork::Remote - remote processes with AnyEvent::Fork interface
4
5 THE API IS NOT FINISHED, CONSIDER THIS A BETA RELEASE
6
7 =head1 SYNOPSIS
8
9 use AnyEvent;
10 use AnyEvent::Fork::Remote;
11
12 my $rpc = AnyEvent::Fork::Remote
13 ->new
14 ->require ("MyModule")
15 ->run ("MyModule::run", my $cv = AE::cv);
16
17 my $fh = $cv->recv;
18
19 =head1 DESCRIPTION
20
21 Despite what the name of this module might suggest, it doesn't actually
22 create remote processes for you. But it does make it easy to use them,
23 once you have started them.
24
25 This module implements a very similar API as L<AnyEvent::Fork>. In fact,
26 similar enough to require at most minor modifications to support both
27 at the same time. For example, it works with L<AnyEvent::Fork::RPC> and
28 L<AnyEvent::Fork::Pool>.
29
30 The documentation for this module will therefore only document the parts
31 of the API that differ between the two modules.
32
33 =head2 SUMMARY OF DIFFERENCES
34
35 Here is a short summary of the main differences between L<AnyEvent::Fork>
36 and this module:
37
38 =over 4
39
40 =item * C<send_fh> is not implemented and will fail
41
42 =item * the child-side C<run> function must read from STDIN and write to STDOUT
43
44 =item * C<fork> does not actually fork, but will create a new process
45
46 =back
47
48 =head1 EXAMPLES
49
50 =head1 PARENT PROCESS USAGE
51
52 =over 4
53
54 =cut
55
56 package AnyEvent::Fork::Remote;
57
58 use common::sense;
59
60 use Carp ();
61 use Errno ();
62
63 use AnyEvent ();
64 use AnyEvent::Util ();
65
66 our $VERSION = 0.1;
67
68 # xored together must start and end with \n
69 my $magic0 = "Pdk{6y[_zZ";
70 my $magic1 = "Z^yZ7~i=oP";
71
72 =item my $proc = new_exec AnyEvent::Fork::Remote $path, @args...
73
74 Creates a new C<AnyEvent::Fork::Remote> object. Unlike L<AnyEvent::Fork>,
75 processes are only created when C<run> is called, every other method call
76 is simply recorded until then.
77
78 Each time a new process is needed, it executes C<$path> with the given
79 arguments (the first array member must be the program name, as with
80 the C<exec> function with explicit PROGRAM argument) and both C<STDIN>
81 and C<STDOUT> connected to a communications socket. No input must be
82 consumed by the command before F<perl> is started, and no output should be
83 generated.
84
85 The program I<must> invoke F<perl> somehow, with STDIN and STDOUT intact,
86 without specifying anything to execute (no script file name, no C<-e>
87 switch etc.).
88
89 Here are some examples to give you an idea:
90
91 # just "perl"
92 $proc = new_exec AnyEvent::Fork::Remote
93 "/usr/bin/perl", "perl";
94
95 # rsh othernode exec perl
96 $proc = new_exec AnyEvent::Fork::Remote
97 "/usr/bin/rsh", "rsh", "othernode", "exec perl";
98
99 # a complicated ssh command
100 $proc = new_exec AnyEvent::Fork::Remote
101 "/usr/bin/ssh",
102 qw(ssh -q
103 -oCheckHostIP=no -oTCPKeepAlive=yes -oStrictHostKeyChecking=no
104 -oGlobalKnownHostsFile=/dev/null -oUserKnownHostsFile=/dev/null
105 otherhost
106 exec perl);
107
108 =item my $proc = new_execp AnyEvent::Fork::Remote $file, @args...
109
110 Just like C<new_exec>, except that the program is searched in the
111 C<$ENV{PATH}> first, similarly to how the shell does it. This makes it easier
112 to find e.g. C<ssh>:
113
114 $proc = new_execp AnyEvent::Fork::Remote "ssh", "ssh", "otherhost", "perl";
115
116 =item my $proc = new AnyEvent::Fork::Remote $create_callback
117
118 Basically the same as C<new_exec>, but instead of a hardcoded command
119 path, it expects a callback which is invoked each time a process needs to
120 be created.
121
122 The C<$create_callback> is called with another callback as argument,
123 and should call this callback with the file handle that is connected
124 to a F<perl> process. This callback can be invoked even after the
125 C<$create_callback> returns.
126
127 Example: emulate C<new_exec> using C<new>.
128
129 use AnyEvent::Util;
130 use Proc::FastSpawn;
131
132 $proc = new AnyEvent::Fork::Remote sub {
133 my $done = shift;
134
135 my ($a, $b) = AnyEvent::Util::portable_socketpair
136 or die;
137
138 open my $oldin , "<&0" or die;
139 open my $oldout, ">&1" or die;
140
141 open STDIN , "<&" . fileno $b or die;
142 open STDOUT, ">&" . fileno $b or die;
143
144 spawn "/usr/bin/rsh", ["rsh", "othernode", "perl"];
145
146 open STDIN , "<&" . fileno $oldin ;
147 open STDOUT, ">&" . fileno $oldout;
148
149 $done->($a);
150 };
151
152 =item my $proc = new_from_fh $fh
153
154 Creates an C<AnyEvent::Fork::Remote> object from a file handle. This file
155 handle must be connected to both STDIN and STDOUT of a F<perl> process.
156
157 This form might be more convenient than C<new> or C<new_exec> when
158 creating an C<AnyEvent::Fork::Remote> object, but the resulting object
159 does not support C<fork>.
160
161 =cut
162
# Constructor taking the process-creation callback.
#
# The object is a blessed array with three slots:
#   [0] the create callback (invoked by run)
#   [1] the perl source accumulated by eval (initially empty)
#   [2] the list of arguments collected by send_arg (initially empty)
sub new {
   my $class  = shift;
   my $create = shift;

   my @state = ($create, "", []);

   return bless \@state, $class;
}
172
# Creates an object from one or more file handles that are already
# connected to a perl process.
#
# Every process creation consumes one handle from @fh and passes it to
# the completion callback that the create callback receives as its first
# argument. When no handles are left (e.g. because a forked object needs
# another process), it croaks.
sub new_from_fh {
   my ($class, @fh) = @_;

   $class->new (sub {
      my $done = shift;

      my $fh = shift @fh
         or Carp::croak "AnyEvent::Fork::Remote::new_from_fh does not support fork";

      # the handle must be handed to the completion callback - merely
      # returning it would leave run waiting forever
      $done->($fh);
   });
}
181
# Shared implementation of new_exec and new_execp.
#
# Called as _new_exec ($class, $program, @argv, $search_path): the last
# argument selects Proc::FastSpawn::spawnp (true) or
# Proc::FastSpawn::spawn (false). Returns a new object whose create
# callback spawns the program with STDIN and STDOUT connected to one end
# of a socketpair and passes the other end to the completion callback.
sub _new_exec {
   my $search_path = pop @_;
   my ($class, $program, @argv) = @_;

   require AnyEvent::Util;
   require Proc::FastSpawn;

   my $create = sub {
      my ($done) = @_;

      my ($parent_end, $child_end) = AnyEvent::Util::portable_socketpair ()
         or die;

      # remember the real STDIN/STDOUT so they can be restored afterwards
      open my $saved_in , "<&0" or die;
      open my $saved_out, ">&1" or die;

      # the spawned program gets the child's end as both STDIN and STDOUT
      open STDIN , "<&" . fileno ($child_end) or die;
      open STDOUT, ">&" . fileno ($child_end) or die;

      if ($search_path) {
         Proc::FastSpawn::spawnp ($program, \@argv);
      } else {
         Proc::FastSpawn::spawn ($program, \@argv);
      }

      # put our own STDIN/STDOUT back in place
      open STDIN , "<&" . fileno ($saved_in);
      open STDOUT, ">&" . fileno ($saved_out);

      $done->($parent_end);
   };

   $class->new ($create)
}
211
# Constructor that runs $path exactly as given: delegates to _new_exec
# with the search-PATH flag cleared.
sub new_exec {
   my @args = @_;

   return _new_exec (@args, 0);
}
216
# Constructor that looks the program up in the PATH: delegates to
# _new_exec with the search-PATH flag set.
sub new_execp {
   my @args = @_;

   return _new_exec (@args, 1);
}
221
222 =item $new_proc = $proc->fork
223
224 Quite the same as the same method of L<AnyEvent::Fork>, except that it
225 simply clones the object without creating an actual process.
226
227 =cut
228
# Clones the object without creating a process: the create callback and
# the accumulated code are shared as-is, the argument list is copied so
# that later send_arg calls on either object do not affect the other.
sub fork {
   my ($self) = @_;

   my ($create, $code, $args) = @$self;

   my $clone = [$create, $code, [@$args]];

   return bless $clone, ref $self;
}
238
239 =item undef = $proc->pid
240
241 The C<pid> method always returns C<undef> and only exists for
242 compatibility with L<AnyEvent::Fork>.
243
244 =cut
245
# There is no local process id to report, so this always yields undef
# (a single undef value, also in list context).
sub pid {
   return undef;
}
249
250 =item $proc = $proc->send_fh (...)
251
252 Not supported and always croaks.
253
254 =cut
255
# Passing file handles is not possible with this module, so any call
# croaks.
sub send_fh {
   Carp::croak ("send_fh is not supported on AnyEvent::Fork::Remote objects");
}
259
260 =item $proc = $proc->eval ($perlcode, @args)
261
262 Quite the same as the same method of L<AnyEvent::Fork>.
263
264 =cut
265
# quote a binary string as a perl scalar
#
# Returns perl source for a single-quoted literal that evaluates to
# exactly $s. Inside '...' the only escape sequences are \\ and \', so
# escaping every backslash and every single quote is sufficient for
# arbitrary strings, including ones that contain doubled backslashes or
# end in a backslash.
sub sq($) {
   my $s = shift;

   $s =~ s/([\\'])/\\$1/g;

   "'$s'"
}
276
# quote a list of strings
#
# Returns perl source for a parenthesised, comma-separated list in which
# every element has been quoted with sq.
sub aq(@) {
   my @quoted = map { sq ($_) } @_;

   "(" . join (",", @quoted) . ")"
}
281
# Records $perlcode (with @args available as @_) for execution in the
# process that run creates later; nothing is executed at this point.
#
# Returns the object itself, so that calls can be chained.
sub eval {
   my ($self, $perlcode, @args) = @_;

   # derive a short, single-line label from the code, used as the "file
   # name" in the #line directive
   my $linecode = $perlcode;
   $linecode =~ s/\s+/ /g; # takes care of \n
   $linecode =~ s/"/''/g;
   substr $linecode, 70, length $linecode, "..." if length $linecode > 70;

   $self->[1] .= '{ local @_ = ' . aq (@args) . ";\n#line 1 \"'$linecode'\"\n$perlcode;\n}\n";

   # return the object, not the result of the concatenation above
   $self
}
292
293 =item $proc = $proc->require ($module, ...)
294
295 Quite the same as the same method of L<AnyEvent::Fork>.
296
297 =cut
298
# Records a "require" statement for each of the given modules via the
# eval method and returns the object.
sub require {
   my $self    = shift;
   my @modules = @_;

   foreach my $module (@modules) {
      $self->eval ("require $module");
   }

   return $self;
}
307
308 =item $proc = $proc->send_arg ($string, ...)
309
310 Quite the same as the same method of L<AnyEvent::Fork>.
311
312 =cut
313
# Appends the given strings to the argument list that run later passes
# to the function it starts, and returns the object.
sub send_arg {
   my $self = shift;

   push @{ $self->[2] }, @_;

   return $self;
}
321
322 =item $proc->run ($func, $cb->($fh))
323
324 Very similar to the run method of L<AnyEvent::Fork>.
325
326 On the parent side, the API is identical, except that a C<$cb> argument of
327 C<undef> instead of a valid file handle signals an error.
328
329 On the child side, the "communications socket" is in fact just C<*STDIN>,
330 and typically can only be read from (this highly depends on how the
331 program is created - if you just run F<perl> locally, it will work for
332 both reading and writing, but commands such as F<rsh> or F<ssh> typically
333 only provide read-only handles for STDIN).
334
335 To be portable, if the run function wants to read data that is written to
336 C<$fh> in the parent, then it should read from STDIN. If the run function
337 wants to provide data that can later be read from C<$fh>, then it should
338 write them to STDOUT.
339
340 You can write a run function that works with both L<AnyEvent::Fork>
341 and this module by checking C<fileno $fh>. If it is C<0> (meaning
342 it is STDIN), then you should use it for reading, and STDOUT for
343 writing. Otherwise, you should use the file handle for both:
344
345 sub run {
346 my ($rfh, ...) = @_;
347 my $wfh = fileno $rfh ? $rfh : *STDOUT;
348
349 # now use $rfh for reading and $wfh for writing
350 }
351
352 =cut
353
# Creates the process via the create callback, sends it the recorded
# code followed by a call to $func, and invokes $cb with the file handle
# once the process has signalled that it is ready, or with undef when
# writing or reading fails.
#
# Handshake: the bootstrap code makes the child write ($magic0 ^ $magic1)
# to STDOUT and then wait for a single octet on STDIN. Once the magic
# string has been seen here, a "\n" is written to release the child, and
# only then is $cb invoked.
sub run {
   my ($self, $func, $cb) = @_;

   $self->[0](sub {
      my $fh = shift
         or die "AnyEvent::Fork::Remote: create callback failed";

      my $code = 'BEGIN {' . $self->[1] . "}\n"
               . 'syswrite STDOUT, ' . sq ($magic0) . '^' . sq ($magic1) . ';'
               . '{ sysread STDIN, my $dummy, 1 }'
               . "\n$func*STDIN," . aq (@{ $self->[2] }) . ';'
               . "\n__END__\n";

      AnyEvent::Util::fh_nonblocking ($fh, 1);

      my ($rw, $ww);

      my $ofs = 0;

      # write watcher: push out $code until everything has been written
      $ww = AE::io ($fh, 1, sub {
         my $len = syswrite $fh, $code, 1<<20, $ofs;

         if ($len || $! == Errno::EAGAIN() || $! == Errno::EWOULDBLOCK()) {
            $ofs += $len;
            undef $ww if $ofs >= length $code;
         } else {
            # error
            ($ww, $rw) = (); $cb->(undef);
         }
      });

      my $rbuf = "";

      # read watcher: wait for the magic string from the child
      $rw = AE::io ($fh, 0, sub {
         # append to the buffer, so that a magic string that arrives
         # split over several reads is still recognised
         my $len = sysread $fh, $rbuf, 1<<10, length $rbuf;

         if ($len) {
            # only the tail of the data can be the magic string
            $rbuf = substr $rbuf, -length $magic0 if length $rbuf > length $magic0;

            if ($rbuf eq ($magic0 ^ $magic1)) {
               # all data was sent, magic was received - both
               # directions should be "empty", and therefore
               # the socket must accept at least a single octet,
               # to signal the "child" to go on.
               undef $rw;
               die if $ww; # uh-oh

               syswrite $fh, "\n";
               $cb->($fh);
            }
         } elsif (defined $len || ($! != Errno::EAGAIN() && $! != Errno::EWOULDBLOCK())) {
            # end of file before the magic string was seen (sysread
            # returned 0, $! is meaningless then), or a hard read error
            ($ww, $rw) = (); $cb->(undef);
         }
         # otherwise the read would merely block: wait for the next event
      });
   });
}
413
414 =back
415
416 =head1 SEE ALSO
417
418 L<AnyEvent::Fork>, the same as this module, for local processes.
419
420 L<AnyEvent::Fork::RPC>, to talk to the created processes.
421
422 L<AnyEvent::Fork::Pool>, to manage whole pools of processes.
423
424 =head1 AUTHOR AND CONTACT INFORMATION
425
426 Marc Lehmann <schmorp@schmorp.de>
427 http://software.schmorp.de/pkg/AnyEvent-Fork-Remote
428
429 =cut
430
431 1
432