ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-Fork-Remote/Remote.pm
Revision: 1.2
Committed: Sat Apr 27 23:59:04 2013 UTC (11 years, 5 months ago) by root
Branch: MAIN
Changes since 1.1: +106 -53 lines
Log Message:
*** empty log message ***

File Contents

# Content
1 =head1 NAME
2
3 AnyEvent::Fork::Remote - remote processes with AnyEvent::Fork interface
4
5 THE API IS NOT FINISHED, CONSIDER THIS A BETA RELEASE
6
7 =head1 SYNOPSIS
8
9 use AnyEvent;
10 use AnyEvent::Fork::Remote;
11
12 my $rpc = AnyEvent::Fork::Remote
13 ->new
14 ->require ("MyModule")
15 ->run ("MyModule::run", my $cv = AE::cv);
16
17 my $fh = $cv->recv;
18
19 =head1 DESCRIPTION
20
21 Despite what the name of this module might suggest, it doesn't actually
22 create remote processes for you. But it does make it easy to use them,
23 once you have started them.
24
25 This module implements a very similar API as L<AnyEvent::Fork>. In fact,
26 similar enough to require at most minor modifications to support both
27 at the same time. For example, it works with L<AnyEvent::Fork::RPC> and
28 L<AnyEvent::Fork::Pool>.
29
30 The documentation for this module will therefore only document the parts
31 of the API that differ between the two modules.
32
33 =head2 SUMMARY OF DIFFERENCES
34
35 Here is a short summary of the main differences between L<AnyEvent::Fork>
36 and this module:
37
38 =over 4
39
40 =item * C<send_fh> is not implemented and will fail
41
42 =item * the child-side C<run> function must read from STDIN and write to STDOUT
43
44 =item * C<fork> does not actually fork, but will create a new process
45
46 =back
47
48 =head1 EXAMPLES
49
50 =head1 PARENT PROCESS USAGE
51
52 =over 4
53
54 =cut
55
56 package AnyEvent::Fork::Remote;
57
58 use common::sense;
59
60 use Carp ();
61 use Errno ();
62
63 use AnyEvent ();
64 use AnyEvent::Util ();
65
66 our $VERSION = 0.1;
67
68 # xored together must start and end with \n
69 my $magic0 = "Pdk{6y[_zZ";
70 my $magic1 = "Z^yZ7~i=oP";
71
72 =item my $proc = new_exec AnyEvent::Fork::Remote $path, @args...
73
74 Creates a new C<AnyEvent::Fork::Remote> object. Unlike L<AnyEvent::Fork>,
75 processes are only created when C<run> is called, every other method call
76 is simply recorded until then.
77
78 Each time a new process is needed, it executes C<$path> with the given
79 arguments (the first array member must be the program name, as with
80 the C<exec> function with explicit PROGRAM argument) and both C<STDIN>
81 and C<STDOUT> connected to a communications socket. No input must be
82 consumed by the command before F<perl> is started, and no output should be
83 generated.
84
85 The program I<must> invoke F<perl> somehow, with STDIN and STDOUT intact,
86 without specifying anything to execute (no script file name, no C<-e>
87 switch etc.).
88
89 Here are some examples to give you an idea:
90
91 # just "perl"
92 $proc = new_exec AnyEvent::Fork::Remote
93 "/usr/bin/perl", "perl";
94
95 # rsh othernode exec perl
96 $proc = new_exec AnyEvent::Fork::Remote
97 "/usr/bin/rsh", "rsh", "othernode", "exec perl";
98
99 # a complicated ssh command
100 $proc = new_exec AnyEvent::Fork::Remote
101 "/usr/bin/ssh",
102 qw(ssh -q
103 -oCheckHostIP=no -oTCPKeepAlive=yes -oStrictHostKeyChecking=no
104 -oGlobalKnownHostsFile=/dev/null -oUserKnownHostsFile=/dev/null
105 otherhost
106 exec perl);
107
108 =item my $proc = new AnyEvent::Fork::Remote $create_callback
109
110 Basically the same as C<new_exec>, but instead of a hardcoded command
111 path, it expects a callback which is invoked each time a process needs to
112 be created.
113
114 The C<$create_callback> is called with another callback as argument,
115 and should call this callback with the file handle that is connected
116 to a F<perl> process. This callback can be invoked even after the
117 C<$create_callback> returns.
118
119 Example: emulate C<new_exec> using C<new>.
120
121 use AnyEvent::Util;
122 use Proc::FastSpawn;
123
124 $proc = new AnyEvent::Fork::Remote sub {
125 my $done = shift;
126
127 my ($a, $b) = AnyEvent::Util::portable_socketpair
128 or die;
129
130 open my $oldin , "<&0" or die;
131 open my $oldout, ">&1" or die;
132
133 open STDIN , "<&" . fileno $b or die;
134 open STDOUT, ">&" . fileno $b or die;
135
136 spawn "/usr/bin/rsh", ["rsh", "othernode", "perl"];
137
138 open STDIN , "<&" . fileno $oldin ;
139 open STDOUT, ">&" . fileno $oldout;
140
141 $done->($a);
142 };
143
144 =item my $proc = new_from_fh AnyEvent::Fork::Remote $fh
145
146 Creates an C<AnyEvent::Fork::Remote> object from a file handle. This file
147 handle must be connected to both STDIN and STDOUT of a F<perl> process.
148
149 This form might be more convenient than C<new> or C<new_exec> when
150 creating an C<AnyEvent::Fork::Remote> object, but the resulting object
151 does not support C<fork>.
152
153 =cut
154
# Constructor: wraps a process-creation callback in a new object.
#
# The object is a blessed array reference with three slots:
#   [0] the create callback supplied by the caller,
#   [1] a string, initially empty (the eval method appends code to it),
#   [2] an array reference, initially empty (send_arg pushes onto it).
sub new {
   my $class  = shift;
   my $create = shift;

   my @state = ($create, "", []);

   bless \@state, $class
}
164
# Constructor: builds an object around already-connected file handle(s).
#
# The create callback installed here follows the same protocol as any other
# create callback: it receives a "done" callback as its first argument and
# must call it with the file handle. Each invocation consumes the next handle
# from @fh; once none are left (which is what happens when a second process
# is requested, e.g. after fork) it croaks.
sub new_from_fh {
   my ($class, @fh) = @_;

   $class->new (sub {
      my $done = shift;

      my $fh = shift @fh
         or Carp::croak ("AnyEvent::Fork::Remote::new_from_fh does not support fork");

      # hand the handle over via the callback - merely returning it would
      # leave run waiting forever
      $done->($fh);
   });
}
173
# Constructor: each time a process is needed, spawns $program with @argv
# (the first element of @argv is the program name) with both STDIN and STDOUT
# of the new process connected to one end of a socket pair. The other end is
# passed to the "done" callback.
#
# If the spawn fails, the "done" callback is called with undef instead of a
# handle, so the failure is reported rather than silently handing out a
# socket nobody is listening on.
sub new_exec {
   my ($class, $program, @argv) = @_;

   require AnyEvent::Util;
   require Proc::FastSpawn;

   $class->new (sub {
      my $done = shift;

      my ($parent_fh, $child_fh) = AnyEvent::Util::portable_socketpair ()
         or die "AnyEvent::Fork::Remote: unable to create socket pair: $!";

      # remember our own STDIN/STDOUT so they can be restored after the spawn
      open my $oldin , "<&0" or die "AnyEvent::Fork::Remote: unable to save STDIN: $!";
      open my $oldout, ">&1" or die "AnyEvent::Fork::Remote: unable to save STDOUT: $!";

      # the spawned program inherits fds 0 and 1, so point them at the socket
      open STDIN , "<&" . fileno ($child_fh) or die "AnyEvent::Fork::Remote: unable to redirect STDIN: $!";
      open STDOUT, ">&" . fileno ($child_fh) or die "AnyEvent::Fork::Remote: unable to redirect STDOUT: $!";

      my $pid = Proc::FastSpawn::spawn ($program, \@argv);

      # restore our own handles before reporting anything
      open STDIN , "<&" . fileno ($oldin );
      open STDOUT, ">&" . fileno ($oldout);

      # spawn returns undef on error - signal failure instead of a dead socket
      $pid
         or return $done->(undef);

      $done->($parent_fh);
   })
}
200
201 =item $new_proc = $proc->fork
202
203 Quite the same as the same method of L<AnyEvent::Fork>, except that it
204 simply clones the object without creating an actual process.
205
206 =cut
207
# Clones the object without creating a process: the create callback and the
# recorded code string are shared/copied as-is, while the argument list is
# copied into a fresh array so later send_arg calls on one object do not
# affect the other. The clone is blessed into the same class as the original.
sub fork {
   my ($self) = @_;

   my ($create, $code, $args) = @$self;

   my $clone = [$create, $code, [@$args]];

   bless $clone, ref $self
}
217
218 =item undef = $proc->pid
219
220 The C<pid> method always returns C<undef> and only exists for
221 compatibility with L<AnyEvent::Fork>.
222
223 =cut
224
# Compatibility stub: takes no notice of its arguments and always yields undef.
sub pid {
   return undef;
}
228
229 =item $proc = $proc->send_fh (...)
230
231 Not supported and always croaks.
232
233 =cut
234
# Unsupported operation: unconditionally croaks, whatever the arguments.
sub send_fh {
   Carp::croak ("send_fh is not supported on AnyEvent::Fork::Remote objects");
}
238
239 =item $proc = $proc->eval ($perlcode, @args)
240
241 Quite the same as the same method of L<AnyEvent::Fork>.
242
243 =cut
244
# Quote a binary string as perl source code: the returned text, when compiled
# by perl, evaluates to exactly the characters of the argument.
#
# A single-quoted literal is used. Inside such a literal only the backslash
# and the single quote need escaping, so both are prefixed with a backslash
# and everything else is copied verbatim. (Leaving backslashes unescaped would
# turn "\\" into "\" and make a trailing backslash swallow the closing quote.)
#
# The ($) prototype is kept so that "sq $x, ..." keeps parsing as a unary call.
sub sq($) {
   my $s = shift;

   $s =~ s/([\\'])/\\$1/g;

   "'" . $s . "'"
}
255
# Quote a list of strings: each element is quoted with sq, the results are
# joined with commas and wrapped in parentheses, giving the source text of a
# perl list. An empty argument list yields "()".
sub aq(@) {
   my @quoted = map { sq ($_) } @_;

   "(" . join (",", @quoted) . ")"
}
260
# Records $perlcode for later execution: a block is appended to the code
# string in slot 1 of the object. Inside that block @_ is localised and set
# to the (quoted) @args, and a "#line" directive gives the code a file name
# derived from the code itself.
#
# Returns the object itself, so calls can be chained like the other
# recording methods (require, send_arg).
sub eval {
   my ($self, $perlcode, @args) = @_;

   # build a short single-line, double-quote-free summary of the code to
   # serve as the "file name" in the #line directive
   my $linecode = $perlcode;
   $linecode =~ s/\s+/ /g; # takes care of \n
   $linecode =~ s/"/''/g;
   substr $linecode, 70, length $linecode, "..." if length $linecode > 70;

   $self->[1] .= '{ local @_ = ' . (aq (@args)) . ";\n#line 1 \"'$linecode'\"\n$perlcode;\n}\n";

   # return the object, not the result of the concatenation above
   $self
}
271
272 =item $proc = $proc->require ($module, ...)
273
274 Quite the same as the same method of L<AnyEvent::Fork>.
275
276 =cut
277
# Records a "require MODULE" statement for every module name given, by
# passing each one to the object's eval method in order. Returns the object.
sub require {
   my ($self, @modules) = @_;

   foreach my $module (@modules) {
      $self->eval ("require $module");
   }

   $self
}
286
287 =item $proc = $proc->send_arg ($string, ...)
288
289 Quite the same as the same method of L<AnyEvent::Fork>.
290
291 =cut
292
# Appends the given strings to the argument list kept in slot 2 of the
# object (run later passes them to the function it invokes). Returns the
# object.
sub send_arg {
   my $self = shift;

   push @{ $self->[2] }, @_;

   return $self;
}
300
301 =item $proc->run ($func, $cb->($fh))
302
303 Very similar to the run method of L<AnyEvent::Fork>.
304
305 On the parent side, the API is identical, except that a C<$cb> argument of
306 C<undef> instead of a valid file handle signals an error.
307
308 On the child side, the "communications socket" is in fact just C<*STDIN>,
309 and typically can only be read from (this highly depends on how the
310 program is created - if you just run F<perl> locally, it will work for
311 both reading and writing, but commands such as F<rsh> or F<ssh> typically
312 only provide read-only handles for STDIN).
313
314 To be portable, if the run function wants to read data that is written to
315 C<$fh> in the parent, then it should read from STDIN. If the run function
316 wants to provide data that can later be read from C<$fh>, then it should
317 write them to STDOUT.
318
319 You can write a run function that works with both L<AnyEvent::Fork>
320 and this module by checking C<fileno $fh>. If it is C<0> (meaning
321 it is STDIN), then you should use it for reading, and STDOUT for
322 writing. Otherwise, you should use the file handle for both:
323
324 sub run {
325 my ($rfh, ...) = @_;
326 my $wfh = fileno $rfh ? $rfh : *STDOUT;
327
328 # now use $rfh for reading and $wfh for writing
329 }
330
331 =cut
332
# Creates the process (via the create callback in slot 0) and makes it run
# $func, then calls $cb with the connected file handle, or with undef on
# error.
#
# Protocol, as implemented below:
#  1. a perl program is written to the handle: the recorded code inside a
#     BEGIN block, a statement that writes the magic handshake string to
#     STDOUT, a one-octet read from STDIN, and finally the call of $func with
#     *STDIN and the recorded arguments;
#  2. the parent reads until the handshake string shows up, which proves the
#     whole program was consumed;
#  3. the parent writes a single "\n" to release the child and hands the
#     handle to $cb.
#
# Dies if the create callback does not deliver a file handle.
sub run {
   my ($self, $func, $cb) = @_;

   $self->[0](sub {
      my $fh = shift
         or die "AnyEvent::Fork::Remote: create callback failed";

      my $code = 'BEGIN {' . $self->[1] . "}\n"
               . 'syswrite STDOUT, ' . (sq ($magic0)) . '^' . (sq ($magic1)) . ';'
               . '{ sysread STDIN, my $dummy, 1 }'
               . "\n$func*STDIN," . (aq (@{ $self->[2] })) . ';'
               . "\n__END__\n";

      AnyEvent::Util::fh_nonblocking ($fh, 1);

      # the string the child writes once it has compiled everything
      my $handshake = $magic0 ^ $magic1;

      my ($rw, $ww);

      my $ofs = 0; # number of octets of $code already written

      $ww = AE::io ($fh, 1, sub {
         my $len = syswrite $fh, $code, 1<<20, $ofs;

         if (defined $len) {
            $ofs += $len;
            undef $ww if $ofs >= length $code;
         } elsif ($! != Errno::EAGAIN && $! != Errno::EWOULDBLOCK) {
            # real write error (anything but "try again later")
            ($ww, $rw) = ();
            $cb->(undef);
         }
      });

      my $rbuf = "";

      $rw = AE::io ($fh, 0, sub {
         # append to the buffer, so a handshake that arrives split over
         # several reads is still recognised
         my $len = sysread $fh, $rbuf, 1<<10, length $rbuf;

         if ($len) {
            # only the most recent octets can be the handshake
            $rbuf = substr ($rbuf, -length ($handshake))
               if length ($rbuf) > length ($handshake);

            if ($rbuf eq $handshake) {
               # all data was sent, magic was received - both
               # directions should be "empty", and therefore
               # the socket must accept at least a single octet,
               # to signal the "child" to go on.
               undef $rw;
               die "AnyEvent::Fork::Remote: handshake received before the code was fully sent"
                  if $ww; # uh-oh

               syswrite $fh, "\n";
               $cb->($fh);
            }
         } elsif (defined $len or ($! != Errno::EAGAIN and $! != Errno::EWOULDBLOCK)) {
            # end of file (defined but zero) or a real read error; $! is
            # only consulted when sysread actually failed
            ($ww, $rw) = ();
            $cb->(undef);
         }
      });
   });
}
392
393 =back
394
395 =head1 SEE ALSO
396
397 L<AnyEvent::Fork>, the same as this module, for local processes.
398
399 L<AnyEvent::Fork::RPC>, to talk to the created processes.
400
401 L<AnyEvent::Fork::Pool>, to manage whole pools of processes.
402
403 =head1 AUTHOR AND CONTACT INFORMATION
404
405 Marc Lehmann <schmorp@schmorp.de>
406 http://software.schmorp.de/pkg/AnyEvent-Fork-Remote
407
408 =cut
409
410 1
411