ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/Coro/myhttpd/queue.pl
Revision: 1.4
Committed: Sun Dec 4 19:36:28 2005 UTC (18 years, 7 months ago) by root
Content type: text/plain
Branch: MAIN
CVS Tags: rel-2_5, rel-4_22, rel-4_21, rel-4_0, rel-4_3, rel-3_41, rel-4_13, rel-4_11, rel-3_55, rel-3_51, rel-4_01, rel-4_03, rel-4_02, rel-2_0, rel-2_1, rel-1_9, rel-3_6, rel-3_62, rel-3_63, rel-3_61, rel-1_7, rel-1_6, rel-3_4, rel-3_1, rel-3_5, rel-3_3, rel-3_2, rel-3_0, rel-3_01, rel-3_11, rel-4_1, rel-4_2, stack_sharing, rel-3_501, rel-4_31
Changes since 1.3: +1 -1 lines
Log Message:
*** empty log message ***

File Contents

# Content
1 package transferqueue;
2
3 use Scalar::Util;
4
# Slot reservation table read by wake_next.
#
# Each row is [ size threshold, slots to hold back ].  wake_next walks the
# rows in order and, for every threshold that the head transfer's size
# exceeds, adds that row's count to the number of slots that must stay
# free.  The transfer is started only while the queue has more slots than
# that sum, so larger transfers need more spare slots.
# NOTE(review): thresholds are in whatever unit start_transfer's size
# argument uses (presumably bytes) -- confirm against the callers.
my @reserve = map { [ $_, 1 ] } (1_200_000, 8_000_000, 75_000_000);
10
# Constructor: transferqueue->new($slots)
#
# Creates a queue that starts with $slots transfer slots and zeroed
# lastspb/avgspb statistics, then arms an Event timer (after => 10,
# interval => 3; seconds per Event's timer API -- confirm) whose callback
# calls wake_next on the queue.
#
# The callback only holds a weakened reference to the queue.  Capturing
# $self strongly would create the cycle
#     queue -> {reschedule} timer -> callback closure -> queue
# which keeps the queue alive forever, so DESTROY (which cancels the
# timer) would never run.
#
# Returns the new queue object.
sub new {
    my $class = shift;

    my $self = bless {
        slots   => $_[0],
        lastspb => 0,
        avgspb  => 0,
    }, $class;

    # Weak alias used inside the closure; $self itself stays strong.
    my $weak_self = $self;
    Scalar::Util::weaken($weak_self);

    $self->{reschedule} = Event->timer(
        after    => 10,
        interval => 3,
        # $weak_self becomes undef once the queue has been freed.
        cb       => sub { $weak_self->wake_next if $weak_self },
    );

    $self;
}
25
# $queue->start_transfer($size)
#
# Registers a new transfer of the given size with this queue.  The
# transfer object records the queue, the current value of $::NOW, the
# size and the currently running coroutine ($Coro::current), and is
# appended to the wait list.  wake_next is then called so the transfer can
# be started right away if the queue allows it.
#
# Returns the new transfer object (class "transfer").
sub start_transfer {
    my ($self, $size) = @_;

    my %request = (
        queue   => $self,
        time    => $::NOW,
        size    => $size,
        coro    => $Coro::current,
        started => 0,
    );
    my $transfer = bless \%request, 'transfer';

    push @{ $self->{wait} }, $transfer;
    $self->wake_next;

    return $transfer;
}
44
# $queue->sort
#
# Rebuilds the wait list in priority order:
#   * entries that have become false (weak references whose transfer has
#     been freed) are dropped;
#   * each remaining transfer gets spb = ($::NOW - time) / size, with a
#     size of 0 treated as 1 to avoid dividing by zero;
#   * the list is ordered by descending spb, so the highest spb is first;
#   * every stored reference is weakened, so the queue itself does not
#     keep waiting transfers alive.
#
# The sort call below is Perl's builtin; a sub named sort in this package
# does not override it.  The return value is not meaningful.
sub sort {
    my ($self) = @_;

    # Strong copies of the live entries; they keep the transfers alive
    # for the duration of this call.
    my @live = grep { $_ } @{ $self->{wait} };

    for my $transfer (@live) {
        my $waited = $::NOW - $transfer->{time};
        $transfer->{spb} = $waited / ($transfer->{size} || 1);
    }

    my @ordered = sort { $b->{spb} <=> $a->{spb} } @live;
    $self->{wait} = \@ordered;

    Scalar::Util::weaken($_) for @ordered;
}
54
# $queue->wake_next
#
# Starts at most one waiting transfer.  The wait list is re-sorted first;
# the head transfer is then started only if the queue has more slots than
# the reserve required for a transfer of its size (see @reserve: every
# threshold the size exceeds adds that row's count to the reserve).
#
# When a transfer is started, its spb becomes lastspb, is folded into the
# avgspb running average (weight 0.001), the queue's started counter is
# bumped and the transfer's wake method is called (which takes a slot).
# The return value is not meaningful.
sub wake_next {
    my ($self) = @_;

    $self->sort;

    # Must be read after sort, which replaces the wait array.
    my $waiting = $self->{wait};
    return unless @$waiting;

    # Work out how many slots must stay free for a transfer of this size.
    my $size = $waiting->[0]{size};
    my $min  = 0;
    for my $tier (@reserve) {
        last if $size <= $tier->[0];
        $min += $tier->[1];
    }

    return unless $self->{slots} > $min;

    my $transfer = shift @$waiting;
    my $spb      = $transfer->{spb};

    $self->{lastspb} = $spb;
    $self->{avgspb}  = $self->{avgspb} * 0.999 + $spb * 0.001;
    $self->{started}++;

    $transfer->wake;
    return;
}
76
# $queue->force_wake_next
#
# Runs wake_next with the slot count temporarily raised by
# $::MAX_TRANSFERS, then lowers it by the same amount again.  The
# adjustment is done arithmetically rather than with local, so a slot
# taken by a transfer woken during the call stays taken afterwards.
sub force_wake_next {
    my ($self) = @_;

    $self->{slots} = $self->{slots} + $::MAX_TRANSFERS;
    $self->wake_next;
    $self->{slots} = $self->{slots} - $::MAX_TRANSFERS;
}
84
# $queue->waiters
#
# Re-sorts the wait list and returns the waiting transfers in priority
# order (in scalar context: how many there are).
sub waiters {
    my ($self) = @_;

    $self->sort;

    return @{ $self->{wait} };
}
89
# Destructor for the queue: cancels the periodic reschedule timer.
#
# The timer may be missing, either because the object was never fully
# constructed or because it was already cleared during global
# destruction, where the order in which objects are torn down is
# unpredictable.  In that case there is nothing left to cancel, so return
# quietly instead of dying inside the destructor.
sub DESTROY {
    my $self = shift;

    my $timer = $self->{reschedule}
        or return;

    $timer->cancel;
}
95
96 package transfer;
97
98 use Coro::Timer ();
99
# $transfer->wake
#
# Grants this transfer a slot: marks it as allocated, takes one slot from
# its queue and, if a coroutine is currently parked in try (it stores
# itself in {wake} while waiting), marks that coroutine ready to run.
sub wake {
    my ($self) = @_;

    $self->{alloc} = 1;
    --$self->{queue}{slots};

    my $sleeper = $self->{wake};
    $sleeper->ready if $sleeper;
}
107
# $transfer->try($timeout)
#
# Returns the transfer's alloc flag, waiting for it if necessary.  If a
# slot has already been granted, returns immediately.  Otherwise the
# calling coroutine arms a Coro::Timer timeout with the given argument,
# publishes the transfer's coroutine in {wake} so that wake can ready it,
# and yields via Coro::schedule.  When it resumes, the current alloc flag
# is returned: true if a slot was granted, false otherwise (presumably
# because the timeout resumed it).
#
# {wake} is set with local, so it is restored as soon as this call
# returns; $timeout must stay in scope until then.
# NOTE(review): assumes the timeout object disarms when it goes out of
# scope -- confirm against the Coro::Timer documentation.
sub try {
    my ($self, $patience) = @_;

    return $self->{alloc} if $self->{alloc};

    my $timeout = Coro::Timer::timeout($patience);
    local $self->{wake} = $self->{coro};

    Coro::schedule();

    return $self->{alloc};
}
120
# Destructor for a transfer: if the transfer held a slot, hands it back
# to the queue and lets the queue start the next waiting transfer.
#
# A transfer that never got a slot has nothing to return.  The queue
# reference can already be cleared when this runs during global
# destruction (objects are torn down in an unpredictable order); without
# the check, the slot increment would autovivify a plain hash and the
# wake_next call would die on an unblessed reference.
sub DESTROY {
    my $self = shift;

    return unless $self->{alloc};

    my $queue = $self->{queue}
        or return;

    $queue->{slots}++;
    $queue->wake_next;
}
129
130 1;
131