--- IO-AIO/AIO.xs 2006/11/08 01:59:59 1.92 +++ IO-AIO/AIO.xs 2006/11/26 22:47:58 1.95 @@ -19,6 +19,7 @@ #include #include +#include #include #include #include @@ -162,6 +163,40 @@ + ((tv2->tv_usec - tv1->tv_usec) >> 10); } +static pthread_t main_tid; +static int main_sig; +static int block_sig_level; + +void block_sig () +{ + sigset_t ss; + + if (block_sig_level++) + return; + + if (!main_sig) + return; + + sigemptyset (&ss); + sigaddset (&ss, main_sig); + pthread_sigmask (SIG_BLOCK, &ss, 0); +} + +void unblock_sig () +{ + sigset_t ss; + + if (--block_sig_level) + return; + + if (!main_sig) + return; + + sigemptyset (&ss); + sigaddset (&ss, main_sig); + pthread_sigmask (SIG_UNBLOCK, &ss, 0); +} + static int next_pri = DEFAULT_PRI + PRI_BIAS; static unsigned int started, idle, wanted; @@ -322,7 +357,7 @@ } static int poll_cb (); -static void req_invoke (aio_req req); +static int req_invoke (aio_req req); static void req_free (aio_req req); static void req_cancel (aio_req req); @@ -352,6 +387,8 @@ static void aio_grp_feed (aio_req grp) { + block_sig (); + while (grp->size < grp->int2 && !(grp->flags & FLAG_CANCELLED)) { int old_len = grp->size; @@ -379,6 +416,8 @@ break; } } + + unblock_sig (); } static void aio_grp_dec (aio_req grp) @@ -391,12 +430,21 @@ /* finish, if done */ if (!grp->size && grp->int1) { - req_invoke (grp); + block_sig (); + + if (!req_invoke (grp)) + { + req_free (grp); + unblock_sig (); + croak (0); + } + req_free (grp); + unblock_sig (); } } -static void req_invoke (aio_req req) +static int req_invoke (aio_req req) { dSP; @@ -449,10 +497,9 @@ call_pv ("IO::AIO::_fd2fh", G_SCALAR | G_EVAL); SPAGAIN; - fh = SvREFCNT_inc (POPs); - + fh = POPs; PUSHMARK (SP); - XPUSHs (sv_2mortal (fh)); + XPUSHs (fh); } break; @@ -527,11 +574,7 @@ aio_grp_dec (grp); } - if (SvTRUE (ERRSV)) - { - req_free (req); - croak (0); - } + return !SvTRUE (ERRSV); } static void req_free (aio_req req) @@ -596,7 +639,7 @@ sigfillset (&fullsigset); LOCK (wrklock); - sigprocmask (SIG_SETMASK, &fullsigset, &oldsigset); + pthread_sigmask (SIG_SETMASK, &fullsigset, &oldsigset); if (pthread_create (&wrk->tid, &attr, aio_proc, (void *)wrk) == 0) { @@ -609,7 +652,7 @@ else free (wrk); - sigprocmask (SIG_SETMASK, &oldsigset, 0); + pthread_sigmask (SIG_SETMASK, &oldsigset, 0); UNLOCK (wrklock); } @@ -627,6 +670,8 @@ static void req_send (aio_req req) { + block_sig (); + ++nreqs; LOCK (reqlock); @@ -635,6 +680,8 @@ pthread_cond_signal (&reqwait); UNLOCK (reqlock); + unblock_sig (); + maybe_start_thread (); } @@ -714,6 +761,8 @@ if (max_poll_time) gettimeofday (&tv_start, 0); + block_sig (); + for (;;) { for (;;) @@ -730,8 +779,8 @@ if (!res_queue.size) { /* read any signals sent by the worker threads */ - char buf [32]; - while (read (respipe [0], buf, 32) == 32) + char buf [4]; + while (read (respipe [0], buf, 4) == 4) ; } } @@ -750,7 +799,12 @@ } else { - req_invoke (req); + if (!req_invoke (req)) + { + req_free (req); + unblock_sig (); + croak (0); + } count++; } @@ -777,6 +831,7 @@ ++maxreqs; } + unblock_sig (); return count; } @@ -1157,8 +1212,14 @@ ++npending; if (!reqq_push (&res_queue, req)) - /* write a dummy byte to the pipe so fh becomes ready */ - write (respipe [1], &respipe, 1); + { + /* write a dummy byte to the pipe so fh becomes ready */ + write (respipe [1], &respipe, 1); + + /* optionally signal the main thread asynchronously */ + if (main_sig) + pthread_kill (main_tid, main_sig); + } self->req = 0; worker_clear (self); @@ -1271,6 +1332,7 @@ newCONSTSUB (stash, "O_CREAT", newSViv (O_CREAT)); newCONSTSUB (stash, "O_TRUNC", newSViv (O_TRUNC)); newCONSTSUB (stash, "S_IFIFO", newSViv (S_IFIFO)); + newCONSTSUB (stash, "SIGIO", newSViv (SIGIO)); create_pipe (); pthread_atfork (atfork_prepare, atfork_parent, atfork_child); @@ -1680,6 +1742,43 @@ CODE: poll_wait (); +void +setsig (int signum = SIGIO) + PROTOTYPE: ;$ + CODE: +{ + if (block_sig_level) + croak ("cannot call IO::AIO::setsig from within aio_block/callback"); + + LOCK (reslock); + main_tid = pthread_self (); + main_sig = signum; + UNLOCK (reslock); + + if (main_sig && npending) + pthread_kill (main_tid, main_sig); +} + +void +aio_block (SV *cb) + PROTOTYPE: & + PPCODE: +{ + int count; + + block_sig (); + PUSHMARK (SP); + PUTBACK; + count = call_sv (cb, GIMME_V | G_NOARGS | G_EVAL); + SPAGAIN; + unblock_sig (); + + if (SvTRUE (ERRSV)) + croak (0); + + XSRETURN (count); +} + int nreqs() PROTOTYPE: @@ -1738,6 +1837,9 @@ int i; aio_req req; + if (main_sig && !block_sig_level) + croak ("aio_group->add called outside aio_block/callback context while IO::AIO::setsig is in use"); + if (grp->int1 == 2) croak ("cannot add requests to IO::AIO::GRP after the group finished");