--- IO-AIO/AIO.xs 2005/08/17 04:47:02 1.27 +++ IO-AIO/AIO.xs 2005/08/18 16:32:10 1.30 @@ -57,7 +57,7 @@ typedef aio_cb *aio_req; -static int started; +static int started, wanted; static volatile int nreqs; static int max_outstanding = 1<<30; static int respipe [2]; @@ -118,12 +118,12 @@ if (!ress) { - rese = 0; - /* read any signals sent by the worker threads */ char buf [32]; while (read (respipe [0], buf, 32) == 32) ; + + rese = 0; } } @@ -144,6 +144,9 @@ if (req->type == REQ_READ) SvCUR_set (req->data, req->dataoffset + (req->result > 0 ? req->result : 0)); + if (req->data2ptr && (req->type == REQ_READ || req->type == REQ_WRITE)) + SvREADONLY_off (req->data); + if (req->statdata) { PL_laststype = req->type == REQ_LSTAT ? OP_LSTAT : OP_STAT; @@ -206,7 +209,7 @@ pthread_attr_init (&attr); pthread_attr_setstacksize (&attr, STACKSIZE); - pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED); +// pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED); sigfillset (&fullsigset); sigprocmask (SIG_SETMASK, &fullsigset, &oldsigset); @@ -220,6 +223,9 @@ static void send_req (aio_req req) { + while (started < wanted && nreqs >= started) + start_thread (); + nreqs++; pthread_mutex_lock (&reqlock); @@ -261,21 +267,24 @@ static void min_parallel (int nthreads) { - while (nthreads > started) - start_thread (); + if (wanted < nthreads) + wanted = nthreads; } static void max_parallel (int nthreads) { int cur = started; - while (cur > nthreads) - { + if (wanted > nthreads) + wanted = nthreads; + + while (cur > wanted) + { end_thread (); cur--; } - while (started > nthreads) + while (started > wanted) { poll_wait (); poll_cb (); @@ -296,29 +305,8 @@ static void atfork_prepare (void) { - for (;;) { - - for (;;) - { - poll_cb (); - - if (!nreqs) - break; - - poll_wait (); - } - - pthread_mutex_lock (&reqlock); - - if (!nreqs) - break; - - pthread_mutex_unlock (&reqlock); - } - + pthread_mutex_lock (&reqlock); pthread_mutex_lock (&reslock); - - assert (!nreqs && !reqs && !ress); } static void atfork_parent (void) @@ -329,12 +317,33 @@ static void atfork_child (void) { - int restart = started; + aio_req prv; + started = 0; - atfork_parent (); + while (reqs) + { + prv = reqs; + reqs = prv->next; + free_req (prv); + } + + reqs = reqe = 0; + + while (ress) + { + prv = ress; + ress = prv->next; + free_req (prv); + } + + ress = rese = 0; + + close (respipe [0]); + close (respipe [1]); + create_pipe (); - min_parallel (restart); + atfork_parent (); } /*****************************************************************************/ @@ -631,6 +640,12 @@ req->data = SvREFCNT_inc (data); req->dataptr = (char *)svptr + dataoffset; + if (!SvREADONLY (data)) + { + SvREADONLY_on (data); + req->data2ptr = (void *)data; + } + send_req (req); } }