| 1 |
package EVQ; |
| 2 |
use strict; |
| 3 |
use AnyEvent; |
| 4 |
|
| 5 |
# Queue of requests that have not been started yet.  push_request appends
# [ $callback, $argument ] pairs and schedule takes them off the end.
my @req;

# Ids of requests that have been started and not yet released: addreq
# inserts a key, finreq deletes it.
our %reqh;

# Running counter used by addreq as the numeric prefix of each request id.
our $id = 0;

# Condition variable created by start, signalled by schedule once no work
# is left, and blocked on by wait.
my $J;
| 10 |
|
| 11 |
# Upper bound on the number of simultaneously registered requests (keys of
# %reqh).  It is a package variable so callers can tune it; the default is
# the limit that used to be hard-coded in schedule.
our $max_active = 200;

# schedule()
#
# Starts queued requests until $max_active requests are registered in
# %reqh or the queue is empty.  Called by push_request and by the timer
# callback.
#
# When nothing is registered and nothing is queued, the condition variable
# created by start is signalled and the sub returns.  If start has not been
# called yet there is no condition variable and the signal is skipped.
#
# Each queue entry is [ $callback, $argument ].  The callback is invoked
# with the id returned by addreq($argument).  An exception thrown by the
# callback is reported with warn and does not stop the scheduler.
sub schedule {
    my $active = keys %reqh;

    if ($active == 0 && !@req) {
        warn "no more jobs, finishing...\n";
        # send is the documented AnyEvent name; broadcast is a legacy alias.
        $J->send if defined $J;
        return;
    }

    while ($active < $max_active) {
        # Entries are taken from the end of @req, so the most recently
        # pushed request is started first.
        my $job = pop @req;
        return unless defined $job;

        my ($cb, $arg) = @$job;
        my $key = addreq($arg);

        # The trailing 1 makes the eval result reliable even if $@ gets
        # clobbered while the exception unwinds.
        my $ok = eval { $cb->($key); 1 };
        unless ($ok) {
            my $err = $@;
            warn "EXCEPTION: $err\n";
            # The callback failed before completing, so release its slot.
            # Otherwise the id would stay in %reqh forever and the
            # "no more jobs" condition could never be reached.  Deleting
            # an id that the callback already released is harmless.
            finreq($key);
        }

        # Callbacks may have released (or registered) requests themselves.
        $active = keys %reqh;
    }
}
| 27 |
|
| 28 |
# addreq($label)
#
# Registers a new request: builds the id "<counter>_<label>" from the
# current value of $id, stores it as a key of %reqh, advances the counter
# and returns the id.
sub addreq {
    my ($label) = @_;

    my $key = "${id}_${label}";
    $reqh{$key} = 1;
    $id++;

    return $key;
}
| 29 |
# finreq($key)
#
# Releases a request by deleting its id from %reqh.  Returns whatever
# delete returns: the stored value, or undef if the id was not present.
sub finreq {
    my ($key) = @_;
    return delete $reqh{$key};
}
| 30 |
|
| 31 |
# push_request($s, $cb)
#
# Appends a request to the pending queue and immediately runs the
# scheduler.
#
#   $s  - value later passed to addreq to build the request id
#   $cb - callback that schedule invokes with that id
#
# Returns whatever schedule returns.
sub push_request {
    my ($tag, $handler) = @_;

    # Queue entries keep the callback first and its argument second.
    push @req, [ $handler, $tag ];

    schedule();
}
| 36 |
|
| 37 |
# Watcher returned by the most recent AnyEvent->timer call made in timer.
our $t;

# timer()
#
# Arms a one-shot AnyEvent timer that fires after one second.  When it
# fires, the scheduler is run, a progress line with the total and the
# in-progress request counts is emitted with warn, and the timer is armed
# again.
sub timer {
    my $on_tick = sub {
        schedule();

        # Count after scheduling so the report reflects what was just started.
        my $rreqcnt = keys %reqh;
        my $reqcnt  = $rreqcnt + @req;
        warn "$reqcnt outstanding requests [$rreqcnt in progress]\n";

        timer();
    };

    $t = AnyEvent->timer(after => 1, cb => $on_tick);
}
| 48 |
|
| 49 |
# start()
#
# Creates the condition variable that wait blocks on and arms the
# periodic timer.  Returns whatever timer returns.
sub start {
    $J = AnyEvent->condvar();
    return timer();
}
| 53 |
# wait()
#
# Blocks on the condition variable created by start until schedule
# signals it, and returns whatever the condition variable's recv returns.
#
# Dies with an explicit message when start has not been called, instead of
# failing with a method call on an undefined value.
sub wait {
    die "EVQ::wait called before EVQ::start\n" unless defined $J;

    # recv is the documented AnyEvent name; wait is a legacy alias.
    return $J->recv;
}
| 56 |
|
| 57 |
1 |