… | |
… | |
42 | my @pool; |
42 | my @pool; |
43 | |
43 | |
44 | # one "execution thread" |
44 | # one "execution thread" |
45 | sub handler { |
45 | sub handler { |
46 | while () { |
46 | while () { |
47 | my $new = pop @newcons; |
|
|
48 | if ($new) { |
47 | if (@newcons) { |
49 | eval { |
48 | eval { |
50 | conn->new(@$new)->handle; |
49 | conn->new(@{pop @newcons})->handle; |
51 | }; |
50 | }; |
52 | slog 1, "$@" if $@ && !ref $@; |
51 | slog 1, "$@" if $@ && !ref $@; |
53 | $connections->up; |
52 | $connections->up; |
54 | } else { |
53 | } else { |
55 | last if @pool >= $MAX_POOL; |
54 | last if @pool >= $MAX_POOL; |
… | |
… | |
99 | use Convert::Scalar 'weaken'; |
98 | use Convert::Scalar 'weaken'; |
100 | use Linux::AIO; |
99 | use Linux::AIO; |
101 | |
100 | |
102 | Linux::AIO::min_parallel $::AIO_PARALLEL; |
101 | Linux::AIO::min_parallel $::AIO_PARALLEL; |
103 | |
102 | |
104 | my $aio_requests = new Coro::Semaphore $::AIO_PARALLEL * 4; |
|
|
105 | |
|
|
106 | Event->io(fd => Linux::AIO::poll_fileno, |
103 | Event->io(fd => Linux::AIO::poll_fileno, |
107 | poll => 'r', async => 1, |
104 | poll => 'r', async => 1, |
108 | cb => \&Linux::AIO::poll_cb); |
105 | cb => \&Linux::AIO::poll_cb); |
109 | |
106 | |
110 | our %conn; # $conn{ip}{self} => connobj |
107 | our %conn; # $conn{ip}{self} => connobj |
… | |
… | |
143 | $self; |
140 | $self; |
144 | } |
141 | } |
145 | |
142 | |
146 | sub DESTROY { |
143 | sub DESTROY { |
147 | my $self = shift; |
144 | my $self = shift; |
148 | |
|
|
149 | $::conns--; |
145 | $::conns--; |
150 | |
|
|
151 | $self->eoconn; |
146 | $self->eoconn; |
152 | } |
147 | } |
153 | |
148 | |
154 | # end of connection |
149 | # end of connection |
155 | sub eoconn { |
150 | sub eoconn { |
… | |
… | |
519 | while ($h > 0) { |
514 | while ($h > 0) { |
520 | if (0) { |
515 | if (0) { |
521 | sysread $fh, $buf, $h > $::BUFSIZE ? $::BUFSIZE : $h |
516 | sysread $fh, $buf, $h > $::BUFSIZE ? $::BUFSIZE : $h |
522 | or last; |
517 | or last; |
523 | } else { |
518 | } else { |
524 | undef $buf; |
|
|
525 | $aio_requests->down; |
|
|
526 | aio_read($fh, $l, ($h > $::BUFSIZE ? $::BUFSIZE : $h), |
519 | aio_read($fh, $l, ($h > $::BUFSIZE ? $::BUFSIZE : $h), |
527 | $buf, 0, sub { |
520 | $buf, 0, sub { |
528 | $r = $_[0]; |
521 | $r = $_[0]; |
529 | $current->ready; |
522 | Coro::ready($current); |
530 | }); |
523 | }); |
531 | &Coro::schedule; |
524 | &Coro::schedule; |
532 | $aio_requests->up; |
|
|
533 | last unless $r; |
525 | last unless $r; |
534 | } |
526 | } |
535 | my $w = $self->{fh}->syswrite($buf) |
527 | my $w = syswrite $self->{fh}, $buf |
536 | or last; |
528 | or last; |
537 | $::written += $w; |
529 | $::written += $w; |
538 | $self->{written} += $w; |
530 | $self->{written} += $w; |
539 | $l += $r; |
531 | $l += $r; |
540 | } |
532 | } |