ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/Async-Interrupt/Interrupt.xs
Revision: 1.21
Committed: Tue Apr 17 19:24:32 2018 UTC (6 years, 1 month ago) by root
Branch: MAIN
CVS Tags: rel-1_25, rel-1_24
Changes since 1.20: +4 -1 lines
Log Message:
1.24

File Contents

# User Rev Content
1 root 1.1 #include "EXTERN.h"
2     #include "perl.h"
3     #include "XSUB.h"
4    
5 root 1.20 #define ECB_NO_LIBM 1
6     #define ECB_NO_THREADS 1
7 root 1.19 #include "ecb.h"
8 root 1.8 #include "schmorp.h"
9    
10 root 1.1 typedef volatile sig_atomic_t atomic_t;
11    
12     static int *sig_pending, *psig_pend; /* make local copies because of missing THX */
13     static Sighandler_t old_sighandler;
14     static atomic_t async_pending;
15    
/*
 * Non-zero when the perl being compiled against is at least version a.b.c,
 * comparing PERL_REVISION, PERL_VERSION and PERL_SUBVERSION in that order.
 */
#define PERL_VERSION_ATLEAST(a,b,c)                                  \
  (PERL_REVISION > (a)                                               \
   || (PERL_REVISION == (a)                                          \
       && (PERL_VERSION > (b)                                        \
           || (PERL_VERSION == (b) && PERL_SUBVERSION >= (c)))))

/*
 * HAS_SA_SIGINFO ends up defined only when sigaction with SA_SIGINFO is
 * available AND perl is 5.10.0 or newer; on older perls it is force-undefined.
 */
#if !PERL_VERSION_ATLEAST(5,10,0)
# undef HAS_SA_SIGINFO
#elif defined(HAS_SIGACTION) && defined(SA_SIGINFO)
# define HAS_SA_SIGINFO 1
#endif
29    
30 root 1.6 /*****************************************************************************/
31 root 1.1
32 root 1.6 typedef struct {
33 root 1.1 SV *cb;
34 root 1.2 void (*c_cb)(pTHX_ void *c_arg, int value);
35     void *c_arg;
36     SV *fh_r, *fh_w;
37 root 1.9 SV *value;
38 root 1.6 int signum;
39 root 1.11 int autodrain;
40 root 1.13 ANY *scope_savestack;
41 root 1.7 volatile int blocked;
42 root 1.1
43 root 1.9 s_epipe ep;
44 root 1.6 int fd_wlen;
45     atomic_t fd_enable;
46 root 1.2 atomic_t pending;
47 root 1.9 volatile IV *valuep;
48 root 1.13 atomic_t hysteresis;
49 root 1.6 } async_t;
50    
51     static AV *asyncs;
52     static async_t *sig_async [SIG_SIZE];
53    
54     #define SvASYNC_nrv(sv) INT2PTR (async_t *, SvIVX (sv))
55     #define SvASYNC(rv) SvASYNC_nrv (SvRV (rv))
56 root 1.1
57 root 1.13 static void async_signal (void *signal_arg, int value);
58    
59     static void
60     setsig (int signum, void (*handler)(int))
61     {
62     #if _WIN32
63 root 1.17 signal (signum, handler);
64 root 1.13 #else
65     struct sigaction sa;
66     sa.sa_handler = handler;
67     sigfillset (&sa.sa_mask);
68     sa.sa_flags = 0; /* if we interrupt a syscall, we might drain the pipe before it became ready */
69     sigaction (signum, &sa, 0);
70 root 1.17 #endif
71 root 1.13 }
72    
73     static void
74     async_sigsend (int signum)
75     {
76     async_signal (sig_async [signum], 0);
77     }
78    
79 root 1.1 /* the main workhorse to signal */
80     static void
81     async_signal (void *signal_arg, int value)
82     {
83 root 1.6 static char pipedata [8];
84    
85     async_t *async = (async_t *)signal_arg;
86 root 1.2 int pending = async->pending;
87 root 1.1
88 root 1.13 if (async->hysteresis)
89     setsig (async->signum, SIG_IGN);
90    
91 root 1.9 *async->valuep = value ? value : 1;
92 root 1.19 ECB_MEMORY_FENCE_RELEASE;
93 root 1.2 async->pending = 1;
94 root 1.19 ECB_MEMORY_FENCE_RELEASE;
95 root 1.2 async_pending = 1;
96 root 1.19 ECB_MEMORY_FENCE_RELEASE;
97    
98     if (!async->blocked)
99     {
100     psig_pend [9] = 1;
101     ECB_MEMORY_FENCE_RELEASE;
102     *sig_pending = 1;
103     ECB_MEMORY_FENCE_RELEASE;
104     }
105 root 1.1
106 root 1.10 if (!pending && async->fd_enable && async->ep.len)
107     s_epipe_signal (&async->ep);
108 root 1.2 }
109    
110     static void
111 root 1.6 handle_async (async_t *async)
112 root 1.2 {
113     int old_errno = errno;
114 root 1.9 int value = *async->valuep;
115 root 1.2
116 root 1.9 *async->valuep = 0;
117 root 1.2 async->pending = 0;
118    
119 root 1.13 /* restore signal */
120     if (async->hysteresis)
121     setsig (async->signum, async_sigsend);
122    
123 root 1.2 /* drain pipe */
124 root 1.11 if (async->fd_enable && async->ep.len && async->autodrain)
125 root 1.9 s_epipe_drain (&async->ep);
126 root 1.2
127     if (async->c_cb)
128     {
129     dTHX;
130     async->c_cb (aTHX_ async->c_arg, value);
131     }
132    
133     if (async->cb)
134     {
135     dSP;
136    
137     SV *saveerr = SvOK (ERRSV) ? sv_mortalcopy (ERRSV) : 0;
138     SV *savedie = PL_diehook;
139    
140     PL_diehook = 0;
141    
142     PUSHSTACKi (PERLSI_SIGNAL);
143    
144     PUSHMARK (SP);
145     XPUSHs (sv_2mortal (newSViv (value)));
146     PUTBACK;
147     call_sv (async->cb, G_VOID | G_DISCARD | G_EVAL);
148    
149     if (SvTRUE (ERRSV))
150     {
151     SPAGAIN;
152    
153     PUSHMARK (SP);
154     PUTBACK;
155     call_sv (get_sv ("Async::Interrupt::DIED", 1), G_VOID | G_DISCARD | G_EVAL | G_KEEPERR);
156    
157     sv_setpvn (ERRSV, "", 0);
158     }
159    
160     if (saveerr)
161     sv_setsv (ERRSV, saveerr);
162 root 1.1
163 root 1.2 {
164     SV *oldhook = PL_diehook;
165     PL_diehook = savedie;
166     SvREFCNT_dec (oldhook);
167     }
168    
169     POPSTACK;
170 root 1.1 }
171    
172 root 1.2 errno = old_errno;
173 root 1.1 }
174    
175     static void
176 root 1.2 handle_asyncs (void)
177 root 1.1 {
178 root 1.2 int i;
179    
180 root 1.19 ECB_MEMORY_FENCE_ACQUIRE;
181    
182 root 1.1 async_pending = 0;
183 root 1.2
184     for (i = AvFILLp (asyncs); i >= 0; --i)
185     {
186 root 1.12 SV *async_sv = AvARRAY (asyncs)[i];
187     async_t *async = SvASYNC_nrv (async_sv);
188 root 1.2
189     if (async->pending && !async->blocked)
190 root 1.12 {
191     /* temporarily keep a refcount */
192     SvREFCNT_inc (async_sv);
193     handle_async (async);
194     SvREFCNT_dec (async_sv);
195    
196     /* the handler could have deleted any number of asyncs */
197     if (i > AvFILLp (asyncs))
198     i = AvFILLp (asyncs);
199     }
200 root 1.2 }
201 root 1.1 }
202    
203 root 1.5 #if HAS_SA_SIGINFO
204 root 1.1 static Signal_t async_sighandler (int signum, siginfo_t *si, void *sarg)
205     {
206     if (signum == 9)
207 root 1.2 handle_asyncs ();
208 root 1.1 else
209     old_sighandler (signum, si, sarg);
210     }
211     #else
212 root 1.3 static Signal_t async_sighandler (int signum)
213 root 1.1 {
214     if (signum == 9)
215 root 1.3 handle_asyncs ();
216 root 1.1 else
217     old_sighandler (signum);
218     }
219     #endif
220    
221 root 1.7 #define block(async) ++(async)->blocked
222    
223 root 1.6 static void
224 root 1.7 unblock (async_t *async)
225 root 1.4 {
226     --async->blocked;
227     if (async->pending && !async->blocked)
228     handle_async (async);
229 root 1.7 }
230 root 1.4
231 root 1.7 static void
232     scope_block_cb (pTHX_ void *async_sv)
233     {
234     async_t *async = SvASYNC_nrv ((SV *)async_sv);
235 root 1.13
236     async->scope_savestack = 0;
237 root 1.7 unblock (async);
238 root 1.4 SvREFCNT_dec (async_sv);
239     }
240 root 1.1
241 root 1.13 static void
242     scope_block (SV *async_sv)
243     {
244     async_t *async = SvASYNC_nrv (async_sv);
245    
246     /* as a heuristic, we skip the scope block if we already are blocked */
247     /* and the existing scope block used the same savestack */
248    
249     if (!async->scope_savestack || async->scope_savestack != PL_savestack)
250     {
251     async->scope_savestack = PL_savestack;
252     block (async);
253    
254     LEAVE; /* unfortunately, perl sandwiches XS calls into ENTER/LEAVE */
255     SAVEDESTRUCTOR_X (scope_block_cb, (void *)SvREFCNT_inc (async_sv));
256     ENTER; /* unfortunately, perl sandwiches XS calls into ENTER/LEAVE */
257     }
258     }
259    
260 root 1.1 MODULE = Async::Interrupt PACKAGE = Async::Interrupt
261    
262     BOOT:
263     old_sighandler = PL_sighandlerp;
264     PL_sighandlerp = async_sighandler;
265     sig_pending = &PL_sig_pending;
266     psig_pend = PL_psig_pend;
267     asyncs = newAV ();
268 root 1.4 CvNODEBUG_on (get_cv ("Async::Interrupt::scope_block", 0)); /* otherwise calling scope can be the debugger */
269 root 1.1
270 root 1.3 PROTOTYPES: DISABLE
271    
272 root 1.6 void
273 root 1.9 _alloc (SV *cb, void *c_cb, void *c_arg, SV *fh_r, SV *fh_w, SV *signl, SV *pvalue)
274 root 1.6 PPCODE:
275 root 1.1 {
276 root 1.8 SV *cv = SvOK (cb) ? SvREFCNT_inc (s_get_cv_croak (cb)) : 0;
277 root 1.19 async_t *async;
278 root 1.1
279 root 1.6 Newz (0, async, 1, async_t);
280 root 1.1
281 root 1.6 XPUSHs (sv_2mortal (newSViv (PTR2IV (async))));
282 root 1.9 /* TODO: need to bless right now to ensure deallocation */
283 root 1.6 av_push (asyncs, TOPs);
284 root 1.2
285 root 1.9 SvGETMAGIC (fh_r); SvGETMAGIC (fh_w);
286     if (SvOK (fh_r) || SvOK (fh_w))
287     {
288     int fd_r = s_fileno_croak (fh_r, 0);
289     int fd_w = s_fileno_croak (fh_w, 1);
290    
291     async->fh_r = newSVsv (fh_r);
292     async->fh_w = newSVsv (fh_w);
293     async->ep.fd [0] = fd_r;
294     async->ep.fd [1] = fd_w;
295     async->ep.len = 1;
296     async->fd_enable = 1;
297     }
298    
299     async->value = SvROK (pvalue)
300     ? SvREFCNT_inc_NN (SvRV (pvalue))
301     : NEWSV (0, 0);
302    
303     sv_setiv (async->value, 0);
304     SvIOK_only (async->value); /* just to be sure */
305     SvREADONLY_on (async->value);
306    
307     async->valuep = &(SvIVX (async->value));
308    
309 root 1.11 async->autodrain = 1;
310 root 1.6 async->cb = cv;
311     async->c_cb = c_cb;
312     async->c_arg = c_arg;
313 root 1.8 async->signum = SvOK (signl) ? s_signum_croak (signl) : 0;
314 root 1.6
315     if (async->signum)
316     {
317     if (async->signum < 0)
318     croak ("Async::Interrupt::new got passed illegal signal name or number: %s", SvPV_nolen (signl));
319    
320     sig_async [async->signum] = async;
321 root 1.13 setsig (async->signum, async_sigsend);
322 root 1.6 }
323 root 1.1 }
324    
325     void
326 root 1.13 signal_hysteresis (async_t *async, int enable)
327     CODE:
328     async->hysteresis = enable;
329    
330     void
331 root 1.6 signal_func (async_t *async)
332 root 1.1 PPCODE:
333     EXTEND (SP, 2);
334     PUSHs (sv_2mortal (newSViv (PTR2IV (async_signal))));
335 root 1.6 PUSHs (sv_2mortal (newSViv (PTR2IV (async))));
336 root 1.1
337 root 1.13 void
338     scope_block_func (SV *self)
339     PPCODE:
340     EXTEND (SP, 2);
341     PUSHs (sv_2mortal (newSViv (PTR2IV (scope_block))));
342     PUSHs (sv_2mortal (newSViv (PTR2IV (SvRV (self)))));
343    
344 root 1.9 IV
345     c_var (async_t *async)
346     CODE:
347     RETVAL = PTR2IV (async->valuep);
348     OUTPUT:
349     RETVAL
350    
351 root 1.1 void
352 root 1.19 handle (async_t *async)
353     CODE:
354     handle_async (async);
355    
356     void
357 root 1.9 signal (async_t *async, int value = 1)
358 root 1.1 CODE:
359 root 1.6 async_signal (async, value);
360 root 1.2
361     void
362 root 1.6 block (async_t *async)
363 root 1.2 CODE:
364 root 1.7 block (async);
365 root 1.2
366     void
367 root 1.6 unblock (async_t *async)
368 root 1.2 CODE:
369 root 1.7 unblock (async);
370 root 1.1
371     void
372 root 1.4 scope_block (SV *self)
373     CODE:
374 root 1.13 scope_block (SvRV (self));
375 root 1.4
376     void
377 root 1.6 pipe_enable (async_t *async)
378     ALIAS:
379 root 1.19 pipe_enable = 1
380 root 1.6 pipe_disable = 0
381     CODE:
382     async->fd_enable = ix;
383    
384 root 1.9 int
385     pipe_fileno (async_t *async)
386     CODE:
387     if (!async->ep.len)
388     {
389     int res;
390    
391     /*block (async);*//*TODO*/
392     res = s_epipe_new (&async->ep);
393     async->fd_enable = 1;
394     /*unblock (async);*//*TODO*/
395    
396     if (res < 0)
397     croak ("Async::Interrupt: unable to initialize event pipe");
398     }
399    
400     RETVAL = async->ep.fd [0];
401     OUTPUT:
402     RETVAL
403    
404 root 1.11 int
405     pipe_autodrain (async_t *async, int enable = -1)
406     CODE:
407     RETVAL = async->autodrain;
408     if (enable >= 0)
409     async->autodrain = enable;
410     OUTPUT:
411     RETVAL
412 root 1.9
413     void
414 root 1.19 pipe_drain (async_t *async)
415     CODE:
416     if (async->ep.len)
417     s_epipe_drain (&async->ep);
418    
419     void
420 root 1.9 post_fork (async_t *async)
421     CODE:
422     if (async->ep.len)
423     {
424 root 1.19 int res;
425 root 1.9
426     /*block (async);*//*TODO*/
427     res = s_epipe_renew (&async->ep);
428     /*unblock (async);*//*TODO*/
429    
430     if (res < 0)
431     croak ("Async::Interrupt: unable to initialize event pipe after fork");
432     }
433    
434 root 1.6 void
435 root 1.1 DESTROY (SV *self)
436     CODE:
437     {
438 root 1.19 int i;
439     SV *async_sv = SvRV (self);
440     async_t *async = SvASYNC_nrv (async_sv);
441 root 1.1
442 root 1.2 for (i = AvFILLp (asyncs); i >= 0; --i)
443 root 1.1 if (AvARRAY (asyncs)[i] == async_sv)
444     {
445 root 1.12 AvARRAY (asyncs)[i] = AvARRAY (asyncs)[AvFILLp (asyncs)];
446     av_pop (asyncs);
447 root 1.1 goto found;
448     }
449    
450     if (!PL_dirty)
451 root 1.2 warn ("Async::Interrupt::DESTROY could not find async object in list of asyncs, please report");
452 root 1.1
453     found:
454 root 1.6
455     if (async->signum)
456 root 1.13 setsig (async->signum, SIG_DFL);
457 root 1.6
458 root 1.9 if (!async->fh_r && async->ep.len)
459     s_epipe_destroy (&async->ep);
460    
461 root 1.2 SvREFCNT_dec (async->fh_r);
462     SvREFCNT_dec (async->fh_w);
463 root 1.1 SvREFCNT_dec (async->cb);
464 root 1.9 SvREFCNT_dec (async->value);
465 root 1.1
466     Safefree (async);
467     }
468    
469 root 1.14 SV *
470     sig2num (SV *signame_or_number)
471     ALIAS:
472     sig2num = 0
473     sig2name = 1
474 root 1.16 PROTOTYPE: $
475 root 1.14 CODE:
476     {
477     int signum = s_signum (signame_or_number);
478    
479     if (signum < 0)
480     RETVAL = &PL_sv_undef;
481     else if (ix)
482     RETVAL = newSVpv (PL_sig_name [signum], 0);
483     else
484     RETVAL = newSViv (signum);
485     }
486     OUTPUT:
487     RETVAL
488    
489 root 1.11 MODULE = Async::Interrupt PACKAGE = Async::Interrupt::EventPipe PREFIX = s_epipe_
490    
491     void
492     new (const char *klass)
493     PPCODE:
494     {
495 root 1.21 s_epipe *epp;
496 root 1.11
497     Newz (0, epp, 1, s_epipe);
498     XPUSHs (sv_setref_iv (sv_newmortal (), klass, PTR2IV (epp)));
499    
500     if (s_epipe_new (epp) < 0)
501     croak ("Async::Interrupt::EventPipe: unable to create new event pipe");
502     }
503    
504     void
505     filenos (s_epipe *epp)
506     PPCODE:
507     EXTEND (SP, 2);
508     PUSHs (sv_2mortal (newSViv (epp->fd [0])));
509     PUSHs (sv_2mortal (newSViv (epp->fd [1])));
510    
511     int
512     fileno (s_epipe *epp)
513     ALIAS:
514     fileno = 0
515     fileno_r = 0
516     fileno_w = 1
517     CODE:
518     RETVAL = epp->fd [ix];
519     OUTPUT:
520     RETVAL
521    
522     int
523     type (s_epipe *epp)
524     CODE:
525     RETVAL = epp->len;
526     OUTPUT:
527     RETVAL
528    
529     void
530     s_epipe_signal (s_epipe *epp)
531    
532     void
533     s_epipe_drain (s_epipe *epp)
534    
535     void
536 root 1.18 signal_func (s_epipe *epp)
537     ALIAS:
538     drain_func = 1
539 root 1.15 PPCODE:
540     EXTEND (SP, 2);
541 root 1.18 PUSHs (sv_2mortal (newSViv (PTR2IV (ix ? s_epipe_drain : s_epipe_signal))));
542 root 1.15 PUSHs (sv_2mortal (newSViv (PTR2IV (epp))));
543    
544     void
545     s_epipe_wait (s_epipe *epp)
546    
547     void
548 root 1.21 s_epipe_renew (s_epipe *epp)
549    
550     void
551 root 1.11 DESTROY (s_epipe *epp)
552     CODE:
553     s_epipe_destroy (epp);
554