1 |
package EVQ; |
2 |
use strict; |
3 |
use AnyEvent; |
4 |
|
5 |
# --- file-private state -----------------------------------------------------

# Condition variable: assigned by start(), signalled by schedule() when no
# work is left, and waited on by wait(). Undefined until start() has run.
my $J;

# Requests that have been queued but not yet dispatched. push_request()
# appends [$callback, $label] pairs; schedule() removes them.
my @req;

# --- package-visible state --------------------------------------------------

# Keys of the requests currently in flight (key => 1). addreq() inserts a
# key, finreq() deletes it.
our %reqh;

# Counter that addreq() uses as the prefix of each new request key and
# increments on every call.
our $id = 0;
10 |
|
11 |
# Dispatch queued requests while fewer than 200 are in flight.
#
# For every dispatched request a key is registered with addreq() and the
# request's callback is invoked with that key as its only argument. The
# callback (or whatever it starts) is expected to call finreq($key) when the
# request is complete.
#
# When nothing is in flight and nothing is queued, the condition variable
# created by start() is signalled so that wait() can return.
#
# A callback that throws is reported with warn and its key is released
# again; otherwise the key would stay in %reqh forever, permanently using up
# one of the slots and keeping the "no more jobs" signal from ever firing.
#
# Takes no arguments; the return value is not meaningful.
sub schedule {
    my $limit = 200;    # maximum number of simultaneously in-flight requests

    if (scalar(keys %reqh) == 0 && !@req) {
        warn "no more jobs, finishing...\n";
        # $J is only set by start(); do not die if we get here before that.
        $J->broadcast if defined $J;
        return;
    }

    # The in-flight count is re-read on every pass because a callback may
    # call finreq() or push_request() (which re-enters schedule) before it
    # returns.
    while (scalar(keys %reqh) < $limit) {
        # NOTE(review): pop dispatches the most recently pushed request
        # first (LIFO) -- confirm that this ordering is intended.
        my $job = pop @req;
        return unless defined $job;

        my ($cb, $label) = @$job;
        my $key = addreq($label);

        # The trailing 1 turns eval's result into a reliable success flag,
        # independent of whether $@ survives the unwinding intact.
        next if eval { $cb->($key); 1 };

        warn "EXCEPTION: " . ($@ || "unknown error") . "\n";

        # The callback died, so nothing can be relied on to call finreq()
        # for this key; free the slot here. finreq() is a plain hash delete,
        # so a later duplicate call for the same key is harmless.
        finreq($key);
    }

    return;
}
27 |
|
28 |
# Register a new in-flight request and return its key.
#
# The key is the current value of $id, an underscore, and the first
# argument; $id is incremented afterwards. The key is stored in %reqh with
# the value 1.
sub addreq {
    my $key = join "_", $id, $_[0];
    $id++;
    $reqh{$key} = 1;
    return $key;
}
29 |
# Mark the request with the given key (as returned by addreq) as finished by
# removing it from %reqh. Returns whatever delete returns: the stored value,
# or undef when the key was not present.
sub finreq {
    my ($key) = @_;
    return delete $reqh{$key};
}
30 |
|
31 |
# Queue a request and immediately give the scheduler a chance to run it.
#
# Arguments:
#   1. a label; schedule() passes it to addreq() to build the request key
#   2. a callback; schedule() invokes it with the request key
#
# Returns whatever schedule() returns.
sub push_request {
    my ($label, $callback) = @_;

    # Entries are stored callback-first, which is the layout schedule() reads.
    my $entry = [$callback, $label];
    push @req, $entry;

    return schedule();
}
36 |
|
37 |
# The watcher returned by the most recent AnyEvent->timer call in timer().
# NOTE(review): presumably stored in a package variable so the watcher object
# stays referenced until it fires -- confirm against the AnyEvent docs.
our $t;

# Arm a one-shot timer (after => 1). When it fires it runs the scheduler,
# reports the queue status with warn, and then arms itself again by calling
# timer().
#
# Returns the new watcher (the value assigned to $t).
sub timer {
    my $on_tick = sub {
        schedule();

        my $in_progress = scalar(keys %reqh);
        my $outstanding = $in_progress + @req;    # in flight plus still queued
        warn "$outstanding outstanding requests [$in_progress in progress]\n";

        timer();
    };

    $t = AnyEvent->timer(after => 1, cb => $on_tick);
}
48 |
|
49 |
# Set up the module for a run: create a fresh condition variable in $J
# (the one schedule() signals and wait() waits on) and arm the first timer
# tick. Returns whatever timer() returns.
sub start {
    $J = AnyEvent->condvar();
    return timer();
}
53 |
# Call wait on the condition variable created by start() and return its
# result. schedule() signals that variable once nothing is queued or in
# flight. $J is undefined until start() has run, so calling this earlier
# dies with a method-call-on-undef error.
sub wait {
    return $J->wait();
}
56 |
|
57 |
1 |