--- Async-Interrupt/Interrupt.xs 2009/07/03 21:11:22 1.5 +++ Async-Interrupt/Interrupt.xs 2012/04/24 22:01:59 1.19 @@ -2,6 +2,9 @@ #include "perl.h" #include "XSUB.h" +#include "ecb.h" +#include "schmorp.h" + typedef volatile sig_atomic_t atomic_t; static int *sig_pending, *psig_pend; /* make local copies because of missing THX */ @@ -22,82 +25,102 @@ # undef HAS_SA_SIGINFO #endif -static int -extract_fd (SV *fh, int wr) -{ - int fd = PerlIO_fileno (wr ? IoOFP (sv_2io (fh)) : IoIFP (sv_2io (fh))); - - if (fd < 0) - croak ("illegal fh argument, either not an OS file or read/write mode mismatch"); - - return fd; -} +/*****************************************************************************/ -static SV * -get_cb (SV *cb_sv) -{ - HV *st; - GV *gvp; - CV *cv; +typedef struct { + SV *cb; + void (*c_cb)(pTHX_ void *c_arg, int value); + void *c_arg; + SV *fh_r, *fh_w; + SV *value; + int signum; + int autodrain; + ANY *scope_savestack; + volatile int blocked; + + s_epipe ep; + int fd_wlen; + atomic_t fd_enable; + atomic_t pending; + volatile IV *valuep; + atomic_t hysteresis; +} async_t; - if (!SvOK (cb_sv)) - return 0; +static AV *asyncs; +static async_t *sig_async [SIG_SIZE]; - cv = sv_2cv (cb_sv, &st, &gvp, 0); +#define SvASYNC_nrv(sv) INT2PTR (async_t *, SvIVX (sv)) +#define SvASYNC(rv) SvASYNC_nrv (SvRV (rv)) - if (!cv) - croak ("Async::Interrupt callback must be undef or a CODE reference"); +static void async_signal (void *signal_arg, int value); - return (SV *)cv; +static void +setsig (int signum, void (*handler)(int)) +{ +#if _WIN32 + signal (signum, handler); +#else + struct sigaction sa; + sa.sa_handler = handler; + sigfillset (&sa.sa_mask); + sa.sa_flags = 0; /* if we interrupt a syscall, we might drain the pipe before it became ready */ + sigaction (signum, &sa, 0); +#endif } -static AV *asyncs; - -struct async { - SV *cb; - void (*c_cb)(pTHX_ void *c_arg, int value); - void *c_arg; - SV *fh_r, *fh_w; - int blocked; - - int fd_r, fd_w; - atomic_t value; - atomic_t pending; -}; +static void +async_sigsend (int signum) +{ + async_signal (sig_async [signum], 0); +} /* the main workhorse to signal */ static void async_signal (void *signal_arg, int value) { - struct async *async = (struct async *)signal_arg; + static char pipedata [8]; + + async_t *async = (async_t *)signal_arg; int pending = async->pending; - async->value = value; + if (async->hysteresis) + setsig (async->signum, SIG_IGN); + + *async->valuep = value ? value : 1; + ECB_MEMORY_FENCE_RELEASE; async->pending = 1; + ECB_MEMORY_FENCE_RELEASE; async_pending = 1; - psig_pend [9] = 1; - *sig_pending = 1; + ECB_MEMORY_FENCE_RELEASE; - if (!pending && async->fd_w >= 0) - write (async->fd_w, async, 1); + if (!async->blocked) + { + psig_pend [9] = 1; + ECB_MEMORY_FENCE_RELEASE; + *sig_pending = 1; + ECB_MEMORY_FENCE_RELEASE; + } + + if (!pending && async->fd_enable && async->ep.len) + s_epipe_signal (&async->ep); } static void -handle_async (struct async *async) +handle_async (async_t *async) { int old_errno = errno; - int value = async->value; + int value = *async->valuep; + *async->valuep = 0; async->pending = 0; - /* drain pipe */ - if (async->fd_r >= 0) - { - char dummy [4]; + /* restore signal */ + if (async->hysteresis) + setsig (async->signum, async_sigsend); - while (read (async->fd_r, dummy, sizeof (dummy)) == sizeof (dummy)) - ; - } + /* drain pipe */ + if (async->fd_enable && async->ep.len && async->autodrain) + s_epipe_drain (&async->ep); if (async->c_cb) { @@ -152,14 +175,26 @@ { int i; + ECB_MEMORY_FENCE_ACQUIRE; + async_pending = 0; for (i = AvFILLp (asyncs); i >= 0; --i) { - struct async *async = INT2PTR (struct async *, SvIVX (AvARRAY (asyncs)[i])); + SV *async_sv = AvARRAY (asyncs)[i]; + async_t *async = SvASYNC_nrv (async_sv); if (async->pending && !async->blocked) - handle_async (async); + { + /* temporarily keep a refcount */ + SvREFCNT_inc (async_sv); + handle_async (async); + SvREFCNT_dec (async_sv); + + /* the handler could have deleted any number of asyncs */ + if (i > AvFILLp (asyncs)) + i = AvFILLp (asyncs); + } } } @@ -181,18 +216,45 @@ } #endif +#define block(async) ++(async)->blocked + static void -scope_block_cb (pTHX_ void *async_sv) +unblock (async_t *async) { - struct async *async = INT2PTR (struct async *, SvIVX ((SV *)async_sv)); - --async->blocked; if (async->pending && !async->blocked) handle_async (async); +} +static void +scope_block_cb (pTHX_ void *async_sv) +{ + async_t *async = SvASYNC_nrv ((SV *)async_sv); + + async->scope_savestack = 0; + unblock (async); SvREFCNT_dec (async_sv); } +static void +scope_block (SV *async_sv) +{ + async_t *async = SvASYNC_nrv (async_sv); + + /* as a heuristic, we skip the scope block if we already are blocked */ + /* and the existing scope block used the same savestack */ + + if (!async->scope_savestack || async->scope_savestack != PL_savestack) + { + async->scope_savestack = PL_savestack; + block (async); + + LEAVE; /* unfortunately, perl sandwiches XS calls into ENTER/LEAVE */ + SAVEDESTRUCTOR_X (scope_block_cb, (void *)SvREFCNT_inc (async_sv)); + ENTER; /* unfortunately, perl sandwiches XS calls into ENTER/LEAVE */ + } +} + MODULE = Async::Interrupt PACKAGE = Async::Interrupt BOOT: @@ -205,89 +267,181 @@ PROTOTYPES: DISABLE -SV * -_alloc (SV *cb, void *c_cb, void *c_arg, SV *fh_r, SV *fh_w) - CODE: +void +_alloc (SV *cb, void *c_cb, void *c_arg, SV *fh_r, SV *fh_w, SV *signl, SV *pvalue) + PPCODE: { - SV *cv = SvOK (cb) ? SvREFCNT_inc (get_cb (cb)) : 0; - int fd_r = SvOK (fh_r) ? extract_fd (fh_r, 0) : -1; - int fd_w = SvOK (fh_w) ? extract_fd (fh_w, 1) : -1; - struct async *async; - - Newz (0, async, 1, struct async); - - async->fh_r = fd_r >= 0 ? newSVsv (fh_r) : 0; async->fd_r = fd_r; - async->fh_w = fd_w >= 0 ? newSVsv (fh_w) : 0; async->fd_w = fd_w; - async->cb = cv; - async->c_cb = c_cb; - async->c_arg = c_arg; + SV *cv = SvOK (cb) ? SvREFCNT_inc (s_get_cv_croak (cb)) : 0; + async_t *async; - printf ("r,w %d,%d\n", fd_r, fd_w);//D + Newz (0, async, 1, async_t); - RETVAL = newSViv (PTR2IV (async)); - av_push (asyncs, RETVAL); + XPUSHs (sv_2mortal (newSViv (PTR2IV (async)))); + /* TODO: need to bless right now to ensure deallocation */ + av_push (asyncs, TOPs); + + SvGETMAGIC (fh_r); SvGETMAGIC (fh_w); + if (SvOK (fh_r) || SvOK (fh_w)) + { + int fd_r = s_fileno_croak (fh_r, 0); + int fd_w = s_fileno_croak (fh_w, 1); + + async->fh_r = newSVsv (fh_r); + async->fh_w = newSVsv (fh_w); + async->ep.fd [0] = fd_r; + async->ep.fd [1] = fd_w; + async->ep.len = 1; + async->fd_enable = 1; + } + + async->value = SvROK (pvalue) + ? SvREFCNT_inc_NN (SvRV (pvalue)) + : NEWSV (0, 0); + + sv_setiv (async->value, 0); + SvIOK_only (async->value); /* just to be sure */ + SvREADONLY_on (async->value); + + async->valuep = &(SvIVX (async->value)); + + async->autodrain = 1; + async->cb = cv; + async->c_cb = c_cb; + async->c_arg = c_arg; + async->signum = SvOK (signl) ? s_signum_croak (signl) : 0; + + if (async->signum) + { + if (async->signum < 0) + croak ("Async::Interrupt::new got passed illegal signal name or number: %s", SvPV_nolen (signl)); + + sig_async [async->signum] = async; + setsig (async->signum, async_sigsend); + } } - OUTPUT: - RETVAL void -signal_func (SV *self) +signal_hysteresis (async_t *async, int enable) + CODE: + async->hysteresis = enable; + +void +signal_func (async_t *async) PPCODE: EXTEND (SP, 2); PUSHs (sv_2mortal (newSViv (PTR2IV (async_signal)))); - PUSHs (sv_2mortal (newSViv (SvIVX (SvRV (self))))); + PUSHs (sv_2mortal (newSViv (PTR2IV (async)))); void -signal (SV *self, int value = 0) +scope_block_func (SV *self) + PPCODE: + EXTEND (SP, 2); + PUSHs (sv_2mortal (newSViv (PTR2IV (scope_block)))); + PUSHs (sv_2mortal (newSViv (PTR2IV (SvRV (self))))); + +IV +c_var (async_t *async) CODE: - async_signal (INT2PTR (void *, SvIVX (SvRV (self))), value); + RETVAL = PTR2IV (async->valuep); + OUTPUT: + RETVAL void -block (SV *self) +handle (async_t *async) CODE: -{ - struct async *async = INT2PTR (struct async *, SvIVX (SvRV (self))); - ++async->blocked; -} + handle_async (async); void -unblock (SV *self) +signal (async_t *async, int value = 1) CODE: -{ - struct async *async = INT2PTR (struct async *, SvIVX (SvRV (self))); - --async->blocked; - if (async->pending && !async->blocked) - handle_async (async); -} + async_signal (async, value); + +void +block (async_t *async) + CODE: + block (async); + +void +unblock (async_t *async) + CODE: + unblock (async); void scope_block (SV *self) CODE: -{ - SV *async_sv = SvRV (self); - struct async *async = INT2PTR (struct async *, SvIVX (async_sv)); - ++async->blocked; + scope_block (SvRV (self)); - LEAVE; /* unfortunately, perl sandwiches XS calls into ENTER/LEAVE */ - SAVEDESTRUCTOR_X (scope_block_cb, (void *)SvREFCNT_inc (async_sv)); - ENTER; /* unfortunately, perl sandwiches XS calls into ENTER/LEAVE */ -} +void +pipe_enable (async_t *async) + ALIAS: + pipe_enable = 1 + pipe_disable = 0 + CODE: + async->fd_enable = ix; + +int +pipe_fileno (async_t *async) + CODE: + if (!async->ep.len) + { + int res; + + /*block (async);*//*TODO*/ + res = s_epipe_new (&async->ep); + async->fd_enable = 1; + /*unblock (async);*//*TODO*/ + + if (res < 0) + croak ("Async::Interrupt: unable to initialize event pipe"); + } + + RETVAL = async->ep.fd [0]; + OUTPUT: + RETVAL + +int +pipe_autodrain (async_t *async, int enable = -1) + CODE: + RETVAL = async->autodrain; + if (enable >= 0) + async->autodrain = enable; + OUTPUT: + RETVAL + +void +pipe_drain (async_t *async) + CODE: + if (async->ep.len) + s_epipe_drain (&async->ep); + +void +post_fork (async_t *async) + CODE: + if (async->ep.len) + { + int res; + + /*block (async);*//*TODO*/ + res = s_epipe_renew (&async->ep); + /*unblock (async);*//*TODO*/ + + if (res < 0) + croak ("Async::Interrupt: unable to initialize event pipe after fork"); + } void DESTROY (SV *self) CODE: { - int i; - SV *async_sv = SvRV (self); - struct async *async = INT2PTR (struct async *, SvIVX (async_sv)); + int i; + SV *async_sv = SvRV (self); + async_t *async = SvASYNC_nrv (async_sv); for (i = AvFILLp (asyncs); i >= 0; --i) if (AvARRAY (asyncs)[i] == async_sv) { - if (i < AvFILLp (asyncs)) - AvARRAY (asyncs)[i] = AvARRAY (asyncs)[AvFILLp (asyncs)]; - - assert (av_pop (asyncs) == async_sv); + AvARRAY (asyncs)[i] = AvARRAY (asyncs)[AvFILLp (asyncs)]; + av_pop (asyncs); goto found; } @@ -295,10 +449,101 @@ warn ("Async::Interrupt::DESTROY could not find async object in list of asyncs, please report"); found: + + if (async->signum) + setsig (async->signum, SIG_DFL); + + if (!async->fh_r && async->ep.len) + s_epipe_destroy (&async->ep); + SvREFCNT_dec (async->fh_r); SvREFCNT_dec (async->fh_w); SvREFCNT_dec (async->cb); + SvREFCNT_dec (async->value); Safefree (async); } +SV * +sig2num (SV *signame_or_number) + ALIAS: + sig2num = 0 + sig2name = 1 + PROTOTYPE: $ + CODE: +{ + int signum = s_signum (signame_or_number); + + if (signum < 0) + RETVAL = &PL_sv_undef; + else if (ix) + RETVAL = newSVpv (PL_sig_name [signum], 0); + else + RETVAL = newSViv (signum); +} + OUTPUT: + RETVAL + +MODULE = Async::Interrupt PACKAGE = Async::Interrupt::EventPipe PREFIX = s_epipe_ + +void +new (const char *klass) + PPCODE: +{ + s_epipe *epp; + + Newz (0, epp, 1, s_epipe); + XPUSHs (sv_setref_iv (sv_newmortal (), klass, PTR2IV (epp))); + + if (s_epipe_new (epp) < 0) + croak ("Async::Interrupt::EventPipe: unable to create new event pipe"); +} + +void +filenos (s_epipe *epp) + PPCODE: + EXTEND (SP, 2); + PUSHs (sv_2mortal (newSViv (epp->fd [0]))); + PUSHs (sv_2mortal (newSViv (epp->fd [1]))); + +int +fileno (s_epipe *epp) + ALIAS: + fileno = 0 + fileno_r = 0 + fileno_w = 1 + CODE: + RETVAL = epp->fd [ix]; + OUTPUT: + RETVAL + +int +type (s_epipe *epp) + CODE: + RETVAL = epp->len; + OUTPUT: + RETVAL + +void +s_epipe_signal (s_epipe *epp) + +void +s_epipe_drain (s_epipe *epp) + +void +signal_func (s_epipe *epp) + ALIAS: + drain_func = 1 + PPCODE: + EXTEND (SP, 2); + PUSHs (sv_2mortal (newSViv (PTR2IV (ix ? s_epipe_drain : s_epipe_signal)))); + PUSHs (sv_2mortal (newSViv (PTR2IV (epp)))); + +void +s_epipe_wait (s_epipe *epp) + +void +DESTROY (s_epipe *epp) + CODE: + s_epipe_destroy (epp); +