--- IO-AIO/AIO.xs 2005/07/13 00:13:09 1.10 +++ IO-AIO/AIO.xs 2005/07/20 21:55:27 1.11 @@ -1,9 +1,9 @@ +#define _XOPEN_SOURCE 500 + #include "EXTERN.h" #include "perl.h" #include "XSUB.h" -#define _XOPEN_SOURCE 500 - #include #include @@ -56,7 +56,7 @@ typedef aio_cb *aio_req; static int started; -static int nreqs; +static volatile int nreqs; static int max_outstanding = 1<<30; static int respipe [2]; @@ -70,14 +70,14 @@ static void poll_wait () { - if (!nreqs) - return; - - fd_set rfd; - FD_ZERO(&rfd); - FD_SET(respipe [0], &rfd); + if (nreqs && !ress) + { + fd_set rfd; + FD_ZERO(&rfd); + FD_SET(respipe [0], &rfd); - select (respipe [0] + 1, &rfd, 0, 0, 0); + select (respipe [0] + 1, &rfd, 0, 0, 0); + } } static int @@ -85,33 +85,29 @@ { dSP; int count = 0; - aio_req req; - + aio_req req, prv; + + static int rl;//D + //printf ("%d ENTER\n", ++rl);//D + + pthread_mutex_lock (&reslock); + { - /* read and signals sent by the worker threads */ + /* read any signals sent by the worker threads */ char buf [32]; while (read (respipe [0], buf, 32) > 0) ; } - for (;;) - { - pthread_mutex_lock (&reslock); - - req = ress; + req = ress; + ress = rese = 0; - if (ress) - { - ress = ress->next; - if (!ress) rese = 0; - } - - pthread_mutex_unlock (&reslock); - - if (!req) - break; + pthread_mutex_unlock (&reslock); + while (req) + { nreqs--; + //printf ("%d count %d %p->%p\n", rl, count, req, req->next);//D if (req->type == REQ_QUIT) started--; @@ -168,9 +164,14 @@ count++; } - Safefree (req); + prv = req; + req = req->next; + Safefree (prv); + + /* TODO: croak on errors? */ } + //printf ("%d LEAVE %p %p\n", rl--, ress, rese);//D return count; }