ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/Coro/myhttpd/queue.pl
Revision: 1.7
Committed: Sat Aug 13 23:42:05 2011 UTC (12 years, 9 months ago) by root
Content type: text/plain
Branch: MAIN
CVS Tags: rel-6_5, rel-6_10, rel-6_09, rel-6_08, rel-6_07, rel-6_512, rel-6_513, rel-6_511, rel-6_514, rel-6_32, rel-6_33, rel-6_31, rel-6_36, rel-6_37, rel-6_38, rel-6_39, rel-6_23, rel-6_29, rel-6_28, rel-6_46, rel-6_45, rel-6_51, rel-6_52, rel-6_53, rel-6_54, rel-6_55, rel-6_56, rel-6_57, rel-6_43, rel-6_42, rel-6_41, rel-6_47, rel-6_44, rel-6_49, rel-6_48, HEAD
Changes since 1.6: +1 -1 lines
Log Message:
*** empty log message ***

File Contents

# Content
1 package transferqueue;
2
3 use Scalar::Util;
4
# Slot reservation tiers used by wake_next.  Each entry is
# [ size threshold, slots to hold back ]: a waiting transfer whose size
# exceeds a threshold must leave that many additional slots free, so that
# smaller transfers are not starved by large ones.
# NOTE(review): sizes are presumably in bytes -- confirm against callers of
# start_transfer.
my @reserve = map { [ $_, 1 ] } 1_200_000, 8_000_000, 75_000_000;
10
# Construct a transfer queue.
#
#   my $queue = transferqueue->new($slots);
#
# $slots is the initial value of the free-slot counter that wake_next
# compares against the reservation tiers.
#
# The object keeps an AnyEvent timer in {reschedule} that calls wake_next
# (first after 10, then every 3 time units as interpreted by AE::timer).
# The callback only holds a weak reference to the queue: a strong one would
# form a cycle (queue -> timer -> callback -> queue) and the queue could
# never be freed.
sub new {
    my ($class, $slots) = @_;

    my $self = bless {
        slots   => $slots,
        lastspb => 0,
        avgspb  => 0,
        wait    => [],    # waiting transfers; start empty rather than undef
    }, $class;

    my $weak_self = $self;
    Scalar::Util::weaken($weak_self);

    # Parenthesised call: does not depend on AE::timer being predeclared
    # when this file is compiled.
    $self->{reschedule} = AE::timer(10, 3, sub {
        $weak_self->wake_next if $weak_self;
    });

    return $self;
}
21
# Register a new transfer of $size with this queue and return its handle
# (an object of class "transfer").
#
# The handle records the queue, the enqueue time ($::NOW), the size and the
# coroutine that was current when it was created.  It is appended to the
# wait list and wake_next is run once, so the transfer may already hold a
# slot when this returns.
#
# The caller must keep the returned handle alive: the queue's sort method
# weakens the wait-list entries, and the handle's DESTROY is what gives an
# allocated slot back.
sub start_transfer {
    my ($self, $size) = @_;

    my %request = (
        queue   => $self,
        time    => $::NOW,
        size    => $size,
        coro    => $Coro::current,
        started => 0,
    );
    my $transfer = bless \%request, 'transfer';

    push @{ $self->{wait} }, $transfer;
    $self->wake_next;

    return $transfer;
}
40
# Re-rank the wait list in place.
#
# Entries whose weak reference has been cleared are dropped.  Every
# surviving transfer gets {spb} = (time waited) / size, with a size of 0
# treated as 1, and the list is ordered by descending spb, so transfers
# that have waited longest relative to their size come first.  The stored
# references are weakened so the queue never keeps a transfer alive.
sub sort {
    my $self = $_[0];

    my @alive = grep { $_ } @{ $self->{wait} };

    for my $transfer (@alive) {
        my $waited = $::NOW - $transfer->{time};
        $transfer->{spb} = $waited / ($transfer->{size} || 1);
    }

    my @ranked = sort { $b->{spb} <=> $a->{spb} } @alive;
    $self->{wait} = \@ranked;

    Scalar::Util::weaken($_) for @ranked;
}
50
# Start at most one waiting transfer.
#
# The wait list is re-ranked, then only its head is considered.  Walking
# the @reserve tiers, every tier whose size threshold the head exceeds adds
# its slot count to the number of slots that must stay free.  The head is
# started only if {slots} is strictly greater than that number; otherwise
# nothing happens.  Starting a transfer records its spb in {lastspb}, folds
# it into the {avgspb} moving average (weight 0.001), bumps {started} and
# calls the transfer's wake method.
sub wake_next {
    my ($self) = @_;

    $self->sort;

    # fetch after sort: sort installs a fresh array
    my $waiting = $self->{wait};
    return unless @$waiting;

    my $head_size = $waiting->[0]{size};
    my $held_back = 0;
    for my $tier (@reserve) {
        my ($threshold, $slots) = @$tier;
        last if $head_size <= $threshold;
        $held_back += $slots;
    }

    return unless $self->{slots} > $held_back;

    my $next = shift @$waiting;

    $self->{lastspb} = $next->{spb};
    $self->{avgspb}  = $self->{avgspb} * 0.999 + $next->{spb} * 0.001;
    $self->{started}++;

    $next->wake;
}
72
# Start the next waiting transfer regardless of the normal slot limits.
#
# {slots} is temporarily raised by $::MAX_TRANSFERS around a wake_next call
# and lowered by the same amount afterwards, so any slot consumed by the
# woken transfer stays accounted for.
#
# Returns 1 if the wait list was non-empty, nothing otherwise.
sub force_wake_next {
    my ($self) = @_;

    # nothing queued, nothing to force
    return if !@{ $self->{wait} };

    $self->{slots} = $self->{slots} + $::MAX_TRANSFERS;
    $self->wake_next;
    $self->{slots} = $self->{slots} - $::MAX_TRANSFERS;

    return 1;
}
84
# Return the waiting transfers, freshly ranked by sort (highest spb first).
# In scalar context this yields the number of waiters.
sub waiters {
    my ($self) = @_;

    $self->sort;

    return @{ $self->{wait} };
}
89
90 package transfer;
91
92 use Coro::Timer ();
93
# Grant this transfer a slot: mark it allocated, take one slot from the
# owning queue, and, if a coroutine is currently parked in try (which sets
# {wake}), make it runnable again.
sub wake {
    my ($self) = @_;

    $self->{alloc} = 1;
    $self->{queue}{slots}--;

    my $sleeper = $self->{wake};
    $sleeper and $sleeper->ready;
}
101
# Wait for this transfer to be granted a slot.
#
#   $transfer->try($timeout)
#
# Returns the {alloc} flag immediately if a slot is already held.
# Otherwise a Coro::Timer timeout is armed with $timeout, {wake} is set to
# the transfer's coroutine for the duration of the wait (so that wake can
# ready it), and the current coroutine is descheduled.  Once it runs again
# the {alloc} flag is returned: true if a slot was granted, false if the
# wait ended for another reason.
# NOTE(review): relies on Coro::Timer::timeout readying the current
# coroutine when it expires -- confirm against the Coro::Timer docs.
sub try {
    my ($self, $timeout_after) = @_;

    return $self->{alloc} if $self->{alloc};

    # must stay alive until we return: dropping it cancels the timeout
    my $timeout = Coro::Timer::timeout($timeout_after);
    local $self->{wake} = $self->{coro};

    Coro::schedule();

    return $self->{alloc};
}
114
# Release the slot held by this transfer when its handle goes away.
#
# Only transfers that were actually granted a slot ({alloc} set by wake)
# give one back; the queue is then asked to start the next waiter.
#
# If the queue reference is already gone (e.g. objects torn down in
# arbitrary order during global destruction) there is nothing to return
# the slot to, so we bail out instead of autovivifying a plain hash and
# dying on the wake_next call.
sub DESTROY {
    my ($self) = @_;

    return unless $self->{alloc};

    my $queue = $self->{queue}
        or return;

    $queue->{slots}++;
    $queue->wake_next;
}
123
124 1;
125