--- IO-AIO/AIO.xs 2005/08/17 06:12:10 1.29 +++ IO-AIO/AIO.xs 2005/08/18 16:32:10 1.30 @@ -57,7 +57,7 @@ typedef aio_cb *aio_req; -static int started; +static int started, wanted; static volatile int nreqs; static int max_outstanding = 1<<30; static int respipe [2]; @@ -118,12 +118,12 @@ if (!ress) { - rese = 0; - /* read any signals sent by the worker threads */ char buf [32]; while (read (respipe [0], buf, 32) == 32) ; + + rese = 0; } } @@ -209,7 +209,7 @@ pthread_attr_init (&attr); pthread_attr_setstacksize (&attr, STACKSIZE); - pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED); +// pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED); sigfillset (&fullsigset); sigprocmask (SIG_SETMASK, &fullsigset, &oldsigset); @@ -223,6 +223,9 @@ static void send_req (aio_req req) { + while (started < wanted && nreqs >= started) + start_thread (); + nreqs++; pthread_mutex_lock (&reqlock); @@ -264,21 +267,24 @@ static void min_parallel (int nthreads) { - while (nthreads > started) - start_thread (); + if (wanted < nthreads) + wanted = nthreads; } static void max_parallel (int nthreads) { int cur = started; - while (cur > nthreads) - { + if (wanted > nthreads) + wanted = nthreads; + + while (cur > wanted) + { end_thread (); cur--; } - while (started > nthreads) + while (started > wanted) { poll_wait (); poll_cb (); @@ -313,7 +319,6 @@ { aio_req prv; - int restart = started; started = 0; while (reqs) @@ -339,8 +344,6 @@ create_pipe (); atfork_parent (); - - min_parallel (restart); } /*****************************************************************************/