… | |
… | |
99 | use Convert::Scalar 'weaken'; |
99 | use Convert::Scalar 'weaken'; |
100 | use Linux::AIO; |
100 | use Linux::AIO; |
101 | |
101 | |
102 | Linux::AIO::min_parallel $::AIO_PARALLEL; |
102 | Linux::AIO::min_parallel $::AIO_PARALLEL; |
103 | |
103 | |
104 | my $aio_requests = new Coro::Semaphore $::AIO_PARALLEL * 4; |
|
|
105 | |
|
|
106 | Event->io(fd => Linux::AIO::poll_fileno, |
104 | Event->io(fd => Linux::AIO::poll_fileno, |
107 | poll => 'r', async => 1, |
105 | poll => 'r', async => 1, |
108 | cb => \&Linux::AIO::poll_cb); |
106 | cb => \&Linux::AIO::poll_cb); |
109 | |
107 | |
110 | our %conn; # $conn{ip}{self} => connobj |
108 | our %conn; # $conn{ip}{self} => connobj |
… | |
… | |
519 | while ($h > 0) { |
517 | while ($h > 0) { |
520 | if (0) { |
518 | if (0) { |
521 | sysread $fh, $buf, $h > $::BUFSIZE ? $::BUFSIZE : $h |
519 | sysread $fh, $buf, $h > $::BUFSIZE ? $::BUFSIZE : $h |
522 | or last; |
520 | or last; |
523 | } else { |
521 | } else { |
524 | undef $buf; |
|
|
525 | $aio_requests->down; |
|
|
526 | aio_read($fh, $l, ($h > $::BUFSIZE ? $::BUFSIZE : $h), |
522 | aio_read($fh, $l, ($h > $::BUFSIZE ? $::BUFSIZE : $h), |
527 | $buf, 0, sub { |
523 | $buf, 0, sub { |
528 | $r = $_[0]; |
524 | $r = $_[0]; |
529 | $current->ready; |
525 | Coro::ready($current); |
530 | }); |
526 | }); |
531 | &Coro::schedule; |
527 | &Coro::schedule; |
532 | $aio_requests->up; |
|
|
533 | last unless $r; |
528 | last unless $r; |
534 | } |
529 | } |
535 | my $w = $self->{fh}->syswrite($buf) |
530 | my $w = syswrite $self->{fh}, $buf |
536 | or last; |
531 | or last; |
537 | $::written += $w; |
532 | $::written += $w; |
538 | $self->{written} += $w; |
533 | $self->{written} += $w; |
539 | $l += $r; |
534 | $l += $r; |
540 | } |
535 | } |