--- IO-AIO/AIO.xs 2006/10/24 17:22:17 1.72 +++ IO-AIO/AIO.xs 2006/10/26 13:25:40 1.77 @@ -1,10 +1,11 @@ /* solaris */ #define _POSIX_PTHREAD_SEMANTICS 1 -#if __linux +#if __linux && !defined(_GNU_SOURCE) # define _GNU_SOURCE #endif +/* just in case */ #define _REENTRANT 1 #include @@ -49,6 +50,11 @@ # define NAME_MAX 4096 #endif +#ifndef PTHREAD_STACK_MIN +/* care for broken platforms, e.g. windows */ +# define PTHREAD_STACK_MIN 16384 +#endif + #if __ia64 # define STACKSIZE 65536 #elif __i386 || __x86_64 /* 16k is unreasonably high :( */ @@ -130,7 +136,6 @@ static int started, wanted; static volatile int nreqs; -static int max_outstanding = 1<<30; static int respipe [2]; #if __linux && defined (PTHREAD_ADAPTIVE_MUTEX_INITIALIZER_NP) @@ -142,7 +147,7 @@ #define LOCK(mutex) pthread_mutex_lock (&(mutex)) #define UNLOCK(mutex) pthread_mutex_unlock (&(mutex)) -/* worker threasd management */ +/* worker threads management */ static pthread_mutex_t wrklock = AIO_MUTEX_INIT; typedef struct worker { @@ -240,6 +245,7 @@ abort (); } +static int poll_cb (int max); static void req_invoke (aio_req req); static void req_free (aio_req req); static void req_cancel (aio_req req); @@ -492,14 +498,14 @@ req_cancel_subs (req); } -static int poll_cb () +static int poll_cb (int max) { dSP; int count = 0; int do_croak = 0; aio_req req; - for (;;) + while (max <= 0 || count < max) { LOCK (reslock); req = reqq_shift (&res_queue); @@ -559,14 +565,14 @@ static void start_thread (void) { + sigset_t fullsigset, oldsigset; + pthread_attr_t attr; + worker *wrk = calloc (1, sizeof (worker)); if (!wrk) croak ("unable to allocate worker thread data"); - sigset_t fullsigset, oldsigset; - pthread_attr_t attr; - pthread_attr_init (&attr); pthread_attr_setstacksize (&attr, STACKSIZE); pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED); @@ -602,17 +608,6 @@ reqq_push (&req_queue, req); pthread_cond_signal (&reqwait); UNLOCK (reqlock); - - if (nreqs > max_outstanding) - for (;;) - { - poll_cb (); - - if (nreqs <= max_outstanding) - break; - - poll_wait (); - } } static void end_thread (void) @@ -649,7 +644,7 @@ while (started > wanted) { poll_wait (); - poll_cb (); + poll_cb (0); } } @@ -715,9 +710,9 @@ #endif #if !HAVE_READAHEAD -# define readahead aio_readahead +# define readahead(fd,offset,count) aio_readahead (fd, offset, count, self) -static ssize_t readahead (int fd, off_t offset, size_t count) +static ssize_t aio_readahead (int fd, off_t offset, size_t count, worker *self) { dBUF; @@ -732,6 +727,7 @@ errno = 0; } + #endif #if !HAVE_READDIR_R @@ -786,7 +782,7 @@ res = sendfile (ifd, ofd, offset, count, 0, &sbytes, 0); if (res < 0 && sbytes) - /* maybe only on EAGAIN only: as usual, the manpage leaves you guessing */ + /* maybe only on EAGAIN: as usual, the manpage leaves you guessing */ res = sbytes; } @@ -876,9 +872,8 @@ LOCK (wrklock); self->dirp = dirp = opendir (req->dataptr); self->dbuf = u = malloc (sizeof (*u)); - UNLOCK (wrklock); - req->data2ptr = names = malloc (memlen); + UNLOCK (wrklock); if (dirp && u && names) for (;;) @@ -1118,14 +1113,6 @@ int nthreads PROTOTYPE: $ -int -max_outstanding (nreqs) - int nreqs - PROTOTYPE: $ - CODE: - RETVAL = max_outstanding; - max_outstanding = nreqs; - void aio_open (pathname,flags,mode,callback=&PL_sv_undef) SV * pathname @@ -1411,7 +1398,7 @@ while (nreqs) { poll_wait (); - poll_cb (); + poll_cb (0); } void @@ -1421,7 +1408,7 @@ if (nreqs) { poll_wait (); - poll_cb (); + poll_cb (0); } int @@ -1436,7 +1423,15 @@ poll_cb(...) PROTOTYPE: CODE: - RETVAL = poll_cb (); + RETVAL = poll_cb (0); + OUTPUT: + RETVAL + +int +poll_some(int max = 0) + PROTOTYPE: $ + CODE: + RETVAL = poll_cb (max); OUTPUT: RETVAL