… | |
… | |
251 | |
251 | |
252 | package AnyEvent::Fork; |
252 | package AnyEvent::Fork; |
253 | |
253 | |
254 | use common::sense; |
254 | use common::sense; |
255 | |
255 | |
256 | use Socket (); |
256 | use Errno (); |
257 | |
257 | |
258 | use AnyEvent; |
258 | use AnyEvent; |
259 | use AnyEvent::Util (); |
259 | use AnyEvent::Util (); |
260 | |
260 | |
261 | use IO::FDPass; |
261 | use IO::FDPass; |
… | |
… | |
281 | our $TEMPLATE; |
281 | our $TEMPLATE; |
282 | |
282 | |
283 | sub _cmd { |
283 | sub _cmd { |
284 | my $self = shift; |
284 | my $self = shift; |
285 | |
285 | |
286 | #TODO: maybe append the packet to any existing string command already in the queue |
|
|
287 | |
|
|
288 | # ideally, we would want to use "a (w/a)*" as format string, but perl versions |
286 | # ideally, we would want to use "a (w/a)*" as format string, but perl |
289 | # from at least 5.8.9 to 5.16.3 are all buggy and can't unpack it. |
287 | # versions from at least 5.8.9 to 5.16.3 are all buggy and can't unpack |
|
|
288 | # it. |
290 | push @{ $self->[2] }, pack "L/a*", pack "(w/a*)*", @_; |
289 | push @{ $self->[2] }, pack "L/a*", pack "(w/a*)*", @_; |
291 | |
290 | |
292 | $self->[3] ||= AE::io $self->[1], 1, sub { |
291 | unless ($self->[3]) { |
|
|
292 | my $wcb = sub { |
|
|
293 | do { |
293 | # send the next "thing" in the queue - either a reference to an fh, |
294 | # send the next "thing" in the queue - either a reference to an fh, |
294 | # or a plain string. |
295 | # or a plain string. |
295 | |
296 | |
296 | if (ref $self->[2][0]) { |
297 | if (ref $self->[2][0]) { |
297 | # send fh |
298 | # send fh |
298 | IO::FDPass::send fileno $self->[1], fileno ${ $self->[2][0] } |
299 | unless (IO::FDPass::send fileno $self->[1], fileno ${ $self->[2][0] }) { |
|
|
300 | return if $! == Errno::EAGAIN || $! == Errno::EWOULDBLOCK; |
|
|
301 | undef $self->[3]; |
|
|
302 | die "AnyEvent::Fork: file descriptor send failure: $!"; |
|
|
303 | } |
|
|
304 | |
299 | and shift @{ $self->[2] }; |
305 | shift @{ $self->[2] }; |
300 | |
306 | |
301 | } else { |
307 | } else { |
302 | # send string |
308 | # send string |
303 | my $len = syswrite $self->[1], $self->[2][0] |
309 | my $len = syswrite $self->[1], $self->[2][0]; |
|
|
310 | |
|
|
311 | unless ($len) { |
|
|
312 | return if $! == Errno::EAGAIN || $! == Errno::EWOULDBLOCK; |
|
|
313 | undef $self->[3]; |
304 | or do { undef $self->[3]; die "AnyEvent::Fork: command write failure: $!" }; |
314 | die "AnyEvent::Fork: command write failure: $!"; |
|
|
315 | } |
305 | |
316 | |
306 | substr $self->[2][0], 0, $len, ""; |
317 | substr $self->[2][0], 0, $len, ""; |
307 | shift @{ $self->[2] } unless length $self->[2][0]; |
318 | shift @{ $self->[2] } unless length $self->[2][0]; |
308 | } |
319 | } |
|
|
320 | } while @{ $self->[2] }; |
309 | |
321 | |
310 | unless (@{ $self->[2] }) { |
322 | # everything written |
311 | undef $self->[3]; |
323 | undef $self->[3]; |
312 | # invoke run callback |
324 | # invoke run callback |
313 | $self->[0]->($self->[1]) if $self->[0]; |
325 | $self->[0]->($self->[1]) if $self->[0]; |
314 | } |
326 | }; |
|
|
327 | |
|
|
328 | $wcb->(); |
|
|
329 | |
|
|
330 | $self->[3] ||= AE::io $self->[1], 1, $wcb |
|
|
331 | if @{ $self->[2] }; |
315 | }; |
332 | } |
316 | |
333 | |
317 | () # make sure we don't leak the watcher |
334 | () # make sure we don't leak the watcher |
318 | } |
335 | } |
319 | |
336 | |
320 | sub _new { |
337 | sub _new { |
… | |
… | |
539 | =item $proc = $proc->send_arg ($string, ...) |
556 | =item $proc = $proc->send_arg ($string, ...) |
540 | |
557 | |
541 | Send one or more argument strings to the process, to prepare a call to |
558 | Send one or more argument strings to the process, to prepare a call to |
542 | C<run>. The strings can be any octet string. |
559 | C<run>. The strings can be any octet string. |
543 | |
560 | |
|
|
561 | The protocol is optimised to pass a moderate number of relatively short |
|
|
562 | strings - while you can pass up to 4GB of data in one go, this is more |
|
|
563 | meant to pass some ID information or other startup info, not big chunks of |
|
|
564 | data. |
|
|
565 | |
544 | Returns the process object for easy chaining of method calls. |
566 | Returns the process object for easy chaining of method calls. |
545 | |
567 | |
546 | =cut |
568 | =cut |
547 | |
569 | |
548 | sub send_arg { |
570 | sub send_arg { |
… | |
… | |
621 | |
643 | |
622 | =head1 PERFORMANCE |
644 | =head1 PERFORMANCE |
623 | |
645 | |
624 | Now for some unscientific benchmark numbers (all done on an amd64 |
646 | Now for some unscientific benchmark numbers (all done on an amd64 |
625 | GNU/Linux box). These are intended to give you an idea of the relative |
647 | GNU/Linux box). These are intended to give you an idea of the relative |
626 | performance you can expect. |
648 | performance you can expect, they are not meant to be absolute performance |
|
|
649 | numbers. |
627 | |
650 | |
628 | OK, so, I ran a simple benchmark that creates a socket pair, forks, calls |
651 | OK, so, I ran a simple benchmark that creates a socket pair, forks, calls |
629 | exit in the child and waits for the socket to close in the parent. I did |
652 | exit in the child and waits for the socket to close in the parent. I did |
630 | load AnyEvent, EV and AnyEvent::Fork, for a total process size of 6312kB. |
653 | load AnyEvent, EV and AnyEvent::Fork, for a total process size of 5100kB. |
631 | |
654 | |
632 | 2079 new processes per second, using socketpair + fork manually |
655 | 2079 new processes per second, using manual socketpair + fork |
633 | |
656 | |
634 | Then I did the same thing, but instead of calling fork, I called |
657 | Then I did the same thing, but instead of calling fork, I called |
635 | AnyEvent::Fork->new->run ("CORE::exit") and then again waited for the |
658 | AnyEvent::Fork->new->run ("CORE::exit") and then again waited for the |
636 | socket form the child to close on exit. This does the same thing as manual |
659 | socket form the child to close on exit. This does the same thing as manual |
637 | socket pair + fork, except that what is forked is the template process |
660 | socket pair + fork, except that what is forked is the template process |