ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/Async-Interrupt/Interrupt.xs
Revision: 1.19
Committed: Tue Apr 24 22:01:59 2012 UTC (12 years, 1 month ago) by root
Branch: MAIN
CVS Tags: rel-1_1
Changes since 1.18: +31 -8 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.1 #include "EXTERN.h"
2     #include "perl.h"
3     #include "XSUB.h"
4    
5 root 1.19 #include "ecb.h"
6 root 1.8 #include "schmorp.h"
7    
8 root 1.1 typedef volatile sig_atomic_t atomic_t;
9    
10     static int *sig_pending, *psig_pend; /* make local copies because of missing THX */
11     static Sighandler_t old_sighandler;
12     static atomic_t async_pending;
13    
14 root 1.5 #define PERL_VERSION_ATLEAST(a,b,c) \
15     (PERL_REVISION > (a) \
16     || (PERL_REVISION == (a) \
17     && (PERL_VERSION > (b) \
18     || (PERL_VERSION == (b) && PERL_SUBVERSION >= (c)))))
19    
20     #if defined(HAS_SIGACTION) && defined(SA_SIGINFO)
21     # define HAS_SA_SIGINFO 1
22     #endif
23    
24     #if !PERL_VERSION_ATLEAST(5,10,0)
25     # undef HAS_SA_SIGINFO
26     #endif
27    
28 root 1.6 /*****************************************************************************/
29 root 1.1
30 root 1.6 typedef struct {
31 root 1.1 SV *cb;
32 root 1.2 void (*c_cb)(pTHX_ void *c_arg, int value);
33     void *c_arg;
34     SV *fh_r, *fh_w;
35 root 1.9 SV *value;
36 root 1.6 int signum;
37 root 1.11 int autodrain;
38 root 1.13 ANY *scope_savestack;
39 root 1.7 volatile int blocked;
40 root 1.1
41 root 1.9 s_epipe ep;
42 root 1.6 int fd_wlen;
43     atomic_t fd_enable;
44 root 1.2 atomic_t pending;
45 root 1.9 volatile IV *valuep;
46 root 1.13 atomic_t hysteresis;
47 root 1.6 } async_t;
48    
49     static AV *asyncs;
50     static async_t *sig_async [SIG_SIZE];
51    
52     #define SvASYNC_nrv(sv) INT2PTR (async_t *, SvIVX (sv))
53     #define SvASYNC(rv) SvASYNC_nrv (SvRV (rv))
54 root 1.1
55 root 1.13 static void async_signal (void *signal_arg, int value);
56    
57     static void
58     setsig (int signum, void (*handler)(int))
59     {
60     #if _WIN32
61 root 1.17 signal (signum, handler);
62 root 1.13 #else
63     struct sigaction sa;
64     sa.sa_handler = handler;
65     sigfillset (&sa.sa_mask);
66     sa.sa_flags = 0; /* if we interrupt a syscall, we might drain the pipe before it became ready */
67     sigaction (signum, &sa, 0);
68 root 1.17 #endif
69 root 1.13 }
70    
71     static void
72     async_sigsend (int signum)
73     {
74     async_signal (sig_async [signum], 0);
75     }
76    
77 root 1.1 /* the main workhorse to signal */
78     static void
79     async_signal (void *signal_arg, int value)
80     {
81 root 1.6 static char pipedata [8];
82    
83     async_t *async = (async_t *)signal_arg;
84 root 1.2 int pending = async->pending;
85 root 1.1
86 root 1.13 if (async->hysteresis)
87     setsig (async->signum, SIG_IGN);
88    
89 root 1.9 *async->valuep = value ? value : 1;
90 root 1.19 ECB_MEMORY_FENCE_RELEASE;
91 root 1.2 async->pending = 1;
92 root 1.19 ECB_MEMORY_FENCE_RELEASE;
93 root 1.2 async_pending = 1;
94 root 1.19 ECB_MEMORY_FENCE_RELEASE;
95    
96     if (!async->blocked)
97     {
98     psig_pend [9] = 1;
99     ECB_MEMORY_FENCE_RELEASE;
100     *sig_pending = 1;
101     ECB_MEMORY_FENCE_RELEASE;
102     }
103 root 1.1
104 root 1.10 if (!pending && async->fd_enable && async->ep.len)
105     s_epipe_signal (&async->ep);
106 root 1.2 }
107    
108     static void
109 root 1.6 handle_async (async_t *async)
110 root 1.2 {
111     int old_errno = errno;
112 root 1.9 int value = *async->valuep;
113 root 1.2
114 root 1.9 *async->valuep = 0;
115 root 1.2 async->pending = 0;
116    
117 root 1.13 /* restore signal */
118     if (async->hysteresis)
119     setsig (async->signum, async_sigsend);
120    
121 root 1.2 /* drain pipe */
122 root 1.11 if (async->fd_enable && async->ep.len && async->autodrain)
123 root 1.9 s_epipe_drain (&async->ep);
124 root 1.2
125     if (async->c_cb)
126     {
127     dTHX;
128     async->c_cb (aTHX_ async->c_arg, value);
129     }
130    
131     if (async->cb)
132     {
133     dSP;
134    
135     SV *saveerr = SvOK (ERRSV) ? sv_mortalcopy (ERRSV) : 0;
136     SV *savedie = PL_diehook;
137    
138     PL_diehook = 0;
139    
140     PUSHSTACKi (PERLSI_SIGNAL);
141    
142     PUSHMARK (SP);
143     XPUSHs (sv_2mortal (newSViv (value)));
144     PUTBACK;
145     call_sv (async->cb, G_VOID | G_DISCARD | G_EVAL);
146    
147     if (SvTRUE (ERRSV))
148     {
149     SPAGAIN;
150    
151     PUSHMARK (SP);
152     PUTBACK;
153     call_sv (get_sv ("Async::Interrupt::DIED", 1), G_VOID | G_DISCARD | G_EVAL | G_KEEPERR);
154    
155     sv_setpvn (ERRSV, "", 0);
156     }
157    
158     if (saveerr)
159     sv_setsv (ERRSV, saveerr);
160 root 1.1
161 root 1.2 {
162     SV *oldhook = PL_diehook;
163     PL_diehook = savedie;
164     SvREFCNT_dec (oldhook);
165     }
166    
167     POPSTACK;
168 root 1.1 }
169    
170 root 1.2 errno = old_errno;
171 root 1.1 }
172    
173     static void
174 root 1.2 handle_asyncs (void)
175 root 1.1 {
176 root 1.2 int i;
177    
178 root 1.19 ECB_MEMORY_FENCE_ACQUIRE;
179    
180 root 1.1 async_pending = 0;
181 root 1.2
182     for (i = AvFILLp (asyncs); i >= 0; --i)
183     {
184 root 1.12 SV *async_sv = AvARRAY (asyncs)[i];
185     async_t *async = SvASYNC_nrv (async_sv);
186 root 1.2
187     if (async->pending && !async->blocked)
188 root 1.12 {
189     /* temporarily keep a refcount */
190     SvREFCNT_inc (async_sv);
191     handle_async (async);
192     SvREFCNT_dec (async_sv);
193    
194     /* the handler could have deleted any number of asyncs */
195     if (i > AvFILLp (asyncs))
196     i = AvFILLp (asyncs);
197     }
198 root 1.2 }
199 root 1.1 }
200    
201 root 1.5 #if HAS_SA_SIGINFO
202 root 1.1 static Signal_t async_sighandler (int signum, siginfo_t *si, void *sarg)
203     {
204     if (signum == 9)
205 root 1.2 handle_asyncs ();
206 root 1.1 else
207     old_sighandler (signum, si, sarg);
208     }
209     #else
210 root 1.3 static Signal_t async_sighandler (int signum)
211 root 1.1 {
212     if (signum == 9)
213 root 1.3 handle_asyncs ();
214 root 1.1 else
215     old_sighandler (signum);
216     }
217     #endif
218    
219 root 1.7 #define block(async) ++(async)->blocked
220    
221 root 1.6 static void
222 root 1.7 unblock (async_t *async)
223 root 1.4 {
224     --async->blocked;
225     if (async->pending && !async->blocked)
226     handle_async (async);
227 root 1.7 }
228 root 1.4
229 root 1.7 static void
230     scope_block_cb (pTHX_ void *async_sv)
231     {
232     async_t *async = SvASYNC_nrv ((SV *)async_sv);
233 root 1.13
234     async->scope_savestack = 0;
235 root 1.7 unblock (async);
236 root 1.4 SvREFCNT_dec (async_sv);
237     }
238 root 1.1
239 root 1.13 static void
240     scope_block (SV *async_sv)
241     {
242     async_t *async = SvASYNC_nrv (async_sv);
243    
244     /* as a heuristic, we skip the scope block if we already are blocked */
245     /* and the existing scope block used the same savestack */
246    
247     if (!async->scope_savestack || async->scope_savestack != PL_savestack)
248     {
249     async->scope_savestack = PL_savestack;
250     block (async);
251    
252     LEAVE; /* unfortunately, perl sandwiches XS calls into ENTER/LEAVE */
253     SAVEDESTRUCTOR_X (scope_block_cb, (void *)SvREFCNT_inc (async_sv));
254     ENTER; /* unfortunately, perl sandwiches XS calls into ENTER/LEAVE */
255     }
256     }
257    
258 root 1.1 MODULE = Async::Interrupt PACKAGE = Async::Interrupt
259    
260     BOOT:
261     old_sighandler = PL_sighandlerp;
262     PL_sighandlerp = async_sighandler;
263     sig_pending = &PL_sig_pending;
264     psig_pend = PL_psig_pend;
265     asyncs = newAV ();
266 root 1.4 CvNODEBUG_on (get_cv ("Async::Interrupt::scope_block", 0)); /* otherwise calling scope can be the debugger */
267 root 1.1
268 root 1.3 PROTOTYPES: DISABLE
269    
270 root 1.6 void
271 root 1.9 _alloc (SV *cb, void *c_cb, void *c_arg, SV *fh_r, SV *fh_w, SV *signl, SV *pvalue)
272 root 1.6 PPCODE:
273 root 1.1 {
274 root 1.8 SV *cv = SvOK (cb) ? SvREFCNT_inc (s_get_cv_croak (cb)) : 0;
275 root 1.19 async_t *async;
276 root 1.1
277 root 1.6 Newz (0, async, 1, async_t);
278 root 1.1
279 root 1.6 XPUSHs (sv_2mortal (newSViv (PTR2IV (async))));
280 root 1.9 /* TODO: need to bless right now to ensure deallocation */
281 root 1.6 av_push (asyncs, TOPs);
282 root 1.2
283 root 1.9 SvGETMAGIC (fh_r); SvGETMAGIC (fh_w);
284     if (SvOK (fh_r) || SvOK (fh_w))
285     {
286     int fd_r = s_fileno_croak (fh_r, 0);
287     int fd_w = s_fileno_croak (fh_w, 1);
288    
289     async->fh_r = newSVsv (fh_r);
290     async->fh_w = newSVsv (fh_w);
291     async->ep.fd [0] = fd_r;
292     async->ep.fd [1] = fd_w;
293     async->ep.len = 1;
294     async->fd_enable = 1;
295     }
296    
297     async->value = SvROK (pvalue)
298     ? SvREFCNT_inc_NN (SvRV (pvalue))
299     : NEWSV (0, 0);
300    
301     sv_setiv (async->value, 0);
302     SvIOK_only (async->value); /* just to be sure */
303     SvREADONLY_on (async->value);
304    
305     async->valuep = &(SvIVX (async->value));
306    
307 root 1.11 async->autodrain = 1;
308 root 1.6 async->cb = cv;
309     async->c_cb = c_cb;
310     async->c_arg = c_arg;
311 root 1.8 async->signum = SvOK (signl) ? s_signum_croak (signl) : 0;
312 root 1.6
313     if (async->signum)
314     {
315     if (async->signum < 0)
316     croak ("Async::Interrupt::new got passed illegal signal name or number: %s", SvPV_nolen (signl));
317    
318     sig_async [async->signum] = async;
319 root 1.13 setsig (async->signum, async_sigsend);
320 root 1.6 }
321 root 1.1 }
322    
323     void
324 root 1.13 signal_hysteresis (async_t *async, int enable)
325     CODE:
326     async->hysteresis = enable;
327    
328     void
329 root 1.6 signal_func (async_t *async)
330 root 1.1 PPCODE:
331     EXTEND (SP, 2);
332     PUSHs (sv_2mortal (newSViv (PTR2IV (async_signal))));
333 root 1.6 PUSHs (sv_2mortal (newSViv (PTR2IV (async))));
334 root 1.1
335 root 1.13 void
336     scope_block_func (SV *self)
337     PPCODE:
338     EXTEND (SP, 2);
339     PUSHs (sv_2mortal (newSViv (PTR2IV (scope_block))));
340     PUSHs (sv_2mortal (newSViv (PTR2IV (SvRV (self)))));
341    
342 root 1.9 IV
343     c_var (async_t *async)
344     CODE:
345     RETVAL = PTR2IV (async->valuep);
346     OUTPUT:
347     RETVAL
348    
349 root 1.1 void
350 root 1.19 handle (async_t *async)
351     CODE:
352     handle_async (async);
353    
354     void
355 root 1.9 signal (async_t *async, int value = 1)
356 root 1.1 CODE:
357 root 1.6 async_signal (async, value);
358 root 1.2
359     void
360 root 1.6 block (async_t *async)
361 root 1.2 CODE:
362 root 1.7 block (async);
363 root 1.2
364     void
365 root 1.6 unblock (async_t *async)
366 root 1.2 CODE:
367 root 1.7 unblock (async);
368 root 1.1
369     void
370 root 1.4 scope_block (SV *self)
371     CODE:
372 root 1.13 scope_block (SvRV (self));
373 root 1.4
374     void
375 root 1.6 pipe_enable (async_t *async)
376     ALIAS:
377 root 1.19 pipe_enable = 1
378 root 1.6 pipe_disable = 0
379     CODE:
380     async->fd_enable = ix;
381    
382 root 1.9 int
383     pipe_fileno (async_t *async)
384     CODE:
385     if (!async->ep.len)
386     {
387     int res;
388    
389     /*block (async);*//*TODO*/
390     res = s_epipe_new (&async->ep);
391     async->fd_enable = 1;
392     /*unblock (async);*//*TODO*/
393    
394     if (res < 0)
395     croak ("Async::Interrupt: unable to initialize event pipe");
396     }
397    
398     RETVAL = async->ep.fd [0];
399     OUTPUT:
400     RETVAL
401    
402 root 1.11 int
403     pipe_autodrain (async_t *async, int enable = -1)
404     CODE:
405     RETVAL = async->autodrain;
406     if (enable >= 0)
407     async->autodrain = enable;
408     OUTPUT:
409     RETVAL
410 root 1.9
411     void
412 root 1.19 pipe_drain (async_t *async)
413     CODE:
414     if (async->ep.len)
415     s_epipe_drain (&async->ep);
416    
417     void
418 root 1.9 post_fork (async_t *async)
419     CODE:
420     if (async->ep.len)
421     {
422 root 1.19 int res;
423 root 1.9
424     /*block (async);*//*TODO*/
425     res = s_epipe_renew (&async->ep);
426     /*unblock (async);*//*TODO*/
427    
428     if (res < 0)
429     croak ("Async::Interrupt: unable to initialize event pipe after fork");
430     }
431    
432 root 1.6 void
433 root 1.1 DESTROY (SV *self)
434     CODE:
435     {
436 root 1.19 int i;
437     SV *async_sv = SvRV (self);
438     async_t *async = SvASYNC_nrv (async_sv);
439 root 1.1
440 root 1.2 for (i = AvFILLp (asyncs); i >= 0; --i)
441 root 1.1 if (AvARRAY (asyncs)[i] == async_sv)
442     {
443 root 1.12 AvARRAY (asyncs)[i] = AvARRAY (asyncs)[AvFILLp (asyncs)];
444     av_pop (asyncs);
445 root 1.1 goto found;
446     }
447    
448     if (!PL_dirty)
449 root 1.2 warn ("Async::Interrupt::DESTROY could not find async object in list of asyncs, please report");
450 root 1.1
451     found:
452 root 1.6
453     if (async->signum)
454 root 1.13 setsig (async->signum, SIG_DFL);
455 root 1.6
456 root 1.9 if (!async->fh_r && async->ep.len)
457     s_epipe_destroy (&async->ep);
458    
459 root 1.2 SvREFCNT_dec (async->fh_r);
460     SvREFCNT_dec (async->fh_w);
461 root 1.1 SvREFCNT_dec (async->cb);
462 root 1.9 SvREFCNT_dec (async->value);
463 root 1.1
464     Safefree (async);
465     }
466    
467 root 1.14 SV *
468     sig2num (SV *signame_or_number)
469     ALIAS:
470     sig2num = 0
471     sig2name = 1
472 root 1.16 PROTOTYPE: $
473 root 1.14 CODE:
474     {
475     int signum = s_signum (signame_or_number);
476    
477     if (signum < 0)
478     RETVAL = &PL_sv_undef;
479     else if (ix)
480     RETVAL = newSVpv (PL_sig_name [signum], 0);
481     else
482     RETVAL = newSViv (signum);
483     }
484     OUTPUT:
485     RETVAL
486    
487 root 1.11 MODULE = Async::Interrupt PACKAGE = Async::Interrupt::EventPipe PREFIX = s_epipe_
488    
489     void
490     new (const char *klass)
491     PPCODE:
492     {
493     s_epipe *epp;
494    
495     Newz (0, epp, 1, s_epipe);
496     XPUSHs (sv_setref_iv (sv_newmortal (), klass, PTR2IV (epp)));
497    
498     if (s_epipe_new (epp) < 0)
499     croak ("Async::Interrupt::EventPipe: unable to create new event pipe");
500     }
501    
502     void
503     filenos (s_epipe *epp)
504     PPCODE:
505     EXTEND (SP, 2);
506     PUSHs (sv_2mortal (newSViv (epp->fd [0])));
507     PUSHs (sv_2mortal (newSViv (epp->fd [1])));
508    
509     int
510     fileno (s_epipe *epp)
511     ALIAS:
512     fileno = 0
513     fileno_r = 0
514     fileno_w = 1
515     CODE:
516     RETVAL = epp->fd [ix];
517     OUTPUT:
518     RETVAL
519    
520     int
521     type (s_epipe *epp)
522     CODE:
523     RETVAL = epp->len;
524     OUTPUT:
525     RETVAL
526    
527     void
528     s_epipe_signal (s_epipe *epp)
529    
530     void
531     s_epipe_drain (s_epipe *epp)
532    
533     void
534 root 1.18 signal_func (s_epipe *epp)
535     ALIAS:
536     drain_func = 1
537 root 1.15 PPCODE:
538     EXTEND (SP, 2);
539 root 1.18 PUSHs (sv_2mortal (newSViv (PTR2IV (ix ? s_epipe_drain : s_epipe_signal))));
540 root 1.15 PUSHs (sv_2mortal (newSViv (PTR2IV (epp))));
541    
542     void
543     s_epipe_wait (s_epipe *epp)
544    
545     void
546 root 1.11 DESTROY (s_epipe *epp)
547     CODE:
548     s_epipe_destroy (epp);
549