--- IO-AIO/AIO.xs 2006/10/31 00:11:52 1.89 +++ IO-AIO/AIO.xs 2006/12/23 04:49:37 1.96 @@ -19,6 +19,7 @@ #include #include +#include #include #include #include @@ -89,6 +90,8 @@ if (!aio_buf) \ return -1; +typedef SV SV8; /* byte-sv, used for argument-checking */ + enum { REQ_QUIT, REQ_OPEN, REQ_CLOSE, @@ -160,6 +163,40 @@ + ((tv2->tv_usec - tv1->tv_usec) >> 10); } +static pthread_t main_tid; +static int main_sig; +static int block_sig_level; + +void block_sig () +{ + sigset_t ss; + + if (block_sig_level++) + return; + + if (!main_sig) + return; + + sigemptyset (&ss); + sigaddset (&ss, main_sig); + pthread_sigmask (SIG_BLOCK, &ss, 0); +} + +void unblock_sig () +{ + sigset_t ss; + + if (--block_sig_level) + return; + + if (!main_sig) + return; + + sigemptyset (&ss); + sigaddset (&ss, main_sig); + pthread_sigmask (SIG_UNBLOCK, &ss, 0); +} + static int next_pri = DEFAULT_PRI + PRI_BIAS; static unsigned int started, idle, wanted; @@ -320,7 +357,7 @@ } static int poll_cb (); -static void req_invoke (aio_req req); +static int req_invoke (aio_req req); static void req_free (aio_req req); static void req_cancel (aio_req req); @@ -350,6 +387,8 @@ static void aio_grp_feed (aio_req grp) { + block_sig (); + while (grp->size < grp->int2 && !(grp->flags & FLAG_CANCELLED)) { int old_len = grp->size; @@ -377,6 +416,8 @@ break; } } + + unblock_sig (); } static void aio_grp_dec (aio_req grp) @@ -389,12 +430,21 @@ /* finish, if done */ if (!grp->size && grp->int1) { - req_invoke (grp); + block_sig (); + + if (!req_invoke (grp)) + { + req_free (grp); + unblock_sig (); + croak (0); + } + req_free (grp); + unblock_sig (); } } -static void req_invoke (aio_req req) +static int req_invoke (aio_req req) { dSP; @@ -447,10 +497,9 @@ call_pv ("IO::AIO::_fd2fh", G_SCALAR | G_EVAL); SPAGAIN; - fh = SvREFCNT_inc (POPs); - + fh = POPs; PUSHMARK (SP); - XPUSHs (sv_2mortal (fh)); + XPUSHs (fh); } break; @@ -525,11 +574,7 @@ aio_grp_dec (grp); } - if (SvTRUE (ERRSV)) - { - req_free (req); - croak (0); - } + return !SvTRUE (ERRSV); } static void req_free (aio_req req) @@ -594,7 +639,7 @@ sigfillset (&fullsigset); LOCK (wrklock); - sigprocmask (SIG_SETMASK, &fullsigset, &oldsigset); + pthread_sigmask (SIG_SETMASK, &fullsigset, &oldsigset); if (pthread_create (&wrk->tid, &attr, aio_proc, (void *)wrk) == 0) { @@ -607,7 +652,7 @@ else free (wrk); - sigprocmask (SIG_SETMASK, &oldsigset, 0); + pthread_sigmask (SIG_SETMASK, &oldsigset, 0); UNLOCK (wrklock); } @@ -625,6 +670,8 @@ static void req_send (aio_req req) { + block_sig (); + ++nreqs; LOCK (reqlock); @@ -633,6 +680,8 @@ pthread_cond_signal (&reqwait); UNLOCK (reqlock); + unblock_sig (); + maybe_start_thread (); } @@ -712,6 +761,8 @@ if (max_poll_time) gettimeofday (&tv_start, 0); + block_sig (); + for (;;) { for (;;) @@ -728,8 +779,8 @@ if (!res_queue.size) { /* read any signals sent by the worker threads */ - char buf [32]; - while (read (respipe [0], buf, 32) == 32) + char buf [4]; + while (read (respipe [0], buf, 4) == 4) ; } } @@ -748,7 +799,12 @@ } else { - req_invoke (req); + if (!req_invoke (req)) + { + req_free (req); + unblock_sig (); + croak (0); + } count++; } @@ -775,6 +831,7 @@ ++maxreqs; } + unblock_sig (); return count; } @@ -1155,8 +1212,14 @@ ++npending; if (!reqq_push (&res_queue, req)) - /* write a dummy byte to the pipe so fh becomes ready */ - write (respipe [1], &respipe, 1); + { + /* write a dummy byte to the pipe so fh becomes ready */ + write (respipe [1], &respipe, 1); + + /* optionally signal the main thread asynchronously */ + if (main_sig) + pthread_kill (main_tid, main_sig); + } self->req = 0; worker_clear (self); @@ -1269,6 +1332,7 @@ newCONSTSUB (stash, "O_CREAT", newSViv (O_CREAT)); newCONSTSUB (stash, "O_TRUNC", newSViv (O_TRUNC)); newCONSTSUB (stash, "S_IFIFO", newSViv (S_IFIFO)); + newCONSTSUB (stash, "SIGIO", newSViv (SIGIO)); create_pipe (); pthread_atfork (atfork_prepare, atfork_parent, atfork_child); @@ -1311,7 +1375,7 @@ void aio_open (pathname,flags,mode,callback=&PL_sv_undef) - SV * pathname + SV8 * pathname int flags int mode SV * callback @@ -1354,7 +1418,7 @@ SV * fh UV offset UV length - SV * data + SV8 * data UV dataoffset SV * callback ALIAS: @@ -1384,7 +1448,7 @@ else { /* read: grow scalar as necessary */ - svptr = SvGROW (data, length + dataoffset); + svptr = SvGROW (data, length + dataoffset + 1); } if (length < 0) @@ -1415,7 +1479,7 @@ void aio_readlink (path,callback=&PL_sv_undef) - SV * path + SV8 * path SV * callback PROTOTYPE: $$;$ PPCODE: @@ -1480,7 +1544,7 @@ void aio_stat (fh_or_path,callback=&PL_sv_undef) - SV * fh_or_path + SV8 * fh_or_path SV * callback ALIAS: aio_stat = REQ_STAT @@ -1516,7 +1580,7 @@ void aio_unlink (pathname,callback=&PL_sv_undef) - SV * pathname + SV8 * pathname SV * callback ALIAS: aio_unlink = REQ_UNLINK @@ -1535,8 +1599,8 @@ void aio_link (oldpath,newpath,callback=&PL_sv_undef) - SV * oldpath - SV * newpath + SV8 * oldpath + SV8 * newpath SV * callback ALIAS: aio_link = REQ_LINK @@ -1557,7 +1621,7 @@ void aio_mknod (pathname,mode,dev,callback=&PL_sv_undef) - SV * pathname + SV8 * pathname SV * callback UV mode UV dev @@ -1644,18 +1708,17 @@ while (nreqs) { poll_wait (); - poll_cb (0); + poll_cb (); } -void +int poll() PROTOTYPE: CODE: - if (nreqs) - { - poll_wait (); - poll_cb (0); - } + poll_wait (); + RETVAL = poll_cb (); + OUTPUT: + RETVAL int poll_fileno() @@ -1677,8 +1740,44 @@ poll_wait() PROTOTYPE: CODE: - if (nreqs) - poll_wait (); + poll_wait (); + +void +setsig (int signum = SIGIO) + PROTOTYPE: ;$ + CODE: +{ + if (block_sig_level) + croak ("cannot call IO::AIO::setsig from within aio_block/callback"); + + LOCK (reslock); + main_tid = pthread_self (); + main_sig = signum; + UNLOCK (reslock); + + if (main_sig && npending) + pthread_kill (main_tid, main_sig); +} + +void +aio_block (SV *cb) + PROTOTYPE: & + PPCODE: +{ + int count; + + block_sig (); + PUSHMARK (SP); + PUTBACK; + count = call_sv (cb, GIMME_V | G_NOARGS | G_EVAL); + SPAGAIN; + unblock_sig (); + + if (SvTRUE (ERRSV)) + croak (0); + + XSRETURN (count); +} int nreqs() @@ -1738,6 +1837,9 @@ int i; aio_req req; + if (main_sig && !block_sig_level) + croak ("aio_group->add called outside aio_block/callback context while IO::AIO::setsig is in use"); + if (grp->int1 == 2) croak ("cannot add requests to IO::AIO::GRP after the group finished");