Revision: | 1.7 |
Committed: | Sat Aug 13 23:42:05 2011 UTC (12 years, 11 months ago) by root |
Content type: | text/plain |
Branch: | MAIN |
CVS Tags: | rel-6_5, rel-6_10, rel-6_09, rel-6_08, rel-6_07, rel-6_512, rel-6_513, rel-6_511, rel-6_514, rel-6_32, rel-6_33, rel-6_31, rel-6_36, rel-6_37, rel-6_38, rel-6_39, rel-6_23, rel-6_29, rel-6_28, rel-6_46, rel-6_45, rel-6_51, rel-6_52, rel-6_53, rel-6_54, rel-6_55, rel-6_56, rel-6_57, rel-6_43, rel-6_42, rel-6_41, rel-6_47, rel-6_44, rel-6_49, rel-6_48, HEAD |
Changes since 1.6: | +1 -1 lines |
Log Message: | *** empty log message *** |
# | Content |
---|---|
1 | package transferqueue; |
2 | |
3 | use Scalar::Util; |
4 | |
# Slot reservation tiers, in ascending order of size limit.
#
# Each entry is [ size limit, slot count ].  wake_next adds up the slot
# counts of every tier whose size limit the head transfer exceeds, and only
# starts that transfer while the queue has more free slots than that sum.
# NOTE(review): sizes are presumably bytes - confirm against callers.
my @reserve = (
    [  1200000, 1 ],
    [  8000000, 1 ],
    [ 75000000, 1 ],
);
10 | |
# Construct a transfer queue.
#
# Arguments (after the class name):
#   slots - number of transfer slots this queue may hand out.
#
# Returns the new queue object.  The object keeps a repeating AE timer
# (arguments 10 and 3: initial delay and repeat interval) in {reschedule};
# each time it fires it calls wake_next on the queue.
sub new {
    my ($class, $slots) = @_;

    my $self = bless {
        slots   => $slots,
        wait    => [],      # waiting transfers; always an array ref
        lastspb => 0,
        avgspb  => 0,
    }, $class;

    # The timer lives inside the queue, so the callback must not hold a
    # strong reference back to the queue: that would form a cycle and
    # neither the queue nor the timer would ever be released.
    Scalar::Util::weaken(my $weak_self = $self);

    $self->{reschedule} = AE::timer(10, 3, sub {
        $weak_self->wake_next if $weak_self;
    });

    return $self;
}
21 | |
# Register a new transfer of the given size with this queue.
#
# The transfer records the queue, the current $::NOW, the size and the
# current coroutine ($Coro::current), is appended to the wait list, and
# wake_next is then invoked on the queue.
#
# Returns the new transfer object (blessed into package "transfer").
sub start_transfer {
    my ($self, $size) = @_;

    my %state = (
        queue   => $self,
        time    => $::NOW,
        size    => $size,
        coro    => $Coro::current,
        started => 0,
    );
    my $transfer = bless \%state, 'transfer';

    push @{ $self->{wait} }, $transfer;
    $self->wake_next;

    return $transfer;
}
40 | |
# Rebuild the wait list: drop entries that are no longer set, recompute each
# waiter's {spb} as (time waited) / (size, or 1 when the size is false),
# order the list by descending {spb}, and weaken the stored references so
# the list itself does not keep a transfer alive.
sub sort {
    my ($self) = @_;

    my @alive = grep { $_ } @{ $self->{wait} };

    for my $transfer (@alive) {
        my $waited = $::NOW - $transfer->{time};
        $transfer->{spb} = $waited / ($transfer->{size} || 1);
    }

    my @ordered = sort { $b->{spb} <=> $a->{spb} } @alive;
    $self->{wait} = \@ordered;

    Scalar::Util::weaken($_) for @ordered;
}
50 | |
# Start the transfer at the head of the wait list, if the queue can afford it.
#
# The wait list is re-sorted first.  The head transfer's size is compared
# against the @reserve tiers: the slot counts of every tier whose size limit
# it exceeds are summed, and the transfer is only started when the queue has
# more free slots than that sum.  At most one transfer is started per call;
# when the head cannot start, nothing behind it is considered.
sub wake_next {
    my ($self) = @_;

    $self->sort;

    my $waiting = $self->{wait};
    return unless @$waiting;

    my $size     = $waiting->[0]{size};
    my $reserved = 0;
    for my $tier (@reserve) {
        my ($limit, $count) = @$tier;
        last if $size <= $limit;
        $reserved += $count;
    }
    return unless $self->{slots} > $reserved;

    my $transfer = shift @$waiting;

    # Bookkeeping: last {spb} served and a slow-moving average of it.
    $self->{lastspb} = $transfer->{spb};
    $self->{avgspb}  = $self->{avgspb} * 0.999 + $transfer->{spb} * 0.001;
    $self->{started}++;

    $transfer->wake;
    return;
}
72 | |
# Start the next waiting transfer even if the queue has no free slot.
#
# The slot count is raised by $::MAX_TRANSFERS for the duration of one
# wake_next call and lowered again afterwards.
#
# Returns 1 when at least one live transfer was waiting, nothing otherwise.
sub force_wake_next {
    my ($self) = @_;

    # Entries in the wait list are weak references and turn undef once their
    # transfer is gone; prune them first so a list holding only dead entries
    # is not mistaken for pending work.
    $self->sort;
    return unless @{ $self->{wait} };

    # Read the global once so the increase and the decrease always match.
    my $bonus = $::MAX_TRANSFERS;

    $self->{slots} += $bonus;
    $self->wake_next;
    $self->{slots} -= $bonus;

    return 1;
}
84 | |
# Re-sort the wait list and return its entries (in scalar context: their
# number).
sub waiters {
    my ($self) = @_;

    $self->sort;

    return @{ $self->{wait} };
}
89 | |
90 | package transfer; |
91 | |
92 | use Coro::Timer (); |
93 | |
# Grant this transfer a slot: mark it allocated, take one slot from its
# queue, and, if a coroutine is currently parked in {wake}, mark that
# coroutine ready.
sub wake {
    my ($self) = @_;

    $self->{alloc} = 1;
    --$self->{queue}{slots};

    my $sleeper = $self->{wake};
    $sleeper->ready if $sleeper;
}
101 | |
# Wait for this transfer to be granted a slot.
#
# Arguments:
#   the value handed to Coro::Timer::timeout (presumably a timeout in
#   seconds - confirm against the Coro::Timer documentation).
#
# Returns {alloc} straight away when it is already true.  Otherwise the
# transfer's coroutine is published in {wake} for the duration of the call,
# the current coroutine is descheduled, and {alloc} is returned once it
# resumes.
sub try {
    my ($self, $seconds) = @_;

    return $self->{alloc} if $self->{alloc};

    # Keep the timeout object alive until this call returns.
    my $guard = Coro::Timer::timeout($seconds);

    # Only visible while we are parked here; restored on return.
    local $self->{wake} = $self->{coro};

    Coro::schedule();

    return $self->{alloc};
}
114 | |
# Release the slot held by this transfer, if any, when the transfer goes
# away, and let the queue start the next waiter.
sub DESTROY {
    my ($self) = @_;

    # A transfer that was never granted a slot has nothing to give back.
    return unless $self->{alloc};

    # The queue reference may already be gone (for instance during global
    # destruction).  Without this check the increment below would
    # autovivify a plain hash and the method call would then fail.
    my $queue = $self->{queue};
    return unless Scalar::Util::blessed($queue);

    $queue->{slots}++;
    $queue->wake_next;
}
123 | |
124 | 1; |
125 |