ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/Async-Interrupt/Interrupt.xs
Revision: 1.13
Committed: Tue Jul 28 01:19:44 2009 UTC (14 years, 10 months ago) by root
Branch: MAIN
Changes since 1.12: +67 -39 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.1 #include "EXTERN.h"
2     #include "perl.h"
3     #include "XSUB.h"
4    
5 root 1.8 #include "schmorp.h"
6    
7 root 1.1 typedef volatile sig_atomic_t atomic_t;
8    
9     static int *sig_pending, *psig_pend; /* make local copies because of missing THX */
10     static Sighandler_t old_sighandler;
11     static atomic_t async_pending;
12    
/* true when the perl headers describe version a.b.c or anything newer */
#define PERL_VERSION_ATLEAST(a,b,c) \
  (PERL_REVISION > (a) \
   || (PERL_REVISION == (a) \
       && (PERL_VERSION > (b) \
           || (PERL_VERSION == (b) && PERL_SUBVERSION >= (c)))))

/* the three-argument (siginfo) handler needs sigaction with SA_SIGINFO ... */
#if defined HAS_SIGACTION && defined SA_SIGINFO
# define HAS_SA_SIGINFO 1
#endif

/* ... and is only used with perl 5.10.0 or newer */
#if !PERL_VERSION_ATLEAST (5,10,0)
# undef HAS_SA_SIGINFO
#endif
26    
27 root 1.6 /*****************************************************************************/
28 root 1.1
29 root 1.6 typedef struct {
30 root 1.1 SV *cb;
31 root 1.2 void (*c_cb)(pTHX_ void *c_arg, int value);
32     void *c_arg;
33     SV *fh_r, *fh_w;
34 root 1.9 SV *value;
35 root 1.6 int signum;
36 root 1.11 int autodrain;
37 root 1.13 ANY *scope_savestack;
38 root 1.7 volatile int blocked;
39 root 1.1
40 root 1.9 s_epipe ep;
41 root 1.6 int fd_wlen;
42     atomic_t fd_enable;
43 root 1.2 atomic_t pending;
44 root 1.9 volatile IV *valuep;
45 root 1.13 atomic_t hysteresis;
46 root 1.6 } async_t;
47    
48     static AV *asyncs;
49     static async_t *sig_async [SIG_SIZE];
50    
51     #define SvASYNC_nrv(sv) INT2PTR (async_t *, SvIVX (sv))
52     #define SvASYNC(rv) SvASYNC_nrv (SvRV (rv))
53 root 1.1
54 root 1.13 static void async_signal (void *signal_arg, int value);
55    
56     static void
57     setsig (int signum, void (*handler)(int))
58     {
59     #if _WIN32
60     signal (async->signum, handler);
61     #else
62     struct sigaction sa;
63     sa.sa_handler = handler;
64     sigfillset (&sa.sa_mask);
65     sa.sa_flags = 0; /* if we interrupt a syscall, we might drain the pipe before it became ready */
66     sigaction (signum, &sa, 0);
67     }
68    
69     static void
70     async_sigsend (int signum)
71     {
72     async_signal (sig_async [signum], 0);
73     }
74    
75 root 1.1 /* the main workhorse to signal */
76     static void
77     async_signal (void *signal_arg, int value)
78     {
79 root 1.6 static char pipedata [8];
80    
81     async_t *async = (async_t *)signal_arg;
82 root 1.2 int pending = async->pending;
83 root 1.1
84 root 1.13 if (async->hysteresis)
85     setsig (async->signum, SIG_IGN);
86    
87 root 1.9 *async->valuep = value ? value : 1;
88 root 1.2 async->pending = 1;
89     async_pending = 1;
90     psig_pend [9] = 1;
91     *sig_pending = 1;
92 root 1.1
93 root 1.10 if (!pending && async->fd_enable && async->ep.len)
94     s_epipe_signal (&async->ep);
95 root 1.2 }
96    
97     static void
98 root 1.6 handle_async (async_t *async)
99 root 1.2 {
100     int old_errno = errno;
101 root 1.9 int value = *async->valuep;
102 root 1.2
103 root 1.9 *async->valuep = 0;
104 root 1.2 async->pending = 0;
105    
106 root 1.13 /* restore signal */
107     if (async->hysteresis)
108     setsig (async->signum, async_sigsend);
109    
110 root 1.2 /* drain pipe */
111 root 1.11 if (async->fd_enable && async->ep.len && async->autodrain)
112 root 1.9 s_epipe_drain (&async->ep);
113 root 1.2
114     if (async->c_cb)
115     {
116     dTHX;
117     async->c_cb (aTHX_ async->c_arg, value);
118     }
119    
120     if (async->cb)
121     {
122     dSP;
123    
124     SV *saveerr = SvOK (ERRSV) ? sv_mortalcopy (ERRSV) : 0;
125     SV *savedie = PL_diehook;
126    
127     PL_diehook = 0;
128    
129     PUSHSTACKi (PERLSI_SIGNAL);
130    
131     PUSHMARK (SP);
132     XPUSHs (sv_2mortal (newSViv (value)));
133     PUTBACK;
134     call_sv (async->cb, G_VOID | G_DISCARD | G_EVAL);
135    
136     if (SvTRUE (ERRSV))
137     {
138     SPAGAIN;
139    
140     PUSHMARK (SP);
141     PUTBACK;
142     call_sv (get_sv ("Async::Interrupt::DIED", 1), G_VOID | G_DISCARD | G_EVAL | G_KEEPERR);
143    
144     sv_setpvn (ERRSV, "", 0);
145     }
146    
147     if (saveerr)
148     sv_setsv (ERRSV, saveerr);
149 root 1.1
150 root 1.2 {
151     SV *oldhook = PL_diehook;
152     PL_diehook = savedie;
153     SvREFCNT_dec (oldhook);
154     }
155    
156     POPSTACK;
157 root 1.1 }
158    
159 root 1.2 errno = old_errno;
160 root 1.1 }
161    
162     static void
163 root 1.2 handle_asyncs (void)
164 root 1.1 {
165 root 1.2 int i;
166    
167 root 1.1 async_pending = 0;
168 root 1.2
169     for (i = AvFILLp (asyncs); i >= 0; --i)
170     {
171 root 1.12 SV *async_sv = AvARRAY (asyncs)[i];
172     async_t *async = SvASYNC_nrv (async_sv);
173 root 1.2
174     if (async->pending && !async->blocked)
175 root 1.12 {
176     /* temporarily keep a refcount */
177     SvREFCNT_inc (async_sv);
178     handle_async (async);
179     SvREFCNT_dec (async_sv);
180    
181     /* the handler could have deleted any number of asyncs */
182     if (i > AvFILLp (asyncs))
183     i = AvFILLp (asyncs);
184     }
185 root 1.2 }
186 root 1.1 }
187    
188 root 1.5 #if HAS_SA_SIGINFO
189 root 1.1 static Signal_t async_sighandler (int signum, siginfo_t *si, void *sarg)
190     {
191     if (signum == 9)
192 root 1.2 handle_asyncs ();
193 root 1.1 else
194     old_sighandler (signum, si, sarg);
195     }
196     #else
197 root 1.3 static Signal_t async_sighandler (int signum)
198 root 1.1 {
199     if (signum == 9)
200 root 1.3 handle_asyncs ();
201 root 1.1 else
202     old_sighandler (signum);
203     }
204     #endif
205    
206 root 1.7 #define block(async) ++(async)->blocked
207    
208 root 1.6 static void
209 root 1.7 unblock (async_t *async)
210 root 1.4 {
211     --async->blocked;
212     if (async->pending && !async->blocked)
213     handle_async (async);
214 root 1.7 }
215 root 1.4
216 root 1.7 static void
217     scope_block_cb (pTHX_ void *async_sv)
218     {
219     async_t *async = SvASYNC_nrv ((SV *)async_sv);
220 root 1.13
221     async->scope_savestack = 0;
222 root 1.7 unblock (async);
223 root 1.4 SvREFCNT_dec (async_sv);
224     }
225 root 1.1
226 root 1.13 static void
227     scope_block (SV *async_sv)
228     {
229     async_t *async = SvASYNC_nrv (async_sv);
230    
231     /* as a heuristic, we skip the scope block if we already are blocked */
232     /* and the existing scope block used the same savestack */
233    
234     if (!async->scope_savestack || async->scope_savestack != PL_savestack)
235     {
236     async->scope_savestack = PL_savestack;
237     block (async);
238    
239     LEAVE; /* unfortunately, perl sandwiches XS calls into ENTER/LEAVE */
240     SAVEDESTRUCTOR_X (scope_block_cb, (void *)SvREFCNT_inc (async_sv));
241     ENTER; /* unfortunately, perl sandwiches XS calls into ENTER/LEAVE */
242     }
243     }
244    
245     #endif
MODULE = Async::Interrupt		PACKAGE = Async::Interrupt

BOOT:
	/* registry of live async objects, walked by handle_asyncs */
	asyncs         = newAV ();
	/* cache the interpreter's pending-signal state for use from signal context */
	sig_pending    = &PL_sig_pending;
	psig_pend      = PL_psig_pend;
	/* hook perl's signal dispatcher, remembering the previous one */
	old_sighandler = PL_sighandlerp;
	PL_sighandlerp = async_sighandler;
        CvNODEBUG_on (get_cv ("Async::Interrupt::scope_block", 0)); /* otherwise calling scope can be the debugger */

PROTOTYPES: DISABLE
257    
void
_alloc (SV *cb, void *c_cb, void *c_arg, SV *fh_r, SV *fh_w, SV *signl, SV *pvalue)
	PPCODE:
{
        /*
         * Allocates a new async_t and returns its address as a plain integer
         * SV, which is also entered into the asyncs list.
         *
         * cb       perl callback (undef for none)
         * c_cb     C callback and ...
         * c_arg    ... its user argument
         * fh_r/w   optional file handles to use as event pipe
         * signl    optional signal name or number to bind to
         * pvalue   optional reference to the scalar that receives the value
         *
         * Every argument check that can croak runs before anything is
         * allocated or entered into the asyncs list: that list holds
         * uncounted pointers, so an entry left behind by an exception would
         * dangle as soon as the returned mortal is freed.
         */
        SV *cv = SvOK (cb) ? (SV *)s_get_cv_croak (cb) : 0;
        async_t *async;
        int have_fh;
        int fd_r = -1;
        int fd_w = -1;
        int signum;

        SvGETMAGIC (fh_r); SvGETMAGIC (fh_w);
        have_fh = SvOK (fh_r) || SvOK (fh_w);

        if (have_fh)
          {
            fd_r = s_fileno_croak (fh_r, 0);
            fd_w = s_fileno_croak (fh_w, 1);
          }

        signum = SvOK (signl) ? s_signum_croak (signl) : 0;

        if (signum < 0)
          croak ("Async::Interrupt::new got passed illegal signal name or number: %s", SvPV_nolen (signl));

        /* arguments are fine, now create and register the object */
        Newz (0, async, 1, async_t);

        XPUSHs (sv_2mortal (newSViv (PTR2IV (async))));
        /* TODO: need to bless right now to ensure deallocation */
        av_push (asyncs, TOPs);

        if (have_fh)
          {
            async->fh_r      = newSVsv (fh_r);
            async->fh_w      = newSVsv (fh_w);
            async->ep.fd [0] = fd_r;
            async->ep.fd [1] = fd_w;
            async->ep.len    = 1;
            async->fd_enable = 1;
          }

        /* use the caller's scalar when given a reference, a private one otherwise */
        async->value = SvROK (pvalue)
                       ? SvREFCNT_inc_NN (SvRV (pvalue))
                       : NEWSV (0, 0);

        sv_setiv (async->value, 0);
        SvIOK_only (async->value); /* just to be sure */
        SvREADONLY_on (async->value);

        /* async_signal writes through this pointer directly */
        async->valuep = &(SvIVX (async->value));

        async->autodrain = 1;
        async->cb        = SvREFCNT_inc (cv);
        async->c_cb      = c_cb;
        async->c_arg     = c_arg;
        async->signum    = signum;

        if (signum)
          {
            sig_async [signum] = async;
            setsig (signum, async_sigsend);
          }
}
310    
void
signal_hysteresis (async_t *async, int enable)
	CODE:
        /* when enabled, async_signal ignores the bound signal until handle_async has run */
        async->hysteresis = enable;
315    
void
signal_func (async_t *async)
	PPCODE:
{
        /* returns (address of async_signal, address of the async_t) as two integers */
        SV *func_sv = sv_2mortal (newSViv (PTR2IV (async_signal)));
        SV *arg_sv  = sv_2mortal (newSViv (PTR2IV (async)));

        EXTEND (SP, 2);
        PUSHs (func_sv);
        PUSHs (arg_sv);
}
322 root 1.1
void
scope_block_func (SV *self)
	PPCODE:
{
        /* returns (address of scope_block, address of the inner SV of self) as two integers */
        SV *func_sv = sv_2mortal (newSViv (PTR2IV (scope_block)));
        SV *arg_sv  = sv_2mortal (newSViv (PTR2IV (SvRV (self))));

        EXTEND (SP, 2);
        PUSHs (func_sv);
        PUSHs (arg_sv);
}
329    
IV
c_var (async_t *async)
	CODE:
        /* address of the IV that async_signal stores the value in */
        RETVAL = PTR2IV (async->valuep);
	OUTPUT:
        RETVAL
336    
void
signal (async_t *async, int value = 1)
	CODE:
        /* same entry point that C code and the signal handler use */
        async_signal (async, value);
341 root 1.2
void
block (async_t *async)
	CODE:
        /* expands to the block () macro: bumps the block count */
        block (async);
346 root 1.2
void
unblock (async_t *async)
	CODE:
        /* drops the block count and runs the callbacks if a signal is pending */
        unblock (async);
351 root 1.1
void
scope_block (SV *self)
	CODE:
        /* the C function works on the inner SV, not on the reference */
        scope_block (SvRV (self));
356 root 1.4
void
pipe_enable (async_t *async)
	ALIAS:
        pipe_enable  = 1
        pipe_disable = 0
	CODE:
        /* ix is the alias value: 1 for pipe_enable, 0 for pipe_disable */
        async->fd_enable = ix;
364    
int
pipe_fileno (async_t *async)
	CODE:
        /* create the event pipe on first use */
        if (!async->ep.len)
          {
            /* TODO: block (async) / unblock (async) around the pipe creation */
            int res = s_epipe_new (&async->ep);

            async->fd_enable = 1;

            if (res < 0)
              croak ("Async::Interrupt: unable to initialize event pipe");
          }

        /* fd [0] is the descriptor returned to the caller */
        RETVAL = async->ep.fd [0];
	OUTPUT:
        RETVAL
384    
int
pipe_autodrain (async_t *async, int enable = -1)
	CODE:
        /* always report the setting in effect before this call */
        RETVAL = async->autodrain;

        /* the default of -1 (or any negative value) only queries */
        if (enable >= 0)
          async->autodrain = enable;
	OUTPUT:
        RETVAL
393 root 1.9
void
post_fork (async_t *async)
	CODE:
        /* nothing to do unless an event pipe exists */
        if (async->ep.len)
          {
            /* TODO: block (async) / unblock (async) around the renewal */
            int res = s_epipe_renew (&async->ep);

            if (res < 0)
              croak ("Async::Interrupt: unable to initialize event pipe after fork");
          }
408    
void
DESTROY (SV *self)
	CODE:
{
        /*
         * Unregisters the object from the asyncs list and from its signal,
         * then releases everything it owns.
         */
        int i;
        SV *async_sv = SvRV (self);
        async_t *async = SvASYNC_nrv (async_sv);

        /* remove from the list by moving the last element into our slot */
        for (i = AvFILLp (asyncs); i >= 0; --i)
          if (AvARRAY (asyncs)[i] == async_sv)
            {
              AvARRAY (asyncs)[i] = AvARRAY (asyncs)[AvFILLp (asyncs)];
              av_pop (asyncs);
              goto found;
            }

        if (!PL_dirty)
          warn ("Async::Interrupt::DESTROY could not find async object in list of asyncs, please report");

        found:

        /* only undo the signal registration if it is still ours: a newer */
        /* object bound to the same signal must keep its handler, and the */
        /* slot must not keep pointing at memory that is freed below */
        if (async->signum && sig_async [async->signum] == async)
          {
            setsig (async->signum, SIG_DFL);
            sig_async [async->signum] = 0;
          }

        /* the pipe is only ours to destroy when no file handles were passed to _alloc */
        if (!async->fh_r && async->ep.len)
          s_epipe_destroy (&async->ep);

        SvREFCNT_dec (async->fh_r);
        SvREFCNT_dec (async->fh_w);
        SvREFCNT_dec (async->cb);
        SvREFCNT_dec (async->value);

        Safefree (async);
}
443    
444 root 1.11 MODULE = Async::Interrupt PACKAGE = Async::Interrupt::EventPipe PREFIX = s_epipe_
445    
void
new (const char *klass)
	PPCODE:
{
        /* returns a new object blessed into klass that wraps a freshly created event pipe */
        s_epipe *epp;

        Newz (0, epp, 1, s_epipe);
        /* bless before creating the pipe, so the object exists when we croak */
        XPUSHs (sv_setref_iv (sv_newmortal (), klass, PTR2IV (epp)));

        if (s_epipe_new (epp) < 0)
          croak ("Async::Interrupt::EventPipe: unable to create new event pipe");
}
459    
void
filenos (s_epipe *epp)
	PPCODE:
{
        /* returns both descriptors of the pipe, fd [0] first */
        SV *fd0_sv = sv_2mortal (newSViv (epp->fd [0]));
        SV *fd1_sv = sv_2mortal (newSViv (epp->fd [1]));

        EXTEND (SP, 2);
        PUSHs (fd0_sv);
        PUSHs (fd1_sv);
}
466    
int
fileno (s_epipe *epp)
	ALIAS:
        fileno   = 0
        fileno_r = 0
        fileno_w = 1
	CODE:
        /* ix is the alias value and doubles as index into the descriptor pair */
        RETVAL = epp->fd [ix];
	OUTPUT:
        RETVAL
477    
int
type (s_epipe *epp)
	CODE:
        /* reports the len member of the event pipe */
        RETVAL = epp->len;
	OUTPUT:
        RETVAL
484    
void
s_epipe_signal (s_epipe *epp)
	# no body: xsubpp generates the call to the C function s_epipe_signal,
	# the perl-visible name has the PREFIX from the MODULE line removed

void
s_epipe_drain (s_epipe *epp)
	# no body: xsubpp generates the call to the C function s_epipe_drain,
	# the perl-visible name has the PREFIX from the MODULE line removed
490    
void
DESTROY (s_epipe *epp)
	CODE:
        /* tear down the pipe, then release the struct that new allocated with Newz */
        s_epipe_destroy (epp);
        Safefree (epp);
495