1 | package tbf; |
1 | package tbf; |
2 | |
2 | |
3 | # kind of token-bucket-filter |
3 | # kind of token-bucket-filter |
4 | |
4 | |
5 | my $max_per_client = 1e5; |
5 | my $max_per_client = $::TBF_MAX_PER_CLIENT || 24000; |
6 | |
6 | |
7 | sub new { |
7 | sub new { |
8 | my $class = shift; |
8 | my $class = shift; |
9 | my %arg = @_; |
9 | my %arg = @_; |
10 | my $self = bless \%arg, $class; |
10 | my $self = bless \%arg, $class; |
… | |
… | |
12 | $self->{maxbucket} ||= $self->{rate} * 3; # max 3s bucket |
12 | $self->{maxbucket} ||= $self->{rate} * 3; # max 3s bucket |
13 | $self->{minbucket} ||= $self->{rate}; # minimum bucket to share |
13 | $self->{minbucket} ||= $self->{rate}; # minimum bucket to share |
14 | $self->{interval} ||= $::BUFSIZE / $max_per_client; # good default interval |
14 | $self->{interval} ||= $::BUFSIZE / $max_per_client; # good default interval |
15 | |
15 | |
16 | if ($self->{rate}) { |
16 | if ($self->{rate}) { |
17 | $self->{w} = Event->timer(hard => 1, after => 0, interval => $self->{interval}, repeat => 1, cb => sub { |
17 | $self->{w} = EV::periodic 0, $self->{interval}, undef, sub { |
18 | $self->inject($self->{rate} * $self->{interval}); |
18 | $self->inject ($self->{rate} * $self->{interval}); |
19 | }); |
19 | }; |
20 | } else { |
20 | } else { |
21 | die "chaining not yet implemented\n"; |
21 | die "chaining not yet implemented\n"; |
22 | } |
22 | } |
23 | |
23 | |
24 | $self; |
24 | $self; |
25 | } |
|
|
26 | |
|
|
27 | sub DESTROY { |
|
|
28 | my $self = shift; |
|
|
29 | |
|
|
30 | $self->{w}->cancel; |
|
|
31 | } |
25 | } |
32 | |
26 | |
33 | sub inject { |
27 | sub inject { |
34 | my ($self, $bytes) = @_; |
28 | my ($self, $bytes) = @_; |
35 | |
29 | |
… | |
… | |
47 | $self->{bucket} += $v->[1] - $v->[2]; |
41 | $self->{bucket} += $v->[1] - $v->[2]; |
48 | $v->[3]->(); |
42 | $v->[3]->(); |
49 | } |
43 | } |
50 | } |
44 | } |
51 | |
45 | |
52 | } else { |
|
|
53 | if ($self->{maxbucket} < $self->{bucket}) { |
|
|
54 | ::slog (9, "unused bandwith: ".($self->{bucket} - $self->{maxbucket}));#d# |
|
|
55 | $self->{bucket} = $self->{maxbucket}; |
|
|
56 | } |
|
|
57 | } |
46 | } |
|
|
47 | last; |
|
|
48 | } |
58 | |
49 | |
59 | last; |
50 | if ($self->{maxbucket} < $self->{bucket}) { |
|
|
51 | ::unused_bandwidth ($self->{bucket} - $self->{maxbucket}); |
|
|
52 | $self->{bucket} = $self->{maxbucket}; |
60 | } |
53 | } |
61 | } |
54 | } |
62 | |
55 | |
63 | my $_tbf_id; |
56 | my $_tbf_id; |
64 | |
57 | |
65 | sub request { |
58 | sub request { |
66 | my ($self, $bytes, $weight) = @_; |
59 | my ($self, $bytes, $weight) = @_; |
67 | |
60 | |
68 | $weight ||= 1; |
61 | $weight ||= 1; |
69 | |
62 | |
70 | if ($self->{waitw} || $self->{bucket} < $bytes || 1) { |
|
|
71 | my $coro = $Coro::current; |
63 | my $coro = $Coro::current; |
72 | my $id = $_tbf_id++; |
64 | my $id = $_tbf_id++; |
73 | |
65 | |
74 | $self->{waitw} += $weight; |
66 | $self->{waitw} += $weight; |
75 | $self->{waitq}{$id} = [$weight, 0, $bytes, sub { |
67 | $self->{waitq}{$id} = [$weight, 0, $bytes, sub { |
76 | delete $self->{waitq}{$id}; |
68 | delete $self->{waitq}{$id}; |
77 | $self->{waitw} -= $weight; |
69 | $self->{waitw} -= $weight; |
78 | $coro->ready; |
70 | $coro->ready; |
79 | }]; |
71 | }]; |
80 | |
72 | |
81 | Coro::schedule; |
73 | Coro::schedule; |
82 | } else { |
|
|
83 | $self->{bucket} -= $bytes; |
|
|
84 | } |
|
|
85 | } |
74 | } |
86 | |
75 | |
87 | 1; |
76 | 1; |