ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/Coro/myhttpd/queue.pl
Revision: 1.4
Committed: Sun Dec 4 19:36:28 2005 UTC (18 years, 7 months ago) by root
Content type: text/plain
Branch: MAIN
CVS Tags: rel-2_5, rel-4_22, rel-4_21, rel-4_0, rel-4_3, rel-3_41, rel-4_13, rel-4_11, rel-3_55, rel-3_51, rel-4_01, rel-4_03, rel-4_02, rel-2_0, rel-2_1, rel-1_9, rel-3_6, rel-3_62, rel-3_63, rel-3_61, rel-1_7, rel-1_6, rel-3_4, rel-3_1, rel-3_5, rel-3_3, rel-3_2, rel-3_0, rel-3_01, rel-3_11, rel-4_1, rel-4_2, stack_sharing, rel-3_501, rel-4_31
Changes since 1.3: +1 -1 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.1 package transferqueue;
2    
3     use Scalar::Util;
4    
# Slot reservation table consulted by wake_next.
#
# Each entry is [ size threshold, slots held back ].  For every threshold a
# transfer's size exceeds, the matching number of slots is added to the
# minimum that must be free before that transfer may start.  The thresholds
# use the same unit as a transfer's size (presumably bytes -- confirm
# against the callers of start_transfer).
my @reserve = map { [ $_, 1 ] } (
    1_200_000,
    8_000_000,
    75_000_000,
);
10    
# Create a transfer queue.
#
#   my $queue = transferqueue->new($slots);
#
# $slots is the number of free transfer slots the queue starts with.  The
# object also keeps lastspb and avgspb, which wake_next updates from the
# "spb" score of each transfer it starts.
#
# A repeating Event timer (first fired after 10 seconds, then every 3) calls
# wake_next so that waiting transfers are re-evaluated periodically.
#
# Returns the new queue object.
sub new {
    my ($class, $slots) = @_;

    my $self = bless {
        slots   => $slots,
        wait    => [],
        lastspb => 0,
        avgspb  => 0,
    }, $class;

    # The timer is stored inside $self, so its callback must not hold a
    # strong reference back to $self: that cycle would keep the queue alive
    # forever and DESTROY (which cancels the timer) would never run.
    Scalar::Util::weaken(my $weak_self = $self);

    $self->{reschedule} = Event->timer(
        after    => 10,
        interval => 3,
        cb       => sub { $weak_self and $weak_self->wake_next },
    );

    return $self;
}
25    
# Register a new transfer with the queue.
#
#   my $transfer = $queue->start_transfer($size);
#
# Builds a transfer object recording the queue, the current time ($::NOW),
# the requested size and the calling coroutine ($Coro::current), appends it
# to the wait list and immediately gives the queue a chance to start it.
#
# Returns the transfer object.  The wait list only keeps weak references
# once it has been sorted, so the caller must hold on to the returned object
# for as long as the transfer is wanted.
sub start_transfer {
    my ($self, $size) = @_;

    my %state = (
        queue   => $self,
        time    => $::NOW,
        size    => $size,
        coro    => $Coro::current,
        started => 0,
    );
    my $transfer = bless \%state, 'transfer';

    push @{ $self->{wait} }, $transfer;
    $self->wake_next;

    return $transfer;
}
44    
# Re-rank the wait list.
#
# Entries that have become undef (their transfer object was destroyed while
# only weakly referenced) are dropped.  Every remaining transfer gets an
# "spb" score: time waited ($::NOW minus its start time) divided by its size,
# with a size of 0 treated as 1.  The list is then ordered by descending
# score, so transfers that have waited longest relative to their size come
# first.  The stored references are weakened so the queue never keeps a
# transfer alive by itself.
sub sort {
    my ($self) = @_;

    my @alive = grep { $_ } @{ $self->{wait} };

    for my $transfer (@alive) {
        my $waited = $::NOW - $transfer->{time};
        $transfer->{spb} = $waited / ($transfer->{size} || 1);
    }

    my @ranked = sort { $b->{spb} <=> $a->{spb} } @alive;
    $self->{wait} = \@ranked;

    Scalar::Util::weaken($_) for @ranked;
}
54    
# Start the highest-ranked waiting transfer, if a slot is available for it.
#
# The wait list is re-sorted first.  The head transfer's size is compared
# against the @reserve table: every threshold it exceeds adds that tier's
# reserved slots to the minimum that must remain free.  The transfer is only
# started when the queue has more free slots than that minimum.
#
# At most one transfer is started per call.  When one is started, lastspb is
# set to its score, avgspb is moved 0.1% of the way towards it, the queue's
# started counter is incremented and the transfer is woken.
sub wake_next {
    my ($self) = @_;

    $self->sort;

    # Fetch the list only after sorting: sort installs a fresh array.
    my $waiting = $self->{wait};
    return unless @$waiting;

    my $head_size = $waiting->[0]{size};
    my $reserved  = 0;
    for my $tier (@reserve) {
        my ($threshold, $held_back) = @$tier;
        last if $head_size <= $threshold;
        $reserved += $held_back;
    }

    return unless $self->{slots} > $reserved;

    my $next = shift @$waiting;
    $self->{lastspb} = $next->{spb};
    $self->{avgspb}  = $self->{avgspb} * 0.999 + $next->{spb} * 0.001;
    $self->{started}++;
    $next->wake;

    return;
}
76    
# Run wake_next with the slot count temporarily raised by $::MAX_TRANSFERS.
#
# The increase is presumably large enough to clear every reservation
# threshold, so the head of the wait list can start even when the queue is
# nominally full -- confirm against how $::MAX_TRANSFERS is configured.
#
# The adjustment is deliberately an add followed by a subtract rather than a
# local(): a transfer started by wake_next decrements the slot count, and
# that decrement has to survive once the boost is removed.
sub force_wake_next {
    my ($self) = @_;

    my $boost = $::MAX_TRANSFERS;

    $self->{slots} += $boost;
    $self->wake_next;
    $self->{slots} -= $boost;
}
84    
# Return the transfers currently waiting, best-ranked first.
#
# The wait list is re-sorted before it is returned.  In list context the
# result is the list of transfer objects; in scalar context it is their
# count.
sub waiters {
    my ($self) = @_;

    $self->sort;

    return @{ $self->{wait} };
}
89    
# Destructor: stop the periodic reschedule timer.
#
# The timer can legitimately be missing here -- the constructor may have
# died before creating it, or during global destruction the timer may
# already have been torn down -- so it is only cancelled when present.
# Calling a method on undef inside DESTROY would otherwise raise an error.
sub DESTROY {
    my ($self) = @_;

    my $timer = delete $self->{reschedule} or return;
    $timer->cancel;
}
95    
96     package transfer;
97    
98     use Coro::Timer ();
99    
# Grant this transfer a slot.
#
# Marks the transfer as allocated, takes one slot from its queue and, if a
# coroutine is currently registered in {wake} (set by try while it is
# blocked), marks that coroutine ready to run.
sub wake {
    my ($self) = @_;

    $self->{alloc} = 1;
    $self->{queue}{slots}--;

    my $sleeper = $self->{wake};
    return $sleeper && $sleeper->ready;
}
107    
# Wait for this transfer to be granted a slot.
#
#   my $granted = $transfer->try($seconds);
#
# Returns immediately with a true value when the slot has already been
# allocated.  Otherwise the transfer's coroutine is registered as the one to
# wake, a Coro::Timer timeout of $seconds is armed, and the coroutine is
# descheduled.  When it resumes -- woken either by the queue or by the
# timeout -- the current allocation state is returned, so a false result
# means the wait ended without a slot.
sub try {
    my ($self, $seconds) = @_;

    return $self->{alloc} if $self->{alloc};

    # The timeout object is never read, but it must stay in scope across the
    # schedule call: per the Coro::Timer documentation the timeout is
    # disarmed as soon as the object is released.
    my $timeout = Coro::Timer::timeout($seconds);

    # Expose the coroutine to wake() only while we are actually blocked.
    local $self->{wake} = $self->{coro};

    Coro::schedule();

    return $self->{alloc};
}
120    
# Destructor: hand the slot back to the queue.
#
# Only a transfer that was actually allocated a slot returns one; the queue
# is then asked to start the next waiting transfer.  The queue reference can
# already be gone (for example during global destruction, where teardown
# order is not guaranteed), in which case there is nothing to return the
# slot to and the destructor does nothing rather than dying.
sub DESTROY {
    my ($self) = @_;

    return unless $self->{alloc};

    my $queue = $self->{queue} or return;
    $queue->{slots}++;
    $queue->wake_next;
}
129    
130     1;
131