--- IO-AIO/AIO.xs 2006/10/24 16:35:04 1.71 +++ IO-AIO/AIO.xs 2006/10/26 06:44:48 1.75 @@ -49,6 +49,11 @@ # define NAME_MAX 4096 #endif +#ifndef PTHREAD_STACK_MIN +/* care for broken platforms, e.g. windows */ +# define PTHREAD_STACK_MIN 16384 +#endif + #if __ia64 # define STACKSIZE 65536 #elif __i386 || __x86_64 /* 16k is unreasonably high :( */ @@ -130,7 +135,6 @@ static int started, wanted; static volatile int nreqs; -static int max_outstanding = 1<<30; static int respipe [2]; #if __linux && defined (PTHREAD_ADAPTIVE_MUTEX_INITIALIZER_NP) @@ -240,8 +244,10 @@ abort (); } +static int poll_cb (); static void req_invoke (aio_req req); static void req_free (aio_req req); +static void req_cancel (aio_req req); /* must be called at most once */ static SV *req_sv (aio_req req, const char *klass) @@ -470,17 +476,25 @@ Safefree (req); } +static void req_cancel_subs (aio_req grp) +{ + aio_req sub; + + if (grp->type != REQ_GROUP) + return; + + SvREFCNT_dec (grp->fh2); + grp->fh2 = 0; + + for (sub = grp->grp_first; sub; sub = sub->grp_next) + req_cancel (sub); +} + static void req_cancel (aio_req req) { req->flags |= FLAG_CANCELLED; - if (req->type == REQ_GROUP) - { - aio_req sub; - - for (sub = req->grp_first; sub; sub = sub->grp_next) - req_cancel (sub); - } + req_cancel_subs (req); } static int poll_cb () @@ -550,14 +564,14 @@ static void start_thread (void) { + sigset_t fullsigset, oldsigset; + pthread_attr_t attr; + worker *wrk = calloc (1, sizeof (worker)); if (!wrk) croak ("unable to allocate worker thread data"); - sigset_t fullsigset, oldsigset; - pthread_attr_t attr; - pthread_attr_init (&attr); pthread_attr_setstacksize (&attr, STACKSIZE); pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED); @@ -593,17 +607,6 @@ reqq_push (&req_queue, req); pthread_cond_signal (&reqwait); UNLOCK (reqlock); - - if (nreqs > max_outstanding) - for (;;) - { - poll_cb (); - - if (nreqs <= max_outstanding) - break; - - poll_wait (); - } } static void end_thread (void) @@ -706,9 +709,9 @@ #endif #if !HAVE_READAHEAD -# define readahead aio_readahead +# define readahead(fd,offset,count) aio_readahead (fd, offset, count, self) -static ssize_t readahead (int fd, off_t offset, size_t count) +static ssize_t aio_readahead (int fd, off_t offset, size_t count, worker *self) { dBUF; @@ -723,6 +726,7 @@ errno = 0; } + #endif #if !HAVE_READDIR_R @@ -1109,14 +1113,6 @@ int nthreads PROTOTYPE: $ -int -max_outstanding (nreqs) - int nreqs - PROTOTYPE: $ - CODE: - RETVAL = max_outstanding; - max_outstanding = nreqs; - void aio_open (pathname,flags,mode,callback=&PL_sv_undef) SV * pathname @@ -1497,6 +1493,11 @@ } void +cancel_subs (aio_req_ornot req) + CODE: + req_cancel_subs (req); + +void result (aio_req grp, ...) CODE: {