ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/Coro/myhttpd/queue.pl
Revision: 1.7
Committed: Sat Aug 13 23:42:05 2011 UTC (12 years, 11 months ago) by root
Content type: text/plain
Branch: MAIN
CVS Tags: rel-6_5, rel-6_10, rel-6_09, rel-6_08, rel-6_07, rel-6_512, rel-6_513, rel-6_511, rel-6_514, rel-6_32, rel-6_33, rel-6_31, rel-6_36, rel-6_37, rel-6_38, rel-6_39, rel-6_23, rel-6_29, rel-6_28, rel-6_46, rel-6_45, rel-6_51, rel-6_52, rel-6_53, rel-6_54, rel-6_55, rel-6_56, rel-6_57, rel-6_43, rel-6_42, rel-6_41, rel-6_47, rel-6_44, rel-6_49, rel-6_48, HEAD
Changes since 1.6: +1 -1 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.1 package transferqueue;
2    
3     use Scalar::Util;
4    
# Slot reservation tiers consulted by wake_next, in ascending size order.
# Each entry is [ size threshold, slots held back ]: a waiting transfer whose
# size exceeds a tier's threshold needs that many additional free slots
# before it may start, which keeps some capacity available for smaller ones.
my @reserve = (
    [  1_200_000 => 1 ],
    [  8_000_000 => 1 ],
    [ 75_000_000 => 1 ],
);
10    
# Construct a transfer queue.
#
# Called as CLASS->new($slots); $slots is stored as {slots}, the number of
# transfers that may still be started.  {lastspb} and {avgspb} start at 0 and
# are updated by wake_next; {wait} starts as an empty list of waiters.
#
# Returns the new queue object.
sub new {
    my $class = shift;
    my $self = bless {
        slots   => $_[0],
        lastspb => 0,
        avgspb  => 0,
        wait    => [],
    }, $class;

    # The timer lives inside $self, so its callback must not hold a strong
    # reference back to $self: that cycle would keep the queue and its timer
    # alive forever.  The callback therefore only sees a weakened copy.
    Scalar::Util::weaken(my $weak_self = $self);

    # Periodically retry waking a waiter (AE::timer AFTER, INTERVAL, CALLBACK).
    $self->{reschedule} = AE::timer(10, 3, sub {
        $weak_self->wake_next if $weak_self;
    });

    $self;
}
21    
# Register a new transfer of the given size with this queue.
#
# Creates a 'transfer' object recording the queue, the current $::NOW, the
# size and the current coroutine ($Coro::current), appends it to the wait
# list and immediately gives the queue a chance to start a waiter.
#
# Returns the transfer object.
sub start_transfer {
    my ($self, $size) = @_;

    my %state = (
        queue   => $self,
        time    => $::NOW,
        size    => $size,
        coro    => $Coro::current,
        started => 0,
    );
    my $transfer = bless \%state, 'transfer';

    push @{ $self->{wait} }, $transfer;
    $self->wake_next;

    return $transfer;
}
40    
# Rebuild the wait list in priority order.
#
# Drops entries that are no longer set (the list holds weak references, which
# become undef once their transfer is destroyed), recomputes each waiter's
# {spb} as elapsed time since creation divided by its size (a size of 0 is
# treated as 1), orders waiters by descending {spb}, and stores the result as
# a fresh array of weak references so the queue itself never keeps a transfer
# alive.
sub sort {
    my $self = $_[0];

    my @alive = grep { $_ } @{ $self->{wait} };

    for my $transfer (@alive) {
        my $elapsed = $::NOW - $transfer->{time};
        $transfer->{spb} = $elapsed / ($transfer->{size} || 1);
    }

    my $ordered = $self->{wait} = [ sort { $b->{spb} <=> $a->{spb} } @alive ];

    Scalar::Util::weaken($_) for @$ordered;
}
50    
# Start at most one waiting transfer, if enough slots are free.
#
# After re-sorting the wait list, the head waiter's size is checked against
# the @reserve tiers: every tier whose threshold the size exceeds adds its
# slot count to the number of slots that must stay free.  The head waiter is
# started only if {slots} is strictly greater than that number; otherwise
# nothing happens.  Starting a transfer records its {spb} in {lastspb}, folds
# it into the moving average {avgspb}, bumps {started} and calls its wake
# method.
sub wake_next {
    my ($self) = @_;

    $self->sort;

    my $waiting = $self->{wait};
    return unless @$waiting;

    my $head_size = $waiting->[0]{size};

    # Tiers are walked in order and the walk stops at the first tier the
    # transfer fits into.
    my $held_back = 0;
    for my $tier (@reserve) {
        my ($threshold, $slots) = @$tier;
        last if $head_size <= $threshold;
        $held_back += $slots;
    }

    return unless $self->{slots} > $held_back;

    my $transfer = shift @$waiting;
    my $spb      = $transfer->{spb};

    $self->{lastspb} = $spb;
    $self->{avgspb}  = $self->{avgspb} * 0.999 + $spb * 0.001;
    $self->{started}++;

    $transfer->wake;

    return;
}
72    
# Start the next waiter even if the normal slot/reserve check would refuse.
#
# Returns nothing (false) when no live transfer is waiting, and 1 after a
# forced wake_next attempt has been made.
sub force_wake_next {
    my $self = shift;

    # The wait list holds weak references that turn into undef once their
    # transfer is destroyed, so only defined entries count as waiters.  A
    # list of stale entries (or no list at all) means there is nobody to wake.
    return unless grep { defined $_ } @{ $self->{wait} || [] };

    # Temporarily inflate the slot count so wake_next's check passes.  The
    # boost is added and subtracted rather than saved and restored, because a
    # transfer that gets woken decrements {slots} itself and that decrement
    # must survive.
    $self->{slots} += $::MAX_TRANSFERS;
    $self->wake_next;
    $self->{slots} -= $::MAX_TRANSFERS;

    1
}
84    
# Return the transfers currently waiting, in priority order.
#
# The wait list is re-sorted first.  In list context the waiters themselves
# are returned; in scalar context, their number.
sub waiters {
    my ($self) = @_;

    $self->sort;

    return @{ $self->{wait} };
}
89    
90     package transfer;
91    
92     use Coro::Timer ();
93    
# Grant this transfer a slot.
#
# Marks the transfer as allocated, takes one slot from its queue and, if a
# coroutine is currently parked in try() (which publishes it in {wake}),
# marks that coroutine ready to run again.
sub wake {
    my ($self) = @_;

    $self->{alloc} = 1;
    $self->{queue}{slots}--;

    my $sleeper = $self->{wake};
    return $sleeper && $sleeper->ready;
}
101    
# Wait for this transfer to be granted a slot.
#
# Returns the {alloc} flag straight away if a slot was already granted.
# Otherwise arms a Coro::Timer timeout with the caller-supplied value,
# publishes the owning coroutine in {wake} for the duration of the wait so
# that wake() can ready it, and yields to the scheduler.  When the coroutine
# resumes, for whichever reason, the current {alloc} value is returned, so a
# false result means no slot has been granted yet.
sub try {
    my $self = shift;

    return $self->{alloc} if $self->{alloc};

    # The timeout object only needs to stay alive while we are parked.
    my $timeout = Coro::Timer::timeout($_[0]);

    # {wake} is restored on return, so wake() only readies a coroutine that
    # is actually parked here.
    local $self->{wake} = $self->{coro};

    Coro::schedule();

    return $self->{alloc};
}
114    
# Give the slot back when a transfer goes away.
#
# Only transfers that were actually granted a slot ({alloc} set) return one;
# the queue is then asked to start the next waiter.
sub DESTROY {
    my $self = shift;

    return unless $self->{alloc};

    # The queue reference can already be gone, e.g. while the interpreter is
    # tearing everything down.  There is then nothing to hand the slot back
    # to, and touching it would only make the destructor die.
    my $queue = $self->{queue}
        or return;

    $queue->{slots}++;
    $queue->wake_next;
}
123    
124     1;
125