--- IO-AIO/AIO.xs 2006/11/08 02:01:02 1.93 +++ IO-AIO/AIO.xs 2006/11/26 18:28:37 1.94 @@ -19,6 +19,7 @@ #include #include +#include #include #include #include @@ -162,6 +163,40 @@ + ((tv2->tv_usec - tv1->tv_usec) >> 10); } +static pthread_t main_tid; +static int main_sig; +static int block_sig_level; + +void block_sig () +{ + sigset_t ss; + + if (block_sig_level++) + return; + + if (!main_sig) + return; + + sigemptyset (&ss); + sigaddset (&ss, main_sig); + pthread_sigmask (SIG_BLOCK, &ss, 0); +} + +void unblock_sig () +{ + sigset_t ss; + + if (--block_sig_level) + return; + + if (!main_sig) + return; + + sigemptyset (&ss); + sigaddset (&ss, main_sig); + pthread_sigmask (SIG_UNBLOCK, &ss, 0); +} + static int next_pri = DEFAULT_PRI + PRI_BIAS; static unsigned int started, idle, wanted; @@ -322,7 +357,7 @@ } static int poll_cb (); -static void req_invoke (aio_req req); +static int req_invoke (aio_req req); static void req_free (aio_req req); static void req_cancel (aio_req req); @@ -352,6 +387,8 @@ static void aio_grp_feed (aio_req grp) { + block_sig (); + while (grp->size < grp->int2 && !(grp->flags & FLAG_CANCELLED)) { int old_len = grp->size; @@ -379,6 +416,8 @@ break; } } + + unblock_sig (); } static void aio_grp_dec (aio_req grp) @@ -391,12 +430,21 @@ /* finish, if done */ if (!grp->size && grp->int1) { - req_invoke (grp); + block_sig (); + + if (!req_invoke (grp)) + { + req_free (grp); + unblock_sig (); + croak (0); + } + req_free (grp); + unblock_sig (); } } -static void req_invoke (aio_req req) +static int req_invoke (aio_req req) { dSP; @@ -526,11 +574,7 @@ aio_grp_dec (grp); } - if (SvTRUE (ERRSV)) - { - req_free (req); - croak (0); - } + return !SvTRUE (ERRSV); } static void req_free (aio_req req) @@ -595,7 +639,7 @@ sigfillset (&fullsigset); LOCK (wrklock); - sigprocmask (SIG_SETMASK, &fullsigset, &oldsigset); + pthread_sigmask (SIG_SETMASK, &fullsigset, &oldsigset); if (pthread_create (&wrk->tid, &attr, aio_proc, (void *)wrk) == 0) { @@ -608,7 +652,7 @@ else free (wrk); - sigprocmask (SIG_SETMASK, &oldsigset, 0); + pthread_sigmask (SIG_SETMASK, &oldsigset, 0); UNLOCK (wrklock); } @@ -626,6 +670,8 @@ static void req_send (aio_req req) { + block_sig (); + ++nreqs; LOCK (reqlock); @@ -634,6 +680,8 @@ pthread_cond_signal (&reqwait); UNLOCK (reqlock); + unblock_sig (); + maybe_start_thread (); } @@ -713,6 +761,8 @@ if (max_poll_time) gettimeofday (&tv_start, 0); + block_sig (); + for (;;) { for (;;) @@ -749,7 +799,12 @@ } else { - req_invoke (req); + if (!req_invoke (req)) + { + req_free (req); + unblock_sig (); + croak (0); + } count++; } @@ -776,6 +831,7 @@ ++maxreqs; } + unblock_sig (); return count; } @@ -1156,8 +1212,14 @@ ++npending; if (!reqq_push (&res_queue, req)) - /* write a dummy byte to the pipe so fh becomes ready */ - write (respipe [1], &respipe, 1); + { + /* write a dummy byte to the pipe so fh becomes ready */ + write (respipe [1], &respipe, 1); + + /* optionally signal the main thread asynchronously */ + if (main_sig) + pthread_kill (main_tid, main_sig); + } self->req = 0; worker_clear (self); @@ -1270,6 +1332,7 @@ newCONSTSUB (stash, "O_CREAT", newSViv (O_CREAT)); newCONSTSUB (stash, "O_TRUNC", newSViv (O_TRUNC)); newCONSTSUB (stash, "S_IFIFO", newSViv (S_IFIFO)); + newCONSTSUB (stash, "SIGIO", newSViv (SIGIO)); create_pipe (); pthread_atfork (atfork_prepare, atfork_parent, atfork_child); @@ -1679,6 +1742,43 @@ CODE: poll_wait (); +void +setsig (int signum = SIGIO) + PROTOTYPE: ;$ + CODE: +{ + if (block_sig_level) + croak ("cannot call IO::AIO::setsig from within aio_block/callback"); + + LOCK (reslock); + main_tid = pthread_self (); + main_sig = signum; + UNLOCK (reslock); + + if (main_sig && npending) + pthread_kill (main_tid, main_sig); +} + +void +aio_block (SV *cb) + PROTOTYPE: & + PPCODE: +{ + int count; + + block_sig (); + PUSHMARK (SP); + PUTBACK; + count = call_sv (cb, GIMME_V | G_NOARGS | G_EVAL); + SPAGAIN; + unblock_sig (); + + if (SvTRUE (ERRSV)) + croak (0); + + XSRETURN (count); +} + int nreqs() PROTOTYPE: @@ -1737,6 +1837,9 @@ int i; aio_req req; + if (main_sig && !block_sig_level) + croak ("aio_group->add called outside aio_block/callback context while IO::AIO::setsig is in use"); + if (grp->int1 == 2) croak ("cannot add requests to IO::AIO::GRP after the group finished");