ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/Async-Interrupt/Interrupt.xs
Revision: 1.12
Committed: Sat Jul 18 05:14:19 2009 UTC (14 years, 10 months ago) by root
Branch: MAIN
CVS Tags: rel-0_06
Changes since 1.11: +19 -8 lines
Log Message:
0.6

File Contents

# User Rev Content
1 root 1.1 #include "EXTERN.h"
2     #include "perl.h"
3     #include "XSUB.h"
4    
5 root 1.8 #include "schmorp.h"
6    
7 root 1.1 typedef volatile sig_atomic_t atomic_t;
8    
9     static int *sig_pending, *psig_pend; /* make local copies because of missing THX */
10     static Sighandler_t old_sighandler;
11     static atomic_t async_pending;
12    
/*
 * evaluates to true when PERL_REVISION.PERL_VERSION.PERL_SUBVERSION is
 * greater than or equal to a.b.c (lexicographic comparison, usable in #if).
 */
#define PERL_VERSION_ATLEAST(a,b,c)                       \
  (PERL_REVISION != (a) ? PERL_REVISION > (a)             \
   : PERL_VERSION != (b) ? PERL_VERSION > (b)             \
   : PERL_SUBVERSION >= (c))

/* HAS_SA_SIGINFO selects the three-argument form of async_sighandler */
#if defined(HAS_SIGACTION) && defined(SA_SIGINFO)
# define HAS_SA_SIGINFO 1
#endif

/* never use the three-argument form on perls older than 5.10.0 */
#if !PERL_VERSION_ATLEAST(5,10,0)
# undef HAS_SA_SIGINFO
#endif
26    
27 root 1.6 /*****************************************************************************/
28 root 1.1
29 root 1.6 typedef struct {
30 root 1.1 SV *cb;
31 root 1.2 void (*c_cb)(pTHX_ void *c_arg, int value);
32     void *c_arg;
33     SV *fh_r, *fh_w;
34 root 1.9 SV *value;
35 root 1.6 int signum;
36 root 1.11 int autodrain;
37 root 1.7 volatile int blocked;
38 root 1.1
39 root 1.9 s_epipe ep;
40 root 1.6 int fd_wlen;
41     atomic_t fd_enable;
42 root 1.2 atomic_t pending;
43 root 1.9 volatile IV *valuep;
44 root 1.6 } async_t;
45    
46     static AV *asyncs;
47     static async_t *sig_async [SIG_SIZE];
48    
49     #define SvASYNC_nrv(sv) INT2PTR (async_t *, SvIVX (sv))
50     #define SvASYNC(rv) SvASYNC_nrv (SvRV (rv))
51 root 1.1
52     /* the main workhorse to signal */
53     static void
54     async_signal (void *signal_arg, int value)
55     {
56 root 1.6 static char pipedata [8];
57    
58     async_t *async = (async_t *)signal_arg;
59 root 1.2 int pending = async->pending;
60 root 1.1
61 root 1.9 *async->valuep = value ? value : 1;
62 root 1.2 async->pending = 1;
63     async_pending = 1;
64     psig_pend [9] = 1;
65     *sig_pending = 1;
66 root 1.1
67 root 1.10 if (!pending && async->fd_enable && async->ep.len)
68     s_epipe_signal (&async->ep);
69 root 1.2 }
70    
71     static void
72 root 1.6 handle_async (async_t *async)
73 root 1.2 {
74     int old_errno = errno;
75 root 1.9 int value = *async->valuep;
76 root 1.2
77 root 1.9 *async->valuep = 0;
78 root 1.2 async->pending = 0;
79    
80     /* drain pipe */
81 root 1.11 if (async->fd_enable && async->ep.len && async->autodrain)
82 root 1.9 s_epipe_drain (&async->ep);
83 root 1.2
84     if (async->c_cb)
85     {
86     dTHX;
87     async->c_cb (aTHX_ async->c_arg, value);
88     }
89    
90     if (async->cb)
91     {
92     dSP;
93    
94     SV *saveerr = SvOK (ERRSV) ? sv_mortalcopy (ERRSV) : 0;
95     SV *savedie = PL_diehook;
96    
97     PL_diehook = 0;
98    
99     PUSHSTACKi (PERLSI_SIGNAL);
100    
101     PUSHMARK (SP);
102     XPUSHs (sv_2mortal (newSViv (value)));
103     PUTBACK;
104     call_sv (async->cb, G_VOID | G_DISCARD | G_EVAL);
105    
106     if (SvTRUE (ERRSV))
107     {
108     SPAGAIN;
109    
110     PUSHMARK (SP);
111     PUTBACK;
112     call_sv (get_sv ("Async::Interrupt::DIED", 1), G_VOID | G_DISCARD | G_EVAL | G_KEEPERR);
113    
114     sv_setpvn (ERRSV, "", 0);
115     }
116    
117     if (saveerr)
118     sv_setsv (ERRSV, saveerr);
119 root 1.1
120 root 1.2 {
121     SV *oldhook = PL_diehook;
122     PL_diehook = savedie;
123     SvREFCNT_dec (oldhook);
124     }
125    
126     POPSTACK;
127 root 1.1 }
128    
129 root 1.2 errno = old_errno;
130 root 1.1 }
131    
132     static void
133 root 1.2 handle_asyncs (void)
134 root 1.1 {
135 root 1.2 int i;
136    
137 root 1.1 async_pending = 0;
138 root 1.2
139     for (i = AvFILLp (asyncs); i >= 0; --i)
140     {
141 root 1.12 SV *async_sv = AvARRAY (asyncs)[i];
142     async_t *async = SvASYNC_nrv (async_sv);
143 root 1.2
144     if (async->pending && !async->blocked)
145 root 1.12 {
146     /* temporarily keep a refcount */
147     SvREFCNT_inc (async_sv);
148     handle_async (async);
149     SvREFCNT_dec (async_sv);
150    
151     /* the handler could have deleted any number of asyncs */
152     if (i > AvFILLp (asyncs))
153     i = AvFILLp (asyncs);
154     }
155 root 1.2 }
156 root 1.1 }
157    
158 root 1.5 #if HAS_SA_SIGINFO
159 root 1.1 static Signal_t async_sighandler (int signum, siginfo_t *si, void *sarg)
160     {
161     if (signum == 9)
162 root 1.2 handle_asyncs ();
163 root 1.1 else
164     old_sighandler (signum, si, sarg);
165     }
166     #else
167 root 1.3 static Signal_t async_sighandler (int signum)
168 root 1.1 {
169     if (signum == 9)
170 root 1.3 handle_asyncs ();
171 root 1.1 else
172     old_sighandler (signum);
173     }
174     #endif
175    
176 root 1.4 static void
177 root 1.6 async_sigsend (int signum)
178     {
179     async_signal (sig_async [signum], 0);
180     }
181    
182 root 1.7 #define block(async) ++(async)->blocked
183    
184 root 1.6 static void
185 root 1.7 unblock (async_t *async)
186 root 1.4 {
187     --async->blocked;
188     if (async->pending && !async->blocked)
189     handle_async (async);
190 root 1.7 }
191 root 1.4
192 root 1.7 static void
193     scope_block_cb (pTHX_ void *async_sv)
194     {
195     async_t *async = SvASYNC_nrv ((SV *)async_sv);
196     unblock (async);
197 root 1.4 SvREFCNT_dec (async_sv);
198     }
199 root 1.1
200     MODULE = Async::Interrupt PACKAGE = Async::Interrupt
201    
BOOT:
	/* list of all async objects, walked by handle_asyncs */
	asyncs         = newAV ();
	/* cache the interpreter variables that async_signal writes to */
	sig_pending    = &PL_sig_pending;
	psig_pend      = PL_psig_pend;
	/* hook into the signal handler pointer, keeping the previous one for chaining */
	old_sighandler = PL_sighandlerp;
	PL_sighandlerp = async_sighandler;
	CvNODEBUG_on (get_cv ("Async::Interrupt::scope_block", 0)); /* otherwise calling scope can be the debugger */
209 root 1.1
210 root 1.3 PROTOTYPES: DISABLE
211    
# Allocates and initialises the C state of a new async object and returns
# a (not yet blessed) integer scalar holding its address.
#
#   cb           perl callback, or undef
#   c_cb, c_arg  C callback and its argument
#   fh_r, fh_w   user-supplied pipe handles, or undef
#   signl        signal name or number to bind, or undef
#   pvalue       optional reference to the scalar that stores the value
#
# Croaks on invalid callbacks, file handles or signals.
void
_alloc (SV *cb, void *c_cb, void *c_arg, SV *fh_r, SV *fh_w, SV *signl, SV *pvalue)
	PPCODE:
{
        SV *cv = SvOK (cb) ? SvREFCNT_inc (s_get_cv_croak (cb)) : 0;
        SV *async_sv;
        async_t *async;

        Newz (0, async, 1, async_t);

        /*
         * The asyncs list does not own a reference to this sv (DESTROY
         * removes the entry without releasing one), so the sv must only be
         * registered once nothing can croak anymore: a croak frees the
         * mortal and would otherwise leave a dangling pointer in the list.
         * NOTE(review): the async_t itself and the references taken so far
         * are still leaked when a later step croaks.
         */
        /* TODO: need to bless right now to ensure deallocation */
        async_sv = sv_2mortal (newSViv (PTR2IV (async)));

        SvGETMAGIC (fh_r); SvGETMAGIC (fh_w);
        if (SvOK (fh_r) || SvOK (fh_w))
          {
            int fd_r = s_fileno_croak (fh_r, 0);
            int fd_w = s_fileno_croak (fh_w, 1);

            async->fh_r      = newSVsv (fh_r);
            async->fh_w      = newSVsv (fh_w);
            async->ep.fd [0] = fd_r;
            async->ep.fd [1] = fd_w;
            async->ep.len    = 1;
            async->fd_enable = 1;
          }

        /* use the caller's scalar as value storage if given, else a fresh one */
        async->value = SvROK (pvalue)
                       ? SvREFCNT_inc_NN (SvRV (pvalue))
                       : NEWSV (0, 0);

        sv_setiv (async->value, 0);
        SvIOK_only (async->value); /* just to be sure */
        SvREADONLY_on (async->value);

        async->valuep = &(SvIVX (async->value));

        async->autodrain = 1;
        async->cb        = cv;
        async->c_cb      = c_cb;
        async->c_arg     = c_arg;
        async->signum    = SvOK (signl) ? s_signum_croak (signl) : 0;

        if (async->signum < 0)
          croak ("Async::Interrupt::new got passed illegal signal name or number: %s", SvPV_nolen (signl));

        /* nothing below croaks: register the object before a signal can reach it */
        av_push (asyncs, async_sv);

        if (async->signum)
          {
            sig_async [async->signum] = async;
#if _WIN32
            signal (async->signum, async_sigsend);
#else
            {
              struct sigaction sa;
              sa.sa_handler = async_sigsend;
              sigfillset (&sa.sa_mask);
              sa.sa_flags = 0; /* if we interrupt a syscall, we might drain the pipe before it became ready */
              sigaction (async->signum, &sa, 0);
            }
#endif
          }

        XPUSHs (async_sv);
}
274    
# Returns two integers: the address of the C function async_signal and the
# address of the async_t, i.e. the function/argument pair needed to signal
# this object from C.
void
signal_func (async_t *async)
	PPCODE:
{
        SV *func_sv = sv_2mortal (newSViv (PTR2IV (async_signal)));
        SV *arg_sv  = sv_2mortal (newSViv (PTR2IV (async)));

        EXTEND (SP, 2);
        PUSHs (func_sv);
        PUSHs (arg_sv);
}
281 root 1.1
# Returns the address of the IV that stores the signal value, as an integer.
IV
c_var (async_t *async)
	CODE:
{
        volatile IV *slot = async->valuep;

        RETVAL = PTR2IV (slot);
}
	OUTPUT:
        RETVAL
288    
# Signals the async object from perl, value defaults to 1.
void
signal (async_t *async, int value = 1)
	CODE:
{
        async_signal (async, value);
}
293 root 1.2
# Increases the block count of the async object.
void
block (async_t *async)
	CODE:
{
        block (async);
}
298 root 1.2
# Decreases the block count of the async object, running its handler when
# it is pending and no longer blocked.
void
unblock (async_t *async)
	CODE:
{
        unblock (async);
}
303 root 1.1
# Blocks the async object and registers scope_block_cb as a destructor in
# the scope of the caller, which unblocks it again.
void
scope_block (SV *self)
	CODE:
{
        SV *inner = SvRV (self);
        async_t *async = SvASYNC_nrv (inner);

        block (async);

        /* step out of the scope perl wraps around the XS call, so the
         * destructor is registered in the enclosing scope */
        LEAVE; /* unfortunately, perl sandwiches XS calls into ENTER/LEAVE */
        /* the extra reference is released by scope_block_cb */
        SAVEDESTRUCTOR_X (scope_block_cb, (void *)SvREFCNT_inc (inner));
        ENTER; /* unfortunately, perl sandwiches XS calls into ENTER/LEAVE */
}
316    
# Sets the fd_enable flag: to 1 when called as pipe_enable, to 0 when
# called as pipe_disable.
void
pipe_enable (async_t *async)
	ALIAS:
        pipe_enable  = 1
        pipe_disable = 0
	CODE:
{
        /* ix is the alias value of the name this xsub was called under */
        async->fd_enable = ix;
}
324    
# Returns the first file descriptor of the event pipe, creating the
# pipe (and enabling it) first when the object has none yet. Croaks when
# the pipe cannot be created.
int
pipe_fileno (async_t *async)
	CODE:
{
        if (!async->ep.len)
          {
            int status;

            /*block (async);*//*TODO*/
            status = s_epipe_new (&async->ep);
            async->fd_enable = 1;
            /*unblock (async);*//*TODO*/

            if (status < 0)
              croak ("Async::Interrupt: unable to initialize event pipe");
          }

        RETVAL = async->ep.fd [0];
}
	OUTPUT:
        RETVAL
344    
# Returns the previous autodrain setting. When enable is given (and not
# negative) it becomes the new setting.
int
pipe_autodrain (async_t *async, int enable = -1)
	CODE:
{
        int previous = async->autodrain;

        if (enable >= 0)
          async->autodrain = enable;

        RETVAL = previous;
}
	OUTPUT:
        RETVAL
353 root 1.9
# Renews the event pipe of the object (when it has one) via s_epipe_renew.
# Croaks when that fails.
void
post_fork (async_t *async)
	CODE:
{
        if (async->ep.len)
          {
            int status;

            /*block (async);*//*TODO*/
            status = s_epipe_renew (&async->ep);
            /*unblock (async);*//*TODO*/

            if (status < 0)
              croak ("Async::Interrupt: unable to initialize event pipe after fork");
          }
}
368    
# Destructor: removes the object from the asyncs list, resets the bound
# signal (if any) to its default action, destroys the event pipe when it
# was created by this module and frees all resources held by the object.
void
DESTROY (SV *self)
	CODE:
{
        SV *async_sv = SvRV (self);
        async_t *async = SvASYNC_nrv (async_sv);
        int idx = AvFILLp (asyncs);

        /* search our entry, from the last element to the first */
        while (idx >= 0 && AvARRAY (asyncs)[idx] != async_sv)
          --idx;

        if (idx >= 0)
          {
            /* fill the hole with the last element and shorten the list */
            AvARRAY (asyncs)[idx] = AvARRAY (asyncs)[AvFILLp (asyncs)];
            av_pop (asyncs);
          }
        else if (!PL_dirty)
          warn ("Async::Interrupt::DESTROY could not find async object in list of asyncs, please report");

        if (async->signum)
          {
#if _WIN32
            signal (async->signum, SIG_DFL);
#else
            struct sigaction sa;

            sigemptyset (&sa.sa_mask);
            sa.sa_flags   = 0;
            sa.sa_handler = SIG_DFL;
            sigaction (async->signum, &sa, 0);
#endif
          }

        /* pipes built from user-supplied handles are not ours to destroy */
        if (!async->fh_r && async->ep.len)
          s_epipe_destroy (&async->ep);

        SvREFCNT_dec (async->fh_r);
        SvREFCNT_dec (async->fh_w);
        SvREFCNT_dec (async->cb);
        SvREFCNT_dec (async->value);

        Safefree (async);
}
415    
416 root 1.11 MODULE = Async::Interrupt PACKAGE = Async::Interrupt::EventPipe PREFIX = s_epipe_
417    
# Creates a new event pipe object blessed into klass. Croaks when the
# pipe cannot be created.
void
new (const char *klass)
	PPCODE:
{
        s_epipe *epp;

        Newz (0, epp, 1, s_epipe);

        /*
         * Initialise the pipe before wrapping it into an object: a blessed
         * object would have DESTROY (and thus s_epipe_destroy) run on a pipe
         * that was never successfully created. On failure the struct is
         * simply released again.
         */
        if (s_epipe_new (epp) < 0)
          {
            Safefree (epp);
            croak ("Async::Interrupt::EventPipe: unable to create new event pipe");
          }

        XPUSHs (sv_setref_iv (sv_newmortal (), klass, PTR2IV (epp)));
}
431    
# Returns both file descriptors of the event pipe as a two-element list
# (fd [0] first, then fd [1]).
void
filenos (s_epipe *epp)
	PPCODE:
{
        int k;

        EXTEND (SP, 2);

        for (k = 0; k < 2; ++k)
          PUSHs (sv_2mortal (newSViv (epp->fd [k])));
}
438    
# Returns one file descriptor of the event pipe: fd [0] when called as
# fileno or fileno_r, fd [1] when called as fileno_w.
int
fileno (s_epipe *epp)
	ALIAS:
        fileno   = 0
        fileno_r = 0
        fileno_w = 1
	CODE:
{
        /* ix is the alias value and doubles as index into fd */
        RETVAL = epp->fd [ix];
}
	OUTPUT:
        RETVAL
449    
# Returns the len member of the event pipe.
int
type (s_epipe *epp)
	CODE:
{
        RETVAL = epp->len;
}
	OUTPUT:
        RETVAL
456    
# Direct wrapper around the C function s_epipe_signal (exported without
# the s_epipe_ prefix).
void
s_epipe_signal (s_epipe *epp)

# Direct wrapper around the C function s_epipe_drain (exported without
# the s_epipe_ prefix).
void
s_epipe_drain (s_epipe *epp)
462    
# Destructor: destroys the event pipe and frees the s_epipe structure that
# was allocated by new.
void
DESTROY (s_epipe *epp)
	CODE:
{
        s_epipe_destroy (epp);
        /* s_epipe_destroy is also used on the s_epipe embedded in async_t,
         * so it cannot release the structure itself - do it here */
        Safefree (epp);
}
467