… | |
… | |
55 | |
55 | |
56 | package AnyEvent::Fork::Remote; |
56 | package AnyEvent::Fork::Remote; |
57 | |
57 | |
58 | use common::sense; |
58 | use common::sense; |
59 | |
59 | |
|
|
60 | use Carp (); |
60 | use Errno (); |
61 | use Errno (); |
61 | |
62 | |
62 | use AnyEvent (); |
63 | use AnyEvent (); |
63 | use AnyEvent::Util (); |
64 | use AnyEvent::Util (); |
64 | |
65 | |
… | |
… | |
138 | open STDOUT, ">&" . fileno $oldout; |
139 | open STDOUT, ">&" . fileno $oldout; |
139 | |
140 | |
140 | $done->($a); |
141 | $done->($a); |
141 | }; |
142 | }; |
142 | |
143 | |
143 | =cut |
144 | =item my $proc = new_from_fh $fh |
144 | |
145 | |
145 | sub new_exec { |
146 | Creates an C<AnyEvent::Fork::Remote> object from a file handle. This file |
146 | my ($class, $program, @argv) = @_; |
147 | handle must be connected to both STDIN and STDOUT of a F<perl> process. |
147 | |
148 | |
148 | require AnyEvent::Util; |
149 | This form might be more convenient than C<new> or C<new_exec> when |
149 | require Proc::FastSpawn; |
150 | creating an C<AnyEvent::Fork::Remote> object, but the resulting object |
|
|
151 | does not support C<fork>. |
150 | |
152 | |
151 | $class->new (sub { |
153 | =cut |
152 | my $done = shift; |
|
|
153 | |
|
|
154 | my ($a, $b) = AnyEvent::Util::portable_socketpair () |
|
|
155 | or die; |
|
|
156 | |
|
|
157 | open my $oldin , "<&0" or die; |
|
|
158 | open my $oldout, ">&1" or die; |
|
|
159 | |
|
|
160 | open STDIN , "<&" . fileno $b or die; |
|
|
161 | open STDOUT, ">&" . fileno $b or die; |
|
|
162 | |
|
|
163 | Proc::FastSpawn::spawn ($program, \@argv); |
|
|
164 | |
|
|
165 | open STDIN , "<&" . fileno $oldin ; |
|
|
166 | open STDOUT, ">&" . fileno $oldout; |
|
|
167 | |
|
|
168 | $done->($a); |
|
|
169 | }) |
|
|
170 | } |
|
|
171 | |
154 | |
172 | sub new { |
155 | sub new { |
173 | my ($class, $create) = @_; |
156 | my ($class, $create) = @_; |
174 | |
157 | |
175 | bless [ |
158 | bless [ |
… | |
… | |
177 | "", |
160 | "", |
178 | [], |
161 | [], |
179 | ], $class |
162 | ], $class |
180 | } |
163 | } |
181 | |
164 | |
|
|
165 | sub new_from_fh { |
|
|
166 | my ($class, @fh) = @_; |
|
|
167 | |
|
|
168 | $class->new (sub { |
|
|
169 | shift @fh |
|
|
170 | or Carp::croak "AnyEvent::Fork::Remote::new_from_fh does not support fork"; |
|
|
171 | }); |
|
|
172 | } |
|
|
173 | |
|
|
174 | sub new_exec { |
|
|
175 | my ($class, $program, @argv) = @_; |
|
|
176 | |
|
|
177 | require AnyEvent::Util; |
|
|
178 | require Proc::FastSpawn; |
|
|
179 | |
|
|
180 | $class->new (sub { |
|
|
181 | my $done = shift; |
|
|
182 | |
|
|
183 | my ($a, $b) = AnyEvent::Util::portable_socketpair () |
|
|
184 | or die; |
|
|
185 | |
|
|
186 | open my $oldin , "<&0" or die; |
|
|
187 | open my $oldout, ">&1" or die; |
|
|
188 | |
|
|
189 | open STDIN , "<&" . fileno $b or die; |
|
|
190 | open STDOUT, ">&" . fileno $b or die; |
|
|
191 | |
|
|
192 | Proc::FastSpawn::spawn ($program, \@argv); |
|
|
193 | |
|
|
194 | open STDIN , "<&" . fileno $oldin ; |
|
|
195 | open STDOUT, ">&" . fileno $oldout; |
|
|
196 | |
|
|
197 | $done->($a); |
|
|
198 | }) |
|
|
199 | } |
|
|
200 | |
182 | =item $new_proc = $proc->fork |
201 | =item $new_proc = $proc->fork |
183 | |
202 | |
184 | Quite the same as the same method of L<AnyEvent::Fork>, except that it |
203 | Quite the same as the same method of L<AnyEvent::Fork>, except that it |
185 | simply clones the object without creating an actual process. |
204 | simply clones the object without creating an actual process. |
186 | |
205 | |
… | |
… | |
212 | Not supported and always croaks. |
231 | Not supported and always croaks. |
213 | |
232 | |
214 | =cut |
233 | =cut |
215 | |
234 | |
216 | sub send_fh { |
235 | sub send_fh { |
217 | require Carp; |
|
|
218 | Carp::croak ("send_fh is not supported on AnyEvent::Fork::Remote objects"); |
236 | Carp::croak "send_fh is not supported on AnyEvent::Fork::Remote objects"; |
219 | } |
237 | } |
220 | |
238 | |
221 | =item $proc = $proc->eval ($perlcode, @args) |
239 | =item $proc = $proc->eval ($perlcode, @args) |
222 | |
240 | |
223 | Quite the same as the same method of L<AnyEvent::Fork>. |
241 | Quite the same as the same method of L<AnyEvent::Fork>. |
… | |
… | |
241 | } |
259 | } |
242 | |
260 | |
243 | sub eval { |
261 | sub eval { |
244 | my ($self, $perlcode, @args) = @_; |
262 | my ($self, $perlcode, @args) = @_; |
245 | |
263 | |
|
|
264 | my $linecode = $perlcode; |
|
|
265 | $linecode =~ s/\s+/ /g; # takes care of \n |
|
|
266 | $linecode =~ s/"/''/g; |
|
|
267 | substr $linecode, 70, length $linecode, "..." if length $linecode > 70; |
|
|
268 | |
246 | $self->[1] .= '{ local @_ = ' . (aq @args) . "; $perlcode }\n"; |
269 | $self->[1] .= '{ local @_ = ' . (aq @args) . ";\n#line 1 \"'$linecode'\"\n$perlcode;\n}\n"; |
247 | } |
270 | } |
248 | |
271 | |
249 | =item $proc = $proc->require ($module, ...) |
272 | =item $proc = $proc->require ($module, ...) |
250 | |
273 | |
251 | Quite the same as the same method of L<AnyEvent::Fork>. |
274 | Quite the same as the same method of L<AnyEvent::Fork>. |
… | |
… | |
253 | =cut |
276 | =cut |
254 | |
277 | |
255 | sub require { |
278 | sub require { |
256 | my ($self, @modules) = @_; |
279 | my ($self, @modules) = @_; |
257 | |
280 | |
|
|
281 | $self->eval ("require $_") |
258 | s%::%/%g for @modules; |
282 | for @modules; |
259 | $self->eval ('require "$_.pm" for @_', @modules); |
|
|
260 | |
283 | |
261 | $self |
284 | $self |
262 | } |
285 | } |
263 | |
286 | |
264 | =item $proc = $proc->send_arg ($string, ...) |
287 | =item $proc = $proc->send_arg ($string, ...) |
… | |
… | |
277 | |
300 | |
278 | =item $proc->run ($func, $cb->($fh)) |
301 | =item $proc->run ($func, $cb->($fh)) |
279 | |
302 | |
280 | Very similar to the run method of L<AnyEvent::Fork>. |
303 | Very similar to the run method of L<AnyEvent::Fork>. |
281 | |
304 | |
282 | On the parent side, the API is identical. On the child side, the |
305 | On the parent side, the API is identical, except that a C<$cb> argument of |
283 | "communications socket" is in fact just C<*STDIN>, and typically can only |
306 | C<undef> instad of a valid file handle signals an error. |
284 | be read from. |
|
|
285 | |
307 | |
|
|
308 | On the child side, the "communications socket" is in fact just C<*STDIN>, |
|
|
309 | and typically can only be read from (this highly depends on how the |
|
|
310 | program is created - if you just run F<perl> locally, it will work for |
|
|
311 | both reading and writing, but commands such as F<rsh> or F<ssh> typically |
|
|
312 | only provide read-only handles for STDIN). |
|
|
313 | |
286 | If the run function wants to read data that is written to C<$fh> in the |
314 | To be portable, if the run function wants to read data that is written to |
287 | parent, then it should read from STDIN. If the run function wants to |
315 | C<$fh> in the parent, then it should read from STDIN. If the run function |
288 | provide data that can later be read from C<$fh>, then it should write them |
316 | wants to provide data that can later be read from C<$fh>, then it should |
289 | to STDOUT. |
317 | write them to STDOUT. |
290 | |
318 | |
291 | You can write a run function that works with both L<AnyEvent::Fork> and |
319 | You can write a run function that works with both L<AnyEvent::Fork> |
292 | this module by checking C<fileno $fh> in on the passed callback in the run |
320 | and this module by checking C<fileno $fh>. If it is C<0> (meaning |
293 | function: |
321 | it is STDIN), then you should use it for reading, and STDOUT for |
|
|
322 | writing. Otherwise, you should use the file handle for both: |
294 | |
323 | |
295 | sub run { |
324 | sub run { |
296 | my ($rfh, ...) = @_; |
325 | my ($rfh, ...) = @_; |
297 | my $wfh = fileno $rfh ? $rfh : *STDOUT; |
326 | my $wfh = fileno $rfh ? $rfh : *STDOUT; |
298 | |
327 | |
… | |
… | |
302 | =cut |
331 | =cut |
303 | |
332 | |
304 | sub run { |
333 | sub run { |
305 | my ($self, $func, $cb) = @_; |
334 | my ($self, $func, $cb) = @_; |
306 | |
335 | |
307 | my $code = 'BEGIN {' . $self->[1] . '}' |
|
|
308 | . 'syswrite STDOUT, ' . (sq $magic0) . '^' . (sq $magic1) . ';' |
|
|
309 | . $func . (aq @{ $self->[2] }) . ';' |
|
|
310 | . "\n__END__\n"; |
|
|
311 | |
|
|
312 | $self->[0](sub { |
336 | $self->[0](sub { |
313 | my $fh = shift |
337 | my $fh = shift |
314 | or die "AnyEvent::Fork::Remote: create callback failed"; |
338 | or die "AnyEvent::Fork::Remote: create callback failed"; |
315 | |
339 | |
|
|
340 | my $code = 'BEGIN {' . $self->[1] . "}\n" |
|
|
341 | . 'syswrite STDOUT, ' . (sq $magic0) . '^' . (sq $magic1) . ';' |
|
|
342 | . '{ sysread STDIN, my $dummy, 1 }' |
|
|
343 | . "\n$func*STDIN," . (aq @{ $self->[2] }) . ';' |
|
|
344 | . "\n__END__\n"; |
316 | |
345 | |
|
|
346 | warn $code;#d# |
317 | |
347 | |
|
|
348 | AnyEvent::Util::fh_nonblocking $fh, 1; |
|
|
349 | |
|
|
350 | my ($rw, $ww); |
|
|
351 | |
|
|
352 | my $ofs; |
|
|
353 | |
|
|
354 | $ww = AE::io $fh, 1, sub { |
|
|
355 | my $len = syswrite $fh, $code, 1<<20, $ofs; |
|
|
356 | |
|
|
357 | if ($len || $! == Errno::EAGAIN || $! == Errno::EWOULDBLOCK) { |
|
|
358 | $ofs += $len; |
|
|
359 | undef $ww if $ofs >= length $code; |
|
|
360 | } else { |
|
|
361 | # error |
|
|
362 | ($ww, $rw) = (); $cb->(undef); |
|
|
363 | } |
|
|
364 | }; |
|
|
365 | |
|
|
366 | my $rbuf; |
|
|
367 | |
|
|
368 | $rw = AE::io $fh, 0, sub { |
|
|
369 | my $len = sysread $fh, $rbuf, 1<<10; |
|
|
370 | |
|
|
371 | if ($len || $! == Errno::EAGAIN || $! == Errno::EWOULDBLOCK) { |
|
|
372 | $rbuf = substr $rbuf, -length $magic0 if length $rbuf > length $magic0; |
|
|
373 | |
|
|
374 | if ($rbuf eq ($magic0 ^ $magic1)) { |
|
|
375 | # all data was sent, magic was received - both |
|
|
376 | # directions should be "empty", and therefore |
|
|
377 | # the socket must accept at least a single octet, |
|
|
378 | # to signal the "child" to go on. |
|
|
379 | undef $rw; |
|
|
380 | die if $ww; # uh-oh |
|
|
381 | |
|
|
382 | syswrite $fh, "\n"; |
|
|
383 | $cb->($fh); |
|
|
384 | } |
|
|
385 | } else { |
|
|
386 | # error |
|
|
387 | ($ww, $rw) = (); $cb->(undef); |
|
|
388 | } |
|
|
389 | }; |
318 | }); |
390 | }); |
319 | } |
391 | } |
320 | |
|
|
321 | my $x = new_exec AnyEvent::Fork::Remote "/usr/bin/rsh", "rsh", "rain", "exec perl";#d# |
|
|
322 | $x->require ("Carp", "Storable");#d# |
|
|
323 | $x->send_arg (1, 2, 3);#d# |
|
|
324 | $x->eval ('sub run { die }');#d# |
|
|
325 | $x->run ("run", sub { |
|
|
326 | }); |
|
|
327 | |
|
|
328 | |
|
|
329 | =item my $proc = new_from_stdio $fh |
|
|
330 | |
|
|
331 | Creates an C<AnyEvent::Fork::Remote> object from a file handle. This file |
|
|
332 | handle must be connected to both STDIN and STDOUT of a F<perl> process. |
|
|
333 | |
|
|
334 | This form might be more convenient than C<new> or C<new_exec> when |
|
|
335 | creating an C<AnyEvent::Fork::Remote> object, but the resulting object |
|
|
336 | does not support C<fork>. |
|
|
337 | |
|
|
338 | #TODO: really implement? |
|
|
339 | |
392 | |
340 | =back |
393 | =back |
341 | |
394 | |
342 | =head1 SEE ALSO |
395 | =head1 SEE ALSO |
343 | |
396 | |