ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/Async-Interrupt/Interrupt.xs
Revision: 1.18
Committed: Sat May 15 00:08:48 2010 UTC (14 years, 1 month ago) by root
Branch: MAIN
CVS Tags: rel-1_05
Changes since 1.17: +4 -2 lines
Log Message:
1.05

File Contents

# User Rev Content
1 root 1.1 #include "EXTERN.h"
2     #include "perl.h"
3     #include "XSUB.h"
4    
5 root 1.8 #include "schmorp.h"
6    
7 root 1.1 typedef volatile sig_atomic_t atomic_t;
8    
9     static int *sig_pending, *psig_pend; /* make local copies because of missing THX */
10     static Sighandler_t old_sighandler;
11     static atomic_t async_pending;
12    
13 root 1.5 #define PERL_VERSION_ATLEAST(a,b,c) \
14     (PERL_REVISION > (a) \
15     || (PERL_REVISION == (a) \
16     && (PERL_VERSION > (b) \
17     || (PERL_VERSION == (b) && PERL_SUBVERSION >= (c)))))
18    
19     #if defined(HAS_SIGACTION) && defined(SA_SIGINFO)
20     # define HAS_SA_SIGINFO 1
21     #endif
22    
23     #if !PERL_VERSION_ATLEAST(5,10,0)
24     # undef HAS_SA_SIGINFO
25     #endif
26    
27 root 1.6 /*****************************************************************************/
28 root 1.1
29 root 1.6 typedef struct {
30 root 1.1 SV *cb;
31 root 1.2 void (*c_cb)(pTHX_ void *c_arg, int value);
32     void *c_arg;
33     SV *fh_r, *fh_w;
34 root 1.9 SV *value;
35 root 1.6 int signum;
36 root 1.11 int autodrain;
37 root 1.13 ANY *scope_savestack;
38 root 1.7 volatile int blocked;
39 root 1.1
40 root 1.9 s_epipe ep;
41 root 1.6 int fd_wlen;
42     atomic_t fd_enable;
43 root 1.2 atomic_t pending;
44 root 1.9 volatile IV *valuep;
45 root 1.13 atomic_t hysteresis;
46 root 1.6 } async_t;
47    
48     static AV *asyncs;
49     static async_t *sig_async [SIG_SIZE];
50    
51     #define SvASYNC_nrv(sv) INT2PTR (async_t *, SvIVX (sv))
52     #define SvASYNC(rv) SvASYNC_nrv (SvRV (rv))
53 root 1.1
54 root 1.13 static void async_signal (void *signal_arg, int value);
55    
56     static void
57     setsig (int signum, void (*handler)(int))
58     {
59     #if _WIN32
60 root 1.17 signal (signum, handler);
61 root 1.13 #else
62     struct sigaction sa;
63     sa.sa_handler = handler;
64     sigfillset (&sa.sa_mask);
65     sa.sa_flags = 0; /* if we interrupt a syscall, we might drain the pipe before it became ready */
66     sigaction (signum, &sa, 0);
67 root 1.17 #endif
68 root 1.13 }
69    
70     static void
71     async_sigsend (int signum)
72     {
73     async_signal (sig_async [signum], 0);
74     }
75    
76 root 1.1 /* the main workhorse to signal */
77     static void
78     async_signal (void *signal_arg, int value)
79     {
80 root 1.6 static char pipedata [8];
81    
82     async_t *async = (async_t *)signal_arg;
83 root 1.2 int pending = async->pending;
84 root 1.1
85 root 1.13 if (async->hysteresis)
86     setsig (async->signum, SIG_IGN);
87    
88 root 1.9 *async->valuep = value ? value : 1;
89 root 1.2 async->pending = 1;
90     async_pending = 1;
91     psig_pend [9] = 1;
92     *sig_pending = 1;
93 root 1.1
94 root 1.10 if (!pending && async->fd_enable && async->ep.len)
95     s_epipe_signal (&async->ep);
96 root 1.2 }
97    
98     static void
99 root 1.6 handle_async (async_t *async)
100 root 1.2 {
101     int old_errno = errno;
102 root 1.9 int value = *async->valuep;
103 root 1.2
104 root 1.9 *async->valuep = 0;
105 root 1.2 async->pending = 0;
106    
107 root 1.13 /* restore signal */
108     if (async->hysteresis)
109     setsig (async->signum, async_sigsend);
110    
111 root 1.2 /* drain pipe */
112 root 1.11 if (async->fd_enable && async->ep.len && async->autodrain)
113 root 1.9 s_epipe_drain (&async->ep);
114 root 1.2
115     if (async->c_cb)
116     {
117     dTHX;
118     async->c_cb (aTHX_ async->c_arg, value);
119     }
120    
121     if (async->cb)
122     {
123     dSP;
124    
125     SV *saveerr = SvOK (ERRSV) ? sv_mortalcopy (ERRSV) : 0;
126     SV *savedie = PL_diehook;
127    
128     PL_diehook = 0;
129    
130     PUSHSTACKi (PERLSI_SIGNAL);
131    
132     PUSHMARK (SP);
133     XPUSHs (sv_2mortal (newSViv (value)));
134     PUTBACK;
135     call_sv (async->cb, G_VOID | G_DISCARD | G_EVAL);
136    
137     if (SvTRUE (ERRSV))
138     {
139     SPAGAIN;
140    
141     PUSHMARK (SP);
142     PUTBACK;
143     call_sv (get_sv ("Async::Interrupt::DIED", 1), G_VOID | G_DISCARD | G_EVAL | G_KEEPERR);
144    
145     sv_setpvn (ERRSV, "", 0);
146     }
147    
148     if (saveerr)
149     sv_setsv (ERRSV, saveerr);
150 root 1.1
151 root 1.2 {
152     SV *oldhook = PL_diehook;
153     PL_diehook = savedie;
154     SvREFCNT_dec (oldhook);
155     }
156    
157     POPSTACK;
158 root 1.1 }
159    
160 root 1.2 errno = old_errno;
161 root 1.1 }
162    
163     static void
164 root 1.2 handle_asyncs (void)
165 root 1.1 {
166 root 1.2 int i;
167    
168 root 1.1 async_pending = 0;
169 root 1.2
170     for (i = AvFILLp (asyncs); i >= 0; --i)
171     {
172 root 1.12 SV *async_sv = AvARRAY (asyncs)[i];
173     async_t *async = SvASYNC_nrv (async_sv);
174 root 1.2
175     if (async->pending && !async->blocked)
176 root 1.12 {
177     /* temporarily keep a refcount */
178     SvREFCNT_inc (async_sv);
179     handle_async (async);
180     SvREFCNT_dec (async_sv);
181    
182     /* the handler could have deleted any number of asyncs */
183     if (i > AvFILLp (asyncs))
184     i = AvFILLp (asyncs);
185     }
186 root 1.2 }
187 root 1.1 }
188    
189 root 1.5 #if HAS_SA_SIGINFO
190 root 1.1 static Signal_t async_sighandler (int signum, siginfo_t *si, void *sarg)
191     {
192     if (signum == 9)
193 root 1.2 handle_asyncs ();
194 root 1.1 else
195     old_sighandler (signum, si, sarg);
196     }
197     #else
198 root 1.3 static Signal_t async_sighandler (int signum)
199 root 1.1 {
200     if (signum == 9)
201 root 1.3 handle_asyncs ();
202 root 1.1 else
203     old_sighandler (signum);
204     }
205     #endif
206    
207 root 1.7 #define block(async) ++(async)->blocked
208    
209 root 1.6 static void
210 root 1.7 unblock (async_t *async)
211 root 1.4 {
212     --async->blocked;
213     if (async->pending && !async->blocked)
214     handle_async (async);
215 root 1.7 }
216 root 1.4
217 root 1.7 static void
218     scope_block_cb (pTHX_ void *async_sv)
219     {
220     async_t *async = SvASYNC_nrv ((SV *)async_sv);
221 root 1.13
222     async->scope_savestack = 0;
223 root 1.7 unblock (async);
224 root 1.4 SvREFCNT_dec (async_sv);
225     }
226 root 1.1
227 root 1.13 static void
228     scope_block (SV *async_sv)
229     {
230     async_t *async = SvASYNC_nrv (async_sv);
231    
232     /* as a heuristic, we skip the scope block if we already are blocked */
233     /* and the existing scope block used the same savestack */
234    
235     if (!async->scope_savestack || async->scope_savestack != PL_savestack)
236     {
237     async->scope_savestack = PL_savestack;
238     block (async);
239    
240     LEAVE; /* unfortunately, perl sandwiches XS calls into ENTER/LEAVE */
241     SAVEDESTRUCTOR_X (scope_block_cb, (void *)SvREFCNT_inc (async_sv));
242     ENTER; /* unfortunately, perl sandwiches XS calls into ENTER/LEAVE */
243     }
244     }
245    
246 root 1.1 MODULE = Async::Interrupt PACKAGE = Async::Interrupt
247    
248     BOOT:
249     old_sighandler = PL_sighandlerp;
250     PL_sighandlerp = async_sighandler;
251     sig_pending = &PL_sig_pending;
252     psig_pend = PL_psig_pend;
253     asyncs = newAV ();
254 root 1.4 CvNODEBUG_on (get_cv ("Async::Interrupt::scope_block", 0)); /* otherwise calling scope can be the debugger */
255 root 1.1
256 root 1.3 PROTOTYPES: DISABLE
257    
258 root 1.6 void
259 root 1.9 _alloc (SV *cb, void *c_cb, void *c_arg, SV *fh_r, SV *fh_w, SV *signl, SV *pvalue)
260 root 1.6 PPCODE:
261 root 1.1 {
262 root 1.8 SV *cv = SvOK (cb) ? SvREFCNT_inc (s_get_cv_croak (cb)) : 0;
263 root 1.6 async_t *async;
264 root 1.1
265 root 1.6 Newz (0, async, 1, async_t);
266 root 1.1
267 root 1.6 XPUSHs (sv_2mortal (newSViv (PTR2IV (async))));
268 root 1.9 /* TODO: need to bless right now to ensure deallocation */
269 root 1.6 av_push (asyncs, TOPs);
270 root 1.2
271 root 1.9 SvGETMAGIC (fh_r); SvGETMAGIC (fh_w);
272     if (SvOK (fh_r) || SvOK (fh_w))
273     {
274     int fd_r = s_fileno_croak (fh_r, 0);
275     int fd_w = s_fileno_croak (fh_w, 1);
276    
277     async->fh_r = newSVsv (fh_r);
278     async->fh_w = newSVsv (fh_w);
279     async->ep.fd [0] = fd_r;
280     async->ep.fd [1] = fd_w;
281     async->ep.len = 1;
282     async->fd_enable = 1;
283     }
284    
285     async->value = SvROK (pvalue)
286     ? SvREFCNT_inc_NN (SvRV (pvalue))
287     : NEWSV (0, 0);
288    
289     sv_setiv (async->value, 0);
290     SvIOK_only (async->value); /* just to be sure */
291     SvREADONLY_on (async->value);
292    
293     async->valuep = &(SvIVX (async->value));
294    
295 root 1.11 async->autodrain = 1;
296 root 1.6 async->cb = cv;
297     async->c_cb = c_cb;
298     async->c_arg = c_arg;
299 root 1.8 async->signum = SvOK (signl) ? s_signum_croak (signl) : 0;
300 root 1.6
301     if (async->signum)
302     {
303     if (async->signum < 0)
304     croak ("Async::Interrupt::new got passed illegal signal name or number: %s", SvPV_nolen (signl));
305    
306     sig_async [async->signum] = async;
307 root 1.13 setsig (async->signum, async_sigsend);
308 root 1.6 }
309 root 1.1 }
310    
311     void
312 root 1.13 signal_hysteresis (async_t *async, int enable)
313     CODE:
314     async->hysteresis = enable;
315    
316     void
317 root 1.6 signal_func (async_t *async)
318 root 1.1 PPCODE:
319     EXTEND (SP, 2);
320     PUSHs (sv_2mortal (newSViv (PTR2IV (async_signal))));
321 root 1.6 PUSHs (sv_2mortal (newSViv (PTR2IV (async))));
322 root 1.1
323 root 1.13 void
324     scope_block_func (SV *self)
325     PPCODE:
326     EXTEND (SP, 2);
327     PUSHs (sv_2mortal (newSViv (PTR2IV (scope_block))));
328     PUSHs (sv_2mortal (newSViv (PTR2IV (SvRV (self)))));
329    
330 root 1.9 IV
331     c_var (async_t *async)
332     CODE:
333     RETVAL = PTR2IV (async->valuep);
334     OUTPUT:
335     RETVAL
336    
337 root 1.1 void
338 root 1.9 signal (async_t *async, int value = 1)
339 root 1.1 CODE:
340 root 1.6 async_signal (async, value);
341 root 1.2
342     void
343 root 1.6 block (async_t *async)
344 root 1.2 CODE:
345 root 1.7 block (async);
346 root 1.2
347     void
348 root 1.6 unblock (async_t *async)
349 root 1.2 CODE:
350 root 1.7 unblock (async);
351 root 1.1
352     void
353 root 1.4 scope_block (SV *self)
354     CODE:
355 root 1.13 scope_block (SvRV (self));
356 root 1.4
357     void
358 root 1.6 pipe_enable (async_t *async)
359     ALIAS:
360     pipe_enable = 1
361     pipe_disable = 0
362     CODE:
363     async->fd_enable = ix;
364    
365 root 1.9 int
366     pipe_fileno (async_t *async)
367     CODE:
368     if (!async->ep.len)
369     {
370     int res;
371    
372     /*block (async);*//*TODO*/
373     res = s_epipe_new (&async->ep);
374     async->fd_enable = 1;
375     /*unblock (async);*//*TODO*/
376    
377     if (res < 0)
378     croak ("Async::Interrupt: unable to initialize event pipe");
379     }
380    
381     RETVAL = async->ep.fd [0];
382     OUTPUT:
383     RETVAL
384    
385 root 1.11 int
386     pipe_autodrain (async_t *async, int enable = -1)
387     CODE:
388     RETVAL = async->autodrain;
389     if (enable >= 0)
390     async->autodrain = enable;
391     OUTPUT:
392     RETVAL
393 root 1.9
394     void
395     post_fork (async_t *async)
396     CODE:
397     if (async->ep.len)
398     {
399     int res;
400    
401     /*block (async);*//*TODO*/
402     res = s_epipe_renew (&async->ep);
403     /*unblock (async);*//*TODO*/
404    
405     if (res < 0)
406     croak ("Async::Interrupt: unable to initialize event pipe after fork");
407     }
408    
409 root 1.6 void
410 root 1.1 DESTROY (SV *self)
411     CODE:
412     {
413     int i;
414     SV *async_sv = SvRV (self);
415 root 1.6 async_t *async = SvASYNC_nrv (async_sv);
416 root 1.1
417 root 1.2 for (i = AvFILLp (asyncs); i >= 0; --i)
418 root 1.1 if (AvARRAY (asyncs)[i] == async_sv)
419     {
420 root 1.12 AvARRAY (asyncs)[i] = AvARRAY (asyncs)[AvFILLp (asyncs)];
421     av_pop (asyncs);
422 root 1.1 goto found;
423     }
424    
425     if (!PL_dirty)
426 root 1.2 warn ("Async::Interrupt::DESTROY could not find async object in list of asyncs, please report");
427 root 1.1
428     found:
429 root 1.6
430     if (async->signum)
431 root 1.13 setsig (async->signum, SIG_DFL);
432 root 1.6
433 root 1.9 if (!async->fh_r && async->ep.len)
434     s_epipe_destroy (&async->ep);
435    
436 root 1.2 SvREFCNT_dec (async->fh_r);
437     SvREFCNT_dec (async->fh_w);
438 root 1.1 SvREFCNT_dec (async->cb);
439 root 1.9 SvREFCNT_dec (async->value);
440 root 1.1
441     Safefree (async);
442     }
443    
444 root 1.14 SV *
445     sig2num (SV *signame_or_number)
446     ALIAS:
447     sig2num = 0
448     sig2name = 1
449 root 1.16 PROTOTYPE: $
450 root 1.14 CODE:
451     {
452     int signum = s_signum (signame_or_number);
453    
454     if (signum < 0)
455     RETVAL = &PL_sv_undef;
456     else if (ix)
457     RETVAL = newSVpv (PL_sig_name [signum], 0);
458     else
459     RETVAL = newSViv (signum);
460     }
461     OUTPUT:
462     RETVAL
463    
464 root 1.11 MODULE = Async::Interrupt PACKAGE = Async::Interrupt::EventPipe PREFIX = s_epipe_
465    
466     void
467     new (const char *klass)
468     PPCODE:
469     {
470     s_epipe *epp;
471    
472     Newz (0, epp, 1, s_epipe);
473     XPUSHs (sv_setref_iv (sv_newmortal (), klass, PTR2IV (epp)));
474    
475     if (s_epipe_new (epp) < 0)
476     croak ("Async::Interrupt::EventPipe: unable to create new event pipe");
477     }
478    
479     void
480     filenos (s_epipe *epp)
481     PPCODE:
482     EXTEND (SP, 2);
483     PUSHs (sv_2mortal (newSViv (epp->fd [0])));
484     PUSHs (sv_2mortal (newSViv (epp->fd [1])));
485    
486     int
487     fileno (s_epipe *epp)
488     ALIAS:
489     fileno = 0
490     fileno_r = 0
491     fileno_w = 1
492     CODE:
493     RETVAL = epp->fd [ix];
494     OUTPUT:
495     RETVAL
496    
497     int
498     type (s_epipe *epp)
499     CODE:
500     RETVAL = epp->len;
501     OUTPUT:
502     RETVAL
503    
504     void
505     s_epipe_signal (s_epipe *epp)
506    
507     void
508     s_epipe_drain (s_epipe *epp)
509    
510     void
511 root 1.18 signal_func (s_epipe *epp)
512     ALIAS:
513     drain_func = 1
514 root 1.15 PPCODE:
515     EXTEND (SP, 2);
516 root 1.18 PUSHs (sv_2mortal (newSViv (PTR2IV (ix ? s_epipe_drain : s_epipe_signal))));
517 root 1.15 PUSHs (sv_2mortal (newSViv (PTR2IV (epp))));
518    
519     void
520     s_epipe_wait (s_epipe *epp)
521    
522     void
523 root 1.11 DESTROY (s_epipe *epp)
524     CODE:
525     s_epipe_destroy (epp);
526