--- Async-Interrupt/Interrupt.xs 2009/07/14 14:18:10 1.7 +++ Async-Interrupt/Interrupt.xs 2009/09/01 16:41:29 1.16 @@ -2,6 +2,8 @@ #include "perl.h" #include "XSUB.h" +#include "schmorp.h" + typedef volatile sig_atomic_t atomic_t; static int *sig_pending, *psig_pend; /* make local copies because of missing THX */ @@ -23,78 +25,24 @@ #endif /*****************************************************************************/ -/* support stuff, copied from EV.xs mostly */ - -static int -extract_fd (SV *fh, int wr) -{ - int fd = PerlIO_fileno (wr ? IoOFP (sv_2io (fh)) : IoIFP (sv_2io (fh))); - - if (fd < 0) - croak ("illegal fh argument, either not an OS file or read/write mode mismatch"); - - return fd; -} - -static SV * -get_cb (SV *cb_sv) -{ - HV *st; - GV *gvp; - CV *cv; - - if (!SvOK (cb_sv)) - return 0; - - cv = sv_2cv (cb_sv, &st, &gvp, 0); - - if (!cv) - croak ("Async::Interrupt callback must be undef or a CODE reference"); - - return (SV *)cv; -} - -#ifndef SIG_SIZE -/* kudos to Slaven Rezic for the idea */ -static char sig_size [] = { SIG_NUM }; -# define SIG_SIZE (sizeof (sig_size) + 1) -#endif - -static int -sv_signum (SV *sig) -{ - int signum; - - SvGETMAGIC (sig); - - for (signum = 1; signum < SIG_SIZE; ++signum) - if (strEQ (SvPV_nolen (sig), PL_sig_name [signum])) - return signum; - - signum = SvIV (sig); - - if (signum > 0 && signum < SIG_SIZE) - return signum; - - return -1; -} - -/*****************************************************************************/ typedef struct { SV *cb; void (*c_cb)(pTHX_ void *c_arg, int value); void *c_arg; SV *fh_r, *fh_w; + SV *value; int signum; + int autodrain; + ANY *scope_savestack; volatile int blocked; - int fd_r; - volatile int fd_w; + s_epipe ep; int fd_wlen; atomic_t fd_enable; - atomic_t value; atomic_t pending; + volatile IV *valuep; + atomic_t hysteresis; } async_t; static AV *asyncs; @@ -103,6 +51,27 @@ #define SvASYNC_nrv(sv) INT2PTR (async_t *, SvIVX (sv)) #define SvASYNC(rv) SvASYNC_nrv (SvRV (rv)) +static void async_signal (void *signal_arg, int value); + +static void +setsig (int signum, void (*handler)(int)) +{ +#if _WIN32 + signal (async->signum, handler); +#else + struct sigaction sa; + sa.sa_handler = handler; + sigfillset (&sa.sa_mask); + sa.sa_flags = 0; /* if we interrupt a syscall, we might drain the pipe before it became ready */ + sigaction (signum, &sa, 0); +} + +static void +async_sigsend (int signum) +{ + async_signal (sig_async [signum], 0); +} + /* the main workhorse to signal */ static void async_signal (void *signal_arg, int value) @@ -112,39 +81,35 @@ async_t *async = (async_t *)signal_arg; int pending = async->pending; - async->value = value; + if (async->hysteresis) + setsig (async->signum, SIG_IGN); + + *async->valuep = value ? value : 1; async->pending = 1; async_pending = 1; psig_pend [9] = 1; *sig_pending = 1; - { - int fd_w = async->fd_w; - int fd_enable = async->fd_enable; - - if (!pending && fd_w >= 0 && fd_enable) - if (write (fd_w, pipedata, async->fd_wlen) < 0 && errno == EINVAL) - /* on EINVAL we assume it's an eventfd */ - write (fd_w, pipedata, (async->fd_wlen = 8)); - } + if (!pending && async->fd_enable && async->ep.len) + s_epipe_signal (&async->ep); } static void handle_async (async_t *async) { int old_errno = errno; - int value = async->value; + int value = *async->valuep; + *async->valuep = 0; async->pending = 0; - /* drain pipe */ - if (async->fd_r >= 0 && async->fd_enable) - { - char dummy [9]; /* 9 is enough for eventfd and normal pipes */ + /* restore signal */ + if (async->hysteresis) + setsig (async->signum, async_sigsend); - while (read (async->fd_r, dummy, sizeof (dummy)) == sizeof (dummy)) - ; - } + /* drain pipe */ + if (async->fd_enable && async->ep.len && async->autodrain) + s_epipe_drain (&async->ep); if (async->c_cb) { @@ -203,10 +168,20 @@ for (i = AvFILLp (asyncs); i >= 0; --i) { - async_t *async = SvASYNC_nrv (AvARRAY (asyncs)[i]); + SV *async_sv = AvARRAY (asyncs)[i]; + async_t *async = SvASYNC_nrv (async_sv); if (async->pending && !async->blocked) - handle_async (async); + { + /* temporarily keep a refcount */ + SvREFCNT_inc (async_sv); + handle_async (async); + SvREFCNT_dec (async_sv); + + /* the handler could have deleted any number of asyncs */ + if (i > AvFILLp (asyncs)) + i = AvFILLp (asyncs); + } } } @@ -228,12 +203,6 @@ } #endif -static void -async_sigsend (int signum) -{ - async_signal (sig_async [signum], 0); -} - #define block(async) ++(async)->blocked static void @@ -248,10 +217,32 @@ scope_block_cb (pTHX_ void *async_sv) { async_t *async = SvASYNC_nrv ((SV *)async_sv); + + async->scope_savestack = 0; unblock (async); SvREFCNT_dec (async_sv); } +static void +scope_block (SV *async_sv) +{ + async_t *async = SvASYNC_nrv (async_sv); + + /* as a heuristic, we skip the scope block if we already are blocked */ + /* and the existing scope block used the same savestack */ + + if (!async->scope_savestack || async->scope_savestack != PL_savestack) + { + async->scope_savestack = PL_savestack; + block (async); + + LEAVE; /* unfortunately, perl sandwiches XS calls into ENTER/LEAVE */ + SAVEDESTRUCTOR_X (scope_block_cb, (void *)SvREFCNT_inc (async_sv)); + ENTER; /* unfortunately, perl sandwiches XS calls into ENTER/LEAVE */ + } +} + +#endif MODULE = Async::Interrupt PACKAGE = Async::Interrupt BOOT: @@ -265,28 +256,47 @@ PROTOTYPES: DISABLE void -_alloc (SV *cb, void *c_cb, void *c_arg, SV *fh_r, SV *fh_w, SV *signl) +_alloc (SV *cb, void *c_cb, void *c_arg, SV *fh_r, SV *fh_w, SV *signl, SV *pvalue) PPCODE: { - SV *cv = SvOK (cb) ? SvREFCNT_inc (get_cb (cb)) : 0; - int fd_r = SvOK (fh_r) ? extract_fd (fh_r, 0) : -1; - int fd_w = SvOK (fh_w) ? extract_fd (fh_w, 1) : -1; + SV *cv = SvOK (cb) ? SvREFCNT_inc (s_get_cv_croak (cb)) : 0; async_t *async; Newz (0, async, 1, async_t); XPUSHs (sv_2mortal (newSViv (PTR2IV (async)))); + /* TODO: need to bless right now to ensure deallocation */ av_push (asyncs, TOPs); - async->fh_r = fd_r >= 0 ? newSVsv (fh_r) : 0; async->fd_r = fd_r; - async->fh_w = fd_w >= 0 ? newSVsv (fh_w) : 0; async->fd_w = fd_w; - async->fd_wlen = 1; - async->fd_enable = 1; + SvGETMAGIC (fh_r); SvGETMAGIC (fh_w); + if (SvOK (fh_r) || SvOK (fh_w)) + { + int fd_r = s_fileno_croak (fh_r, 0); + int fd_w = s_fileno_croak (fh_w, 1); + + async->fh_r = newSVsv (fh_r); + async->fh_w = newSVsv (fh_w); + async->ep.fd [0] = fd_r; + async->ep.fd [1] = fd_w; + async->ep.len = 1; + async->fd_enable = 1; + } + + async->value = SvROK (pvalue) + ? SvREFCNT_inc_NN (SvRV (pvalue)) + : NEWSV (0, 0); + + sv_setiv (async->value, 0); + SvIOK_only (async->value); /* just to be sure */ + SvREADONLY_on (async->value); + + async->valuep = &(SvIVX (async->value)); + + async->autodrain = 1; async->cb = cv; async->c_cb = c_cb; async->c_arg = c_arg; - SvGETMAGIC (signl); - async->signum = SvOK (signl) ? sv_signum (signl) : 0; + async->signum = SvOK (signl) ? s_signum_croak (signl) : 0; if (async->signum) { @@ -294,20 +304,16 @@ croak ("Async::Interrupt::new got passed illegal signal name or number: %s", SvPV_nolen (signl)); sig_async [async->signum] = async; -#if _WIN32 - signal (async->signum, async_sigsend); -#else - { - struct sigaction sa = { }; - sa.sa_handler = async_sigsend; - sigfillset (&sa.sa_mask); - sigaction (async->signum, &sa, 0); - } -#endif + setsig (async->signum, async_sigsend); } } void +signal_hysteresis (async_t *async, int enable) + CODE: + async->hysteresis = enable; + +void signal_func (async_t *async) PPCODE: EXTEND (SP, 2); @@ -315,7 +321,21 @@ PUSHs (sv_2mortal (newSViv (PTR2IV (async)))); void -signal (async_t *async, int value = 0) +scope_block_func (SV *self) + PPCODE: + EXTEND (SP, 2); + PUSHs (sv_2mortal (newSViv (PTR2IV (scope_block)))); + PUSHs (sv_2mortal (newSViv (PTR2IV (SvRV (self))))); + +IV +c_var (async_t *async) + CODE: + RETVAL = PTR2IV (async->valuep); + OUTPUT: + RETVAL + +void +signal (async_t *async, int value = 1) CODE: async_signal (async, value); @@ -332,15 +352,7 @@ void scope_block (SV *self) CODE: -{ - SV *async_sv = SvRV (self); - async_t *async = SvASYNC_nrv (async_sv); - block (async); - - LEAVE; /* unfortunately, perl sandwiches XS calls into ENTER/LEAVE */ - SAVEDESTRUCTOR_X (scope_block_cb, (void *)SvREFCNT_inc (async_sv)); - ENTER; /* unfortunately, perl sandwiches XS calls into ENTER/LEAVE */ -} + scope_block (SvRV (self)); void pipe_enable (async_t *async) @@ -350,6 +362,50 @@ CODE: async->fd_enable = ix; +int +pipe_fileno (async_t *async) + CODE: + if (!async->ep.len) + { + int res; + + /*block (async);*//*TODO*/ + res = s_epipe_new (&async->ep); + async->fd_enable = 1; + /*unblock (async);*//*TODO*/ + + if (res < 0) + croak ("Async::Interrupt: unable to initialize event pipe"); + } + + RETVAL = async->ep.fd [0]; + OUTPUT: + RETVAL + +int +pipe_autodrain (async_t *async, int enable = -1) + CODE: + RETVAL = async->autodrain; + if (enable >= 0) + async->autodrain = enable; + OUTPUT: + RETVAL + +void +post_fork (async_t *async) + CODE: + if (async->ep.len) + { + int res; + + /*block (async);*//*TODO*/ + res = s_epipe_renew (&async->ep); + /*unblock (async);*//*TODO*/ + + if (res < 0) + croak ("Async::Interrupt: unable to initialize event pipe after fork"); + } + void DESTROY (SV *self) CODE: @@ -361,10 +417,8 @@ for (i = AvFILLp (asyncs); i >= 0; --i) if (AvARRAY (asyncs)[i] == async_sv) { - if (i < AvFILLp (asyncs)) - AvARRAY (asyncs)[i] = AvARRAY (asyncs)[AvFILLp (asyncs)]; - - assert (av_pop (asyncs) == async_sv); + AvARRAY (asyncs)[i] = AvARRAY (asyncs)[AvFILLp (asyncs)]; + av_pop (asyncs); goto found; } @@ -374,22 +428,98 @@ found: if (async->signum) - { -#if _WIN32 - signal (async->signum, SIG_DFL); -#else - { - struct sigaction sa = { }; - sa.sa_handler = SIG_DFL; - sigaction (async->signum, &sa, 0); - } -#endif - } + setsig (async->signum, SIG_DFL); + + if (!async->fh_r && async->ep.len) + s_epipe_destroy (&async->ep); SvREFCNT_dec (async->fh_r); SvREFCNT_dec (async->fh_w); SvREFCNT_dec (async->cb); + SvREFCNT_dec (async->value); Safefree (async); } +SV * +sig2num (SV *signame_or_number) + ALIAS: + sig2num = 0 + sig2name = 1 + PROTOTYPE: $ + CODE: +{ + int signum = s_signum (signame_or_number); + + if (signum < 0) + RETVAL = &PL_sv_undef; + else if (ix) + RETVAL = newSVpv (PL_sig_name [signum], 0); + else + RETVAL = newSViv (signum); +} + OUTPUT: + RETVAL + +MODULE = Async::Interrupt PACKAGE = Async::Interrupt::EventPipe PREFIX = s_epipe_ + +void +new (const char *klass) + PPCODE: +{ + s_epipe *epp; + SV *self; + + Newz (0, epp, 1, s_epipe); + XPUSHs (sv_setref_iv (sv_newmortal (), klass, PTR2IV (epp))); + + if (s_epipe_new (epp) < 0) + croak ("Async::Interrupt::EventPipe: unable to create new event pipe"); +} + +void +filenos (s_epipe *epp) + PPCODE: + EXTEND (SP, 2); + PUSHs (sv_2mortal (newSViv (epp->fd [0]))); + PUSHs (sv_2mortal (newSViv (epp->fd [1]))); + +int +fileno (s_epipe *epp) + ALIAS: + fileno = 0 + fileno_r = 0 + fileno_w = 1 + CODE: + RETVAL = epp->fd [ix]; + OUTPUT: + RETVAL + +int +type (s_epipe *epp) + CODE: + RETVAL = epp->len; + OUTPUT: + RETVAL + +void +s_epipe_signal (s_epipe *epp) + +void +s_epipe_drain (s_epipe *epp) + +void +drain_func (s_epipe *epp) + PPCODE: + EXTEND (SP, 2); + PUSHs (sv_2mortal (newSViv (PTR2IV (s_epipe_drain)))); + PUSHs (sv_2mortal (newSViv (PTR2IV (epp)))); + +void +s_epipe_wait (s_epipe *epp) + +void +DESTROY (s_epipe *epp) + CODE: + s_epipe_destroy (epp); +