--- IO-AIO/AIO.xs 2006/10/24 00:26:32 1.65 +++ IO-AIO/AIO.xs 2006/10/24 15:15:56 1.70 @@ -1,3 +1,6 @@ +/* solaris */ +#define _POSIX_PTHREAD_SEMANTICS 1 + #if __linux # define _GNU_SOURCE #endif @@ -75,7 +78,7 @@ REQ_READDIR, REQ_LINK, REQ_SYMLINK, REQ_GROUP, REQ_NOP, - REQ_SLEEP, + REQ_BUSY, }; #define AIO_REQ_KLASS "IO::AIO::REQ" @@ -119,6 +122,7 @@ DEFAULT_PRI = 0, PRI_BIAS = -PRI_MIN, + NUM_PRI = PRI_MAX + PRI_BIAS + 1, }; static int next_pri = DEFAULT_PRI + PRI_BIAS; @@ -138,8 +142,59 @@ static pthread_mutex_t reqlock = AIO_MUTEX_INIT; static pthread_cond_t reqwait = PTHREAD_COND_INITIALIZER; -static volatile aio_req reqs, reqe; /* queue start, queue end */ -static volatile aio_req ress, rese; /* queue start, queue end */ +/* + * a somewhat faster data structure might be nice, but + * with 8 priorities this actually needs <20 insns + * per shift, the most expensive operation. + */ +typedef struct { + aio_req qs[NUM_PRI], qe[NUM_PRI]; /* qstart, qend */ + int size; +} reqq; + +static reqq req_queue; +static reqq res_queue; + +int reqq_push (reqq *q, aio_req req) +{ + int pri = req->pri; + req->next = 0; + + if (q->qe[pri]) + { + q->qe[pri]->next = req; + q->qe[pri] = req; + } + else + q->qe[pri] = q->qs[pri] = req; + + return q->size++; +} + +aio_req reqq_shift (reqq *q) +{ + int pri; + + if (!q->size) + return 0; + + --q->size; + + for (pri = NUM_PRI; pri--; ) + { + aio_req req = q->qs[pri]; + + if (req) + { + if (!(q->qs[pri] = req->next)) + q->qe[pri] = 0; + + return req; + } + } + + abort (); +} static void req_invoke (aio_req req); static void req_free (aio_req req); @@ -183,7 +238,7 @@ PUSHMARK (SP); XPUSHs (req_sv (grp, AIO_GRP_KLASS)); PUTBACK; - call_sv (grp->fh2, G_VOID | G_EVAL); + call_sv (grp->fh2, G_VOID | G_EVAL | G_KEEPERR); SPAGAIN; FREETMPS; LEAVE; @@ -220,16 +275,16 @@ while (nreqs) { - aio_req req; + int size; #if !(__i386 || __x86_64) /* safe without sempahore on this archs */ pthread_mutex_lock (&reslock); #endif - req = ress; + size = res_queue.size; #if !(__i386 || __x86_64) /* safe without sempahore on this archs */ pthread_mutex_unlock (&reslock); #endif - if (req) + if (size) return; FD_ZERO(&rfd); @@ -242,104 +297,92 @@ static void req_invoke (aio_req req) { dSP; - int errorno = errno; - - if (req->flags & FLAG_CANCELLED || !SvOK (req->callback)) - return; - errno = req->errorno; + if (!(req->flags & FLAG_CANCELLED) && SvOK (req->callback)) + { + errno = req->errorno; - ENTER; - SAVETMPS; - PUSHMARK (SP); - EXTEND (SP, 1); + ENTER; + SAVETMPS; + PUSHMARK (SP); + EXTEND (SP, 1); - switch (req->type) - { - case REQ_READDIR: + switch (req->type) { - SV *rv = &PL_sv_undef; - - if (req->result >= 0) + case REQ_READDIR: { - char *buf = req->data2ptr; - AV *av = newAV (); + SV *rv = &PL_sv_undef; - while (req->result) + if (req->result >= 0) { - SV *sv = newSVpv (buf, 0); + char *buf = req->data2ptr; + AV *av = newAV (); + + while (req->result) + { + SV *sv = newSVpv (buf, 0); + + av_push (av, sv); + buf += SvCUR (sv) + 1; + req->result--; + } - av_push (av, sv); - buf += SvCUR (sv) + 1; - req->result--; + rv = sv_2mortal (newRV_noinc ((SV *)av)); } - rv = sv_2mortal (newRV_noinc ((SV *)av)); + PUSHs (rv); } + break; - PUSHs (rv); - } - break; - - case REQ_OPEN: - { - /* convert fd to fh */ - SV *fh; - - PUSHs (sv_2mortal (newSViv (req->result))); - PUTBACK; - call_pv ("IO::AIO::_fd2fh", G_SCALAR | G_EVAL); - SPAGAIN; - - fh = SvREFCNT_inc (POPs); + case REQ_OPEN: + { + /* convert fd to fh */ + SV *fh; - PUSHMARK (SP); - XPUSHs (sv_2mortal (fh)); - } - break; + PUSHs (sv_2mortal (newSViv (req->result))); + PUTBACK; + call_pv ("IO::AIO::_fd2fh", G_SCALAR | G_EVAL); + SPAGAIN; - case REQ_GROUP: - req->fd = 2; /* mark group as finished */ + fh = SvREFCNT_inc (POPs); - if (req->data) - { - int i; - AV *av = (AV *)req->data; + PUSHMARK (SP); + XPUSHs (sv_2mortal (fh)); + } + break; - EXTEND (SP, AvFILL (av) + 1); - for (i = 0; i <= AvFILL (av); ++i) - PUSHs (*av_fetch (av, i, 0)); - } - break; + case REQ_GROUP: + req->fd = 2; /* mark group as finished */ - case REQ_NOP: - case REQ_SLEEP: - break; + if (req->data) + { + int i; + AV *av = (AV *)req->data; - default: - PUSHs (sv_2mortal (newSViv (req->result))); - break; - } + EXTEND (SP, AvFILL (av) + 1); + for (i = 0; i <= AvFILL (av); ++i) + PUSHs (*av_fetch (av, i, 0)); + } + break; + case REQ_NOP: + case REQ_BUSY: + break; - PUTBACK; - call_sv (req->callback, G_VOID | G_EVAL); - SPAGAIN; + default: + PUSHs (sv_2mortal (newSViv (req->result))); + break; + } - FREETMPS; - LEAVE; - errno = errorno; + PUTBACK; + call_sv (req->callback, G_VOID | G_EVAL); + SPAGAIN; - if (SvTRUE (ERRSV)) - { - req_free (req); - croak (0); + FREETMPS; + LEAVE; } -} -static void req_free (aio_req req) -{ if (req->grp) { aio_req grp = req->grp; @@ -354,6 +397,15 @@ aio_grp_dec (grp); } + if (SvTRUE (ERRSV)) + { + req_free (req); + croak (0); + } +} + +static void req_free (aio_req req) +{ if (req->self) { sv_unmagic (req->self, PERL_MAGIC_ext); @@ -395,20 +447,16 @@ for (;;) { pthread_mutex_lock (&reslock); - req = ress; + req = reqq_shift (&res_queue); if (req) { - ress = req->next; - - if (!ress) + if (!res_queue.size) { /* read any signals sent by the worker threads */ char buf [32]; while (read (respipe [0], buf, 32) == 32) ; - - rese = 0; } } @@ -481,17 +529,7 @@ ++nreqs; pthread_mutex_lock (&reqlock); - - req->next = 0; - - if (reqe) - { - reqe->next = req; - reqe = req; - } - else - reqe = reqs = req; - + reqq_push (&req_queue, req); pthread_cond_signal (&reqwait); pthread_mutex_unlock (&reqlock); @@ -510,8 +548,11 @@ static void end_thread (void) { aio_req req; + Newz (0, req, 1, aio_cb); + req->type = REQ_QUIT; + req->pri = PRI_MAX + PRI_BIAS; req_send (req); } @@ -831,13 +872,7 @@ for (;;) { - req = reqs; - - if (reqs) - { - reqs = reqs->next; - if (!reqs) reqe = 0; - } + req = reqq_shift (&req_queue); if (req) break; @@ -875,7 +910,7 @@ case REQ_FSYNC: req->result = fsync (req->fd); break; case REQ_READDIR: req->result = scandir_ (req->dataptr, &req->data2ptr); break; - case REQ_SLEEP: + case REQ_BUSY: { struct timeval tv; @@ -899,20 +934,9 @@ pthread_mutex_lock (&reslock); - req->next = 0; - - if (rese) - { - rese->next = req; - rese = req; - } - else - { - rese = ress = req; - - /* write a dummy byte to the pipe so fh becomes ready */ - write (respipe [1], &respipe, 1); - } + if (!reqq_push (&res_queue, req)) + /* write a dummy byte to the pipe so fh becomes ready */ + write (respipe [1], &respipe, 1); pthread_mutex_unlock (&reslock); } @@ -953,24 +977,12 @@ started = 0; - while (reqs) - { - prv = reqs; - reqs = prv->next; - req_free (prv); - } - - reqs = reqe = 0; - - while (ress) - { - prv = ress; - ress = prv->next; - req_free (prv); - } - - ress = rese = 0; + while (prv = reqq_shift (&req_queue)) + req_free (prv); + while (prv = reqq_shift (&res_queue)) + req_free (prv); + close (respipe [0]); close (respipe [1]); create_pipe (); @@ -1255,14 +1267,14 @@ } void -aio_sleep (delay,callback=&PL_sv_undef) +aio_busy (delay,callback=&PL_sv_undef) double delay SV * callback PPCODE: { dREQ; - req->type = REQ_SLEEP; + req->type = REQ_BUSY; req->fd = delay < 0. ? 0 : delay; req->fd2 = delay < 0. ? 0 : 1000. * (delay - req->fd); @@ -1295,16 +1307,20 @@ REQ_SEND; } -#if 0 - void -aio_pri (int pri = DEFAULT_PRI) - CODE: - if (pri < PRI_MIN) pri = PRI_MIN; - if (pri > PRI_MAX) pri = PRI_MAX; - next_pri = pri + PRI_BIAS; +aioreq_pri (int pri = DEFAULT_PRI) + CODE: + if (pri < PRI_MIN) pri = PRI_MIN; + if (pri > PRI_MAX) pri = PRI_MAX; + next_pri = pri + PRI_BIAS; -#endif +void +aioreq_nice (int nice = 0) + CODE: + nice = next_pri - nice; + if (nice < PRI_MIN) nice = PRI_MIN; + if (nice > PRI_MAX) nice = PRI_MAX; + next_pri = nice + PRI_BIAS; void flush () @@ -1363,7 +1379,6 @@ void cancel (aio_req_ornot req) - PROTOTYPE: CODE: req_cancel (req); @@ -1423,7 +1438,7 @@ } void -feed_limit (aio_req grp, int limit) +limit (aio_req grp, int limit) CODE: grp->fd2 = limit; aio_grp_feed (grp);