Revision: | 1.4 |
Committed: | Sun Dec 4 19:36:28 2005 UTC (18 years, 8 months ago) by root |
Content type: | text/plain |
Branch: | MAIN |
CVS Tags: | rel-2_5, rel-4_22, rel-4_21, rel-4_0, rel-4_3, rel-3_41, rel-4_13, rel-4_11, rel-3_55, rel-3_51, rel-4_01, rel-4_03, rel-4_02, rel-2_0, rel-2_1, rel-1_9, rel-3_6, rel-3_62, rel-3_63, rel-3_61, rel-1_7, rel-1_6, rel-3_4, rel-3_1, rel-3_5, rel-3_3, rel-3_2, rel-3_0, rel-3_01, rel-3_11, rel-4_1, rel-4_2, stack_sharing, rel-3_501, rel-4_31 |
Changes since 1.3: | +1 -1 lines |
Log Message: | *** empty log message *** |
# | Content |
---|---|
1 | package transferqueue; |
2 | |
3 | use Scalar::Util; |
4 | |
# Slot reservation table consulted by wake_next.  Each row is
# [ size threshold, number of slots to hold back ].  For the transfer at the
# head of the wait list, wake_next sums the hold-back of every row whose
# threshold the transfer's size exceeds, and only starts the transfer when
# more than that many slots are free.
my @reserve = map { [ $_, 1 ] } (
     1_200_000,
     8_000_000,
    75_000_000,
);
10 | |
# Constructor: CLASS->new($slots)
#
# $slots is the number of transfer slots initially available.  The object
# also tracks lastspb/avgspb statistics (both start at 0) and owns a
# repeating Event timer, stored in {reschedule}, that first fires after 10
# seconds and then every 3 seconds to call wake_next.
#
# Returns the new queue object.
sub new {
    my $class = shift;
    my $self = bless {
        slots   => $_[0],
        lastspb => 0,
        avgspb  => 0,
    }, $class;

    # The queue owns the timer and the timer owns its callback, so the
    # callback must only hold a weak reference back to the queue.  A strong
    # capture would form a cycle and the queue's DESTROY (which cancels the
    # timer) could never run.
    Scalar::Util::weaken(my $weak_self = $self);

    $self->{reschedule} = Event->timer(
        after    => 10,
        interval => 3,
        # $weak_self becomes undef once the queue has been freed.
        cb       => sub { $weak_self and $weak_self->wake_next },
    );

    $self;
}
25 | |
# $queue->start_transfer($size)
#
# Creates a 'transfer' handle for a transfer of the given size, stamped with
# the current $::NOW and the currently running coroutine ($Coro::current),
# appends it to the queue's wait list and gives the queue a chance to start
# something right away.
#
# Returns the new transfer handle.
sub start_transfer {
    my ($self, $size) = @_;

    my %fields = (
        queue   => $self,
        time    => $::NOW,
        size    => $size,
        coro    => $Coro::current,
        started => 0,
    );
    my $transfer = bless \%fields, 'transfer';

    push @{ $self->{wait} }, $transfer;
    $self->wake_next;

    return $transfer;
}
44 | |
# $queue->sort
#
# Rebuilds the wait list: entries that are no longer set (weak references
# whose transfer has gone away) are dropped, every remaining transfer gets
# its {spb} recomputed as time waited divided by size (a size of 0 counts as
# 1), and the list is ordered with the highest spb first.  The stored
# references are weakened so the queue does not keep transfers alive.
sub sort {
    my $queue = $_[0];

    my @alive = grep { $_ } @{ $queue->{wait} };

    for my $transfer (@alive) {
        my $waited = $::NOW - $transfer->{time};
        $transfer->{spb} = $waited / ($transfer->{size} || 1);
    }

    my @ranked = sort { $b->{spb} <=> $a->{spb} } @alive;
    $queue->{wait} = \@ranked;

    Scalar::Util::weaken($_) for @ranked;
}
54 | |
# $queue->wake_next
#
# Re-sorts the wait list and starts at most one transfer: the one at the head
# of the list.  The head is only started when the number of free slots
# exceeds the hold-back that @reserve prescribes for its size; otherwise
# nothing happens.  Starting a transfer updates lastspb, folds its spb into
# the avgspb moving average (weight 0.001) and bumps the started counter.
sub wake_next {
    my ($self) = @_;

    $self->sort;

    my $pending = $self->{wait};
    return unless @$pending;

    # Sum the hold-back of every reserve row the head transfer is larger than.
    my $head_size = $pending->[0]{size};
    my $held_back = 0;
    for my $row (@reserve) {
        my ($limit, $count) = @$row;
        last if $head_size <= $limit;
        $held_back += $count;
    }
    return unless $self->{slots} > $held_back;

    my $transfer = shift @$pending;
    my $spb      = $transfer->{spb};

    $self->{lastspb} = $spb;
    $self->{avgspb}  = $self->{avgspb} * 0.999 + $spb * 0.001;
    $self->{started}++;

    $transfer->wake;
}
76 | |
# $queue->force_wake_next
#
# Runs wake_next with the slot count temporarily raised by $::MAX_TRANSFERS,
# then takes the same amount off again.  The adjustment is done by hand
# rather than with `local` so that any change wake_next makes to {slots}
# in between is kept.
sub force_wake_next {
    my ($self) = @_;

    my $boost = $::MAX_TRANSFERS;

    $self->{slots} += $boost;
    $self->wake_next;
    $self->{slots} -= $boost;
}
84 | |
# $queue->waiters
#
# Re-sorts the wait list and returns its entries in queue order (in scalar
# context: the number of entries).
sub waiters {
    my ($self) = @_;

    $self->sort;

    return @{ $self->{wait} };
}
89 | |
# Destructor: cancels the rescheduling timer created by the constructor so it
# stops firing once the queue is gone.
sub DESTROY {
    my $self = shift;

    # The timer can be absent (object never fully constructed, or already
    # torn down during global destruction); calling ->cancel on undef would
    # die inside DESTROY.  Deleting the slot also makes a repeated call
    # harmless.
    my $timer = delete $self->{reschedule}
        or return;

    $timer->cancel;
}
95 | |
96 | package transfer; |
97 | |
98 | use Coro::Timer (); |
99 | |
# $transfer->wake
#
# Grants this transfer a slot: marks it allocated, takes one slot from the
# owning queue and, if a coroutine is currently parked in {wake}, marks that
# coroutine ready to run.
sub wake {
    my ($self) = @_;

    $self->{alloc} = 1;
    $self->{queue}{slots}--;

    my $sleeper = $self->{wake};
    return $sleeper && $sleeper->ready;
}
107 | |
# $transfer->try($timeout)
#
# Returns true at once if a slot has already been allocated.  Otherwise it
# arms a Coro::Timer timeout with the given argument, parks the transfer's
# coroutine in {wake} for the duration of the wait, yields via
# Coro::schedule, and on resumption returns the current {alloc} value.
sub try {
    my ($self, $wait_for) = @_;

    return $self->{alloc} if $self->{alloc};

    # Both the timeout object and the localized {wake} entry are released
    # when this sub returns.
    my $timeout = Coro::Timer::timeout($wait_for);
    local $self->{wake} = $self->{coro};

    Coro::schedule();

    return $self->{alloc};
}
120 | |
# Destructor: a transfer that held a slot gives it back to its queue and
# lets the queue start the next waiter.  Transfers that never got a slot do
# nothing.
sub DESTROY {
    my ($self) = @_;

    return unless $self->{alloc};

    my $queue = $self->{queue};
    $queue->{slots}++;
    $queue->wake_next;
}
129 | |
130 | 1; |
131 |