|
|
1 | use AnyEvent (); |
|
|
2 | |
1 | use Coro; |
3 | use Coro; |
2 | use Coro::Semaphore; |
4 | use Coro::Semaphore; |
3 | use Coro::SemaphoreSet; |
5 | use Coro::SemaphoreSet; |
4 | use Coro::EV; |
6 | use Coro::EV; |
5 | use Coro::Socket; |
7 | use Coro::Socket; |
… | |
… | |
52 | } |
54 | } |
53 | |
55 | |
54 | sub slog { |
56 | sub slog { |
55 | my $level = shift; |
57 | my $level = shift; |
56 | my $format = shift; |
58 | my $format = shift; |
|
|
59 | |
|
|
60 | $format = sprintf $format, @_ if @_; |
|
|
61 | |
57 | my $NOW = (POSIX::strftime "%Y-%m-%d %H:%M:%S", gmtime $::NOW); |
62 | my $NOW = (POSIX::strftime "%Y-%m-%d %H:%M:%S", gmtime $::NOW); |
58 | printf "$NOW: $format\n", @_; |
63 | print "$NOW: $format\n"; |
59 | printf $errorlog "$NOW: $format\n", @_ if $errorlog; |
64 | print $errorlog "$NOW: $format\n", @_ if $errorlog; |
60 | } |
65 | } |
61 | |
66 | |
62 | our $connections = new Coro::Semaphore $::MAX_CONNECTS || 250; |
67 | our $connections = new Coro::Semaphore $::MAX_CONNECTS || 250; |
63 | our $httpevent = new Coro::Signal; |
68 | our $httpevent = new Coro::Signal; |
64 | |
69 | |
… | |
… | |
73 | sub unused_bandwidth { |
78 | sub unused_bandwidth { |
74 | $unused_bytes += $_[0]; |
79 | $unused_bytes += $_[0]; |
75 | if ($unused_last < $NOW - 30 && $unused_bytes / ($NOW - $unused_last) > 50000) { |
80 | if ($unused_last < $NOW - 30 && $unused_bytes / ($NOW - $unused_last) > 50000) { |
76 | $unused_last = $NOW; |
81 | $unused_last = $NOW; |
77 | $unused_bytes = 0; |
82 | $unused_bytes = 0; |
78 | $queue_file->force_wake_next; |
83 | $queue_file->force_wake_next |
79 | slog 1, "forced filetransfer due to unused bandwidth"; |
84 | and slog 1, "forced filetransfer due to unused bandwidth"; |
80 | } |
85 | } |
81 | } |
86 | } |
82 | |
87 | |
83 | sub listen_on { |
88 | sub listen_on { |
84 | my $listen = $_[0]; |
89 | my $listen = $_[0]; |
… | |
… | |
133 | |
138 | |
134 | use Socket; |
139 | use Socket; |
135 | use HTTP::Date; |
140 | use HTTP::Date; |
136 | use Convert::Scalar 'weaken'; |
141 | use Convert::Scalar 'weaken'; |
137 | use IO::AIO; |
142 | use IO::AIO; |
|
|
143 | use AnyEvent::AIO; |
138 | |
144 | |
139 | IO::AIO::min_parallel $::AIO_PARALLEL; |
145 | IO::AIO::min_parallel $::AIO_PARALLEL; |
140 | |
|
|
141 | our $AIO_WATCHER = EV::io IO::AIO::poll_fileno, EV::READ, \&IO::AIO::poll_cb; |
|
|
142 | |
146 | |
143 | our %conn; # $conn{ip}{self} => connobj |
147 | our %conn; # $conn{ip}{self} => connobj |
144 | our %uri; # $uri{ip}{uri}{self} |
148 | our %uri; # $uri{ip}{uri}{self} |
145 | our %blocked; |
149 | our %blocked; |
146 | our %mimetype; |
150 | our %mimetype; |
… | |
… | |
206 | for (keys %blocked) { |
210 | for (keys %blocked) { |
207 | delete $blocked{$_} unless $blocked{$_}[0] > $::NOW; |
211 | delete $blocked{$_} unless $blocked{$_}[0] > $::NOW; |
208 | } |
212 | } |
209 | } |
213 | } |
210 | |
214 | |
211 | our $PRUNE_WATCHER = EV::timer 60, 60, \&prune_caches; |
215 | our $PRUNE_WATCHER = AE::timer 60, 60, \&prune_caches; |
212 | |
216 | |
213 | sub slog { |
217 | sub slog { |
214 | my $self = shift; |
218 | my $self = shift; |
215 | main::slog($_[0], "$self->{remote_id}> $_[1]"); |
219 | main::slog($_[0], "$self->{remote_id}> $_[1]"); |
216 | } |
220 | } |
… | |
… | |
559 | $self->err (416, "not satisfiable", $hdr, ""); |
563 | $self->err (416, "not satisfiable", $hdr, ""); |
560 | |
564 | |
561 | satisfiable: |
565 | satisfiable: |
562 | # check for segmented downloads |
566 | # check for segmented downloads |
563 | if ($l && $::NO_SEGMENTED) { |
567 | if ($l && $::NO_SEGMENTED) { |
564 | my $timeout = $::NOW + 15; |
568 | my $timeout = $::NOW + 60; |
565 | while (keys %{$uri{$self->{remote_id}}{$self->{uri}}} > 1) { |
569 | while (keys %{$uri{$self->{remote_id}}{$self->{uri}}} > 1) { |
566 | if ($timeout <= $::NOW) { |
570 | if ($timeout <= $::NOW) { |
567 | $self->block ($::BLOCKTIME, "segmented downloads are forbidden"); |
|
|
568 | #$self->err_segmented_download; |
571 | $self->err_segmented_download; |
569 | } else { |
572 | } else { |
570 | $httpevent->wait; |
573 | $httpevent->wait; |
571 | } |
574 | } |
572 | } |
575 | } |
573 | } |
576 | } |
… | |
… | |
618 | } |
621 | } |
619 | |
622 | |
620 | Coro::AIO::aio_read $fh, $l, ($h > $bufsize ? $bufsize : $h), my $buf, 0 |
623 | Coro::AIO::aio_read $fh, $l, ($h > $bufsize ? $bufsize : $h), my $buf, 0 |
621 | or last; |
624 | or last; |
622 | |
625 | |
|
|
626 | # readahead to work around rijk disk issues |
|
|
627 | IO::AIO::aio_readahead $fh, $l + $bufsize, $bufsize; |
|
|
628 | |
623 | $tbf->request (length $buf); |
629 | $tbf->request (length $buf); |
624 | my $w = $self->{fh}->syswrite ($buf) |
630 | my $w = $self->{fh}->syswrite ($buf) |
625 | or last; |
631 | or last; |
626 | $::written += $w; |
632 | $::written += $w; |
627 | $self->{written} += $w; |
633 | $self->{written} += $w; |