Revision: | 1.5 |
Committed: | Sat Dec 8 21:01:17 2007 UTC (16 years, 7 months ago) by root |
Content type: | text/plain |
Branch: | MAIN |
CVS Tags: | rel-4_91, rel-5_151, rel-5_1, rel-5_0, rel-4_748, rel-4_8, rel-4_9, rel-4_741, rel-4_743, rel-4_742, rel-4_744, rel-4_747, rel-6_13, rel-5_161, rel-4_74, rel-4_71, rel-4_72, rel-4_73, rel-5_162, rel-5_2, rel-4_802, rel-4_803, rel-4_801, rel-4_804, rel-4_479, rel-4_50, rel-4_51, rel-4_4, rel-4_45, rel-4_745, rel-4_901, rel-4_49, rel-4_48, rel-4_746, rel-5_11, rel-5_12, rel-5_15, rel-5_14, rel-5_17, rel-5_16, rel-4_47, rel-4_46, rel-4_7, rel-5_132, rel-5_131, rel-4_911, rel-4_912, rel-4_32, rel-4_33, rel-4_34, rel-4_35, rel-4_36, rel-4_37 |
Changes since 1.4: | +1 -11 lines |
Log Message: | convert from Event to EV |
# | Content |
---|---|
1 | package transferqueue; |
2 | |
3 | use Scalar::Util; |
4 | |
# Slot reservation tiers, read by wake_next.  Each entry is
#   [ size threshold, slot count ]
# wake_next walks the tiers in order and, for every tier whose threshold is
# below the waiting transfer's size, adds the slot count to the number of
# slots that must remain free before that transfer may start.
my @reserve = (
    [  1_200_000, 1 ],
    [  8_000_000, 1 ],
    [ 75_000_000, 1 ],
);
10 | |
# Construct a transfer queue.
#
# Arguments:
#   $class - class to bless the queue into
#   $slots - initial value of the queue's free-slot counter
#
# Returns the new queue object.  The object owns an EV timer (stored under
# {reschedule}, created with arguments 10 and 3) whose callback invokes
# wake_next on the queue.
sub new {
    my ($class, $slots) = @_;

    my $self = bless {
        slots   => $slots,
        lastspb => 0,
        avgspb  => 0,
    }, $class;

    # The queue holds the timer, so the timer callback must only hold a
    # weak reference back to the queue.  A strong capture would create a
    # reference cycle (queue -> timer -> closure -> queue) and neither
    # object could ever be released.
    Scalar::Util::weaken(my $weak_self = $self);

    # Parenthesised call: resolved at run time, so this file compiles even
    # when EV has not been loaded yet.
    $self->{reschedule} = EV::timer(10, 3, sub {
        $weak_self->wake_next if $weak_self;
    });

    return $self;
}
21 | |
# Register a new transfer of the given size with this queue.
#
# Arguments:
#   $queue - the queue object (invocant)
#   $bytes - value recorded as the transfer's {size}
#
# The transfer records the queue, $::NOW as its enqueue time, the size and
# $Coro::current.  It is appended to the wait list, and wake_next is then
# given a chance to start a waiting transfer (not necessarily this one).
#
# Returns the new transfer object.  The wait list only keeps weak
# references once it has been sorted, so the caller's reference is what
# keeps the transfer alive.
sub start_transfer {
    my ($queue, $bytes) = @_;

    my %state = (
        queue   => $queue,
        time    => $::NOW,
        size    => $bytes,
        coro    => $Coro::current,
        started => 0,
    );
    my $transfer = bless \%state, 'transfer';

    push @{ $queue->{wait} }, $transfer;
    $queue->wake_next;

    return $transfer;
}
40 | |
# Re-rank the wait list.
#
# Drops entries that are no longer set (weak references whose transfer has
# been freed), recomputes each remaining transfer's {spb} as
#   (time waited so far) / (size, or 1 when size is zero/unset)
# and stores the list ordered by descending {spb}.  The stored entries are
# weakened so the queue never keeps a transfer alive by itself.
#
# NOTE(review): "spb" presumably stands for seconds per byte - confirm.
sub sort {
    my ($self) = @_;

    my @live = grep { $_ } @{ $self->{wait} };

    for my $transfer (@live) {
        $transfer->{spb} = ($::NOW - $transfer->{time}) / ($transfer->{size} || 1);
    }

    my @ranked = sort { $b->{spb} <=> $a->{spb} } @live;
    $self->{wait} = \@ranked;

    Scalar::Util::weaken($_) for @ranked;
}
50 | |
# Start the highest-ranked waiting transfer, if enough slots are free.
#
# The wait list is re-sorted first.  The head of the list is then checked
# against the reservation tiers in @reserve: every tier whose size
# threshold is below the transfer's size adds to the number of slots that
# must stay free.  The transfer is only started when the queue has more
# free slots than that.
#
# At most one transfer is started per call.  On a start the queue's
# {lastspb}, {avgspb} (exponential moving average, weight 0.001) and
# {started} counters are updated.  Callers in this file ignore the return
# value.
sub wake_next {
    my ($self) = @_;

    $self->sort;

    my $pending = $self->{wait};
    return unless @$pending;

    # Work out how many slots have to remain free for a transfer this big.
    my $size     = $pending->[0]{size};
    my $required = 0;
    for my $tier (@reserve) {
        my ($threshold, $held_back) = @$tier;
        last if $size <= $threshold;
        $required += $held_back;
    }
    return unless $self->{slots} > $required;

    my $transfer = shift @$pending;

    $self->{lastspb} = $transfer->{spb};
    $self->{avgspb}  = $self->{avgspb} * 0.999 + $transfer->{spb} * 0.001;
    $self->{started}++;

    $transfer->wake;

    return;
}
72 | |
# Run wake_next with the slot counter temporarily raised by
# $::MAX_TRANSFERS, then lower it again by the same amount.
#
# The counter is adjusted arithmetically rather than saved and restored,
# so any slot consumed by a transfer started during wake_next stays
# accounted for afterwards.
sub force_wake_next {
    my ($queue) = @_;

    $queue->{slots} += $::MAX_TRANSFERS;
    $queue->wake_next;
    $queue->{slots} -= $::MAX_TRANSFERS;
}
80 | |
# Return the transfers currently waiting, freshly sorted.
#
# In list context this yields the wait-list entries in ranked order; in
# scalar context it yields how many there are.
sub waiters {
    my ($queue) = @_;

    $queue->sort;

    return @{ $queue->{wait} };
}
85 | |
86 | package transfer; |
87 | |
88 | use Coro::Timer (); |
89 | |
# Grant this transfer a slot.
#
# Marks the transfer as allocated, takes one slot from the owning queue's
# counter and, if something is recorded under {wake} (set by try while it
# is suspended), calls ->ready on it.
sub wake {
    my ($transfer) = @_;

    $transfer->{alloc} = 1;
    $transfer->{queue}{slots}--;

    my $sleeper = $transfer->{wake};
    $sleeper && $sleeper->ready;
}
97 | |
# Wait until this transfer has been granted a slot.
#
# Arguments:
#   $self    - the transfer
#   $seconds - passed to Coro::Timer::timeout
#
# Returns the {alloc} flag: immediately if it is already true, otherwise
# after suspending via Coro::schedule, in which case the result is whatever
# {alloc} holds on resumption.
sub try {
    my ($self, $seconds) = @_;

    return $self->{alloc} if $self->{alloc};

    # The timeout object must stay in scope while we are suspended.
    my $timeout = Coro::Timer::timeout($seconds);

    # Advertise which coroutine wake() should ready.  local() removes the
    # entry again as soon as we leave this sub.
    local $self->{wake} = $self->{coro};

    Coro::schedule();

    return $self->{alloc};
}
110 | |
# Destructor: hand the slot back when an allocated transfer goes away.
#
# If the transfer had been granted a slot ({alloc} true), the owning
# queue's slot counter is incremented and the queue is asked to start the
# next waiting transfer.  Transfers that never got a slot need no cleanup.
sub DESTROY {
    my ($self) = @_;

    # Destructors can run at arbitrary points, so keep the work done here
    # (which calls back into the queue) from clobbering the status
    # variables of whatever code happened to drop the last reference.
    local ($., $@, $!, $^E, $?);

    return unless $self->{alloc};

    # The queue reference may already be gone, e.g. during global
    # destruction; there is nothing left to give the slot back to then.
    my $queue = $self->{queue}
        or return;

    $queue->{slots}++;
    $queue->wake_next;

    return;
}
119 | |
120 | 1; |
121 |