Revision: | 1.7 |
Committed: | Sat Aug 13 23:42:05 2011 UTC (12 years, 11 months ago) by root |
Content type: | text/plain |
Branch: | MAIN |
CVS Tags: | rel-6_5, rel-6_10, rel-6_09, rel-6_08, rel-6_07, rel-6_512, rel-6_513, rel-6_511, rel-6_514, rel-6_32, rel-6_33, rel-6_31, rel-6_36, rel-6_37, rel-6_38, rel-6_39, rel-6_23, rel-6_29, rel-6_28, rel-6_46, rel-6_45, rel-6_51, rel-6_52, rel-6_53, rel-6_54, rel-6_55, rel-6_56, rel-6_57, rel-6_43, rel-6_42, rel-6_41, rel-6_47, rel-6_44, rel-6_49, rel-6_48, HEAD |
Changes since 1.6: | +1 -1 lines |
Log Message: | *** empty log message *** |
# | User | Rev | Content |
---|---|---|---|
1 | root | 1.1 | package transferqueue; |
2 | |||
3 | use Scalar::Util; | ||
4 | |||
5 | my @reserve = ( | ||
6 | [ 1_200_000, 1], | ||
7 | [ 8_000_000, 1], | ||
8 | [ 75_000_000, 1], | ||
9 | ); | ||
10 | |||
11 | sub new { | ||
12 | my $class = shift; | ||
13 | my $self = bless { | ||
14 | slots => $_[0], | ||
15 | lastspb => 0, | ||
16 | avgspb => 0, | ||
17 | }, $class; | ||
18 | root | 1.7 | $self->{reschedule} = AE::timer 10, 3, sub { $self->wake_next }; |
19 | root | 1.1 | $self; |
20 | } | ||
21 | |||
22 | sub start_transfer { | ||
23 | my $self = shift; | ||
24 | my $size = $_[0]; | ||
25 | |||
26 | my $transfer = bless { | ||
27 | queue => $self, | ||
28 | time => $::NOW, | ||
29 | size => $size, | ||
30 | coro => $Coro::current, | ||
31 | started => 0, | ||
32 | }, transfer::; | ||
33 | |||
34 | push @{$self->{wait}}, $transfer; | ||
35 | |||
36 | $self->wake_next; | ||
37 | |||
38 | $transfer; | ||
39 | } | ||
40 | |||
41 | sub sort { | ||
42 | my @queue = grep $_, @{$_[0]{wait}}; | ||
43 | |||
44 | $_->{spb} = ($::NOW-$_->{time}) / ($_->{size} || 1) for @queue; | ||
45 | |||
46 | $_[0]{wait} = [sort { $b->{spb} <=> $a->{spb} } @queue]; | ||
47 | |||
48 | Scalar::Util::weaken $_ for @{$_[0]{wait}}; | ||
49 | } | ||
50 | |||
51 | sub wake_next { | ||
52 | my $self = shift; | ||
53 | |||
54 | $self->sort; | ||
55 | |||
56 | while (@{$self->{wait}}) { | ||
57 | my $size = $self->{wait}[0]{size}; | ||
58 | my $min = 0; | ||
59 | for (@reserve) { | ||
60 | last if $size <= $_->[0]; | ||
61 | $min += $_->[1]; | ||
62 | } | ||
63 | last unless $self->{slots} > $min; | ||
64 | my $transfer = shift @{$self->{wait}}; | ||
65 | $self->{lastspb} = $transfer->{spb}; | ||
66 | root | 1.4 | $self->{avgspb} = $self->{avgspb} * 0.999 + $transfer->{spb} * 0.001; |
67 | root | 1.1 | $self->{started}++; |
68 | $transfer->wake; | ||
69 | last; | ||
70 | } | ||
71 | } | ||
72 | |||
73 | root | 1.2 | sub force_wake_next { |
74 | my $self = shift; | ||
75 | |||
76 | root | 1.6 | return unless @{ $self->{wait} }; |
77 | |||
78 | root | 1.3 | $self->{slots} += $::MAX_TRANSFERS; |
79 | root | 1.2 | $self->wake_next; |
80 | root | 1.3 | $self->{slots} -= $::MAX_TRANSFERS; |
81 | root | 1.6 | |
82 | 1 | ||
83 | root | 1.2 | } |
84 | |||
85 | root | 1.1 | sub waiters { |
86 | $_[0]->sort; | ||
87 | @{$_[0]{wait}}; | ||
88 | } | ||
89 | |||
90 | package transfer; | ||
91 | |||
92 | use Coro::Timer (); | ||
93 | |||
94 | sub wake { | ||
95 | my $self = shift; | ||
96 | |||
97 | $self->{alloc} = 1; | ||
98 | $self->{queue}{slots}--; | ||
99 | $self->{wake} and $self->{wake}->ready; | ||
100 | } | ||
101 | |||
102 | sub try { | ||
103 | my $self = shift; | ||
104 | |||
105 | $self->{alloc} || do { | ||
106 | my $timeout = Coro::Timer::timeout $_[0]; | ||
107 | local $self->{wake} = $self->{coro}; | ||
108 | |||
109 | Coro::schedule; | ||
110 | |||
111 | $self->{alloc}; | ||
112 | } | ||
113 | } | ||
114 | |||
115 | sub DESTROY { | ||
116 | my $self = shift; | ||
117 | |||
118 | if ($self->{alloc}) { | ||
119 | $self->{queue}{slots}++; | ||
120 | $self->{queue}->wake_next; | ||
121 | } | ||
122 | } | ||
123 | |||
124 | 1; | ||
125 |