ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/Async-Interrupt/Interrupt.xs
Revision: 1.7
Committed: Tue Jul 14 14:18:10 2009 UTC (14 years, 11 months ago) by root
Branch: MAIN
Changes since 1.6: +24 -14 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.1 #include "EXTERN.h"
2     #include "perl.h"
3     #include "XSUB.h"
4    
5     typedef volatile sig_atomic_t atomic_t;
6    
7     static int *sig_pending, *psig_pend; /* make local copies because of missing THX */
8     static Sighandler_t old_sighandler;
9     static atomic_t async_pending;
10    
/*
 * True when the perl being compiled against is at least version a.b.c.
 * The (revision, version, subversion) triples are compared
 * lexicographically: the first component that differs decides.
 */
#define PERL_VERSION_ATLEAST(a,b,c)                     \
  (PERL_REVISION != (a) ? PERL_REVISION > (a)           \
   : PERL_VERSION != (b) ? PERL_VERSION > (b)           \
   : PERL_SUBVERSION >= (c))
16    
17     #if defined(HAS_SIGACTION) && defined(SA_SIGINFO)
18     # define HAS_SA_SIGINFO 1
19     #endif
20    
21     #if !PERL_VERSION_ATLEAST(5,10,0)
22     # undef HAS_SA_SIGINFO
23     #endif
24    
25 root 1.6 /*****************************************************************************/
26     /* support stuff, copied from EV.xs mostly */
27    
28 root 1.1 static int
29     extract_fd (SV *fh, int wr)
30     {
31     int fd = PerlIO_fileno (wr ? IoOFP (sv_2io (fh)) : IoIFP (sv_2io (fh)));
32    
33     if (fd < 0)
34     croak ("illegal fh argument, either not an OS file or read/write mode mismatch");
35    
36     return fd;
37     }
38    
39     static SV *
40     get_cb (SV *cb_sv)
41     {
42     HV *st;
43     GV *gvp;
44     CV *cv;
45    
46     if (!SvOK (cb_sv))
47     return 0;
48    
49     cv = sv_2cv (cb_sv, &st, &gvp, 0);
50    
51     if (!cv)
52     croak ("Async::Interrupt callback must be undef or a CODE reference");
53    
54     return (SV *)cv;
55     }
56    
57 root 1.6 #ifndef SIG_SIZE
58     /* kudos to Slaven Rezic for the idea */
59     static char sig_size [] = { SIG_NUM };
60     # define SIG_SIZE (sizeof (sig_size) + 1)
61     #endif
62    
63     static int
64     sv_signum (SV *sig)
65     {
66     int signum;
67    
68     SvGETMAGIC (sig);
69    
70     for (signum = 1; signum < SIG_SIZE; ++signum)
71     if (strEQ (SvPV_nolen (sig), PL_sig_name [signum]))
72     return signum;
73    
74     signum = SvIV (sig);
75    
76     if (signum > 0 && signum < SIG_SIZE)
77     return signum;
78    
79     return -1;
80     }
81    
82     /*****************************************************************************/
83 root 1.1
84 root 1.6 typedef struct {
85 root 1.1 SV *cb;
86 root 1.2 void (*c_cb)(pTHX_ void *c_arg, int value);
87     void *c_arg;
88     SV *fh_r, *fh_w;
89 root 1.6 int signum;
90 root 1.7 volatile int blocked;
91 root 1.1
92 root 1.7 int fd_r;
93     volatile int fd_w;
94 root 1.6 int fd_wlen;
95     atomic_t fd_enable;
96 root 1.1 atomic_t value;
97 root 1.2 atomic_t pending;
98 root 1.6 } async_t;
99    
100     static AV *asyncs;
101     static async_t *sig_async [SIG_SIZE];
102    
103     #define SvASYNC_nrv(sv) INT2PTR (async_t *, SvIVX (sv))
104     #define SvASYNC(rv) SvASYNC_nrv (SvRV (rv))
105 root 1.1
106     /* the main workhorse to signal */
107     static void
108     async_signal (void *signal_arg, int value)
109     {
110 root 1.6 static char pipedata [8];
111    
112     async_t *async = (async_t *)signal_arg;
113 root 1.2 int pending = async->pending;
114 root 1.1
115 root 1.2 async->value = value;
116     async->pending = 1;
117     async_pending = 1;
118     psig_pend [9] = 1;
119     *sig_pending = 1;
120 root 1.1
121 root 1.7 {
122     int fd_w = async->fd_w;
123     int fd_enable = async->fd_enable;
124    
125     if (!pending && fd_w >= 0 && fd_enable)
126     if (write (fd_w, pipedata, async->fd_wlen) < 0 && errno == EINVAL)
127     /* on EINVAL we assume it's an eventfd */
128     write (fd_w, pipedata, (async->fd_wlen = 8));
129     }
130 root 1.2 }
131    
132     static void
133 root 1.6 handle_async (async_t *async)
134 root 1.2 {
135     int old_errno = errno;
136     int value = async->value;
137    
138     async->pending = 0;
139    
140     /* drain pipe */
141 root 1.6 if (async->fd_r >= 0 && async->fd_enable)
142 root 1.1 {
143 root 1.6 char dummy [9]; /* 9 is enough for eventfd and normal pipes */
144 root 1.2
145     while (read (async->fd_r, dummy, sizeof (dummy)) == sizeof (dummy))
146     ;
147     }
148    
149     if (async->c_cb)
150     {
151     dTHX;
152     async->c_cb (aTHX_ async->c_arg, value);
153     }
154    
155     if (async->cb)
156     {
157     dSP;
158    
159     SV *saveerr = SvOK (ERRSV) ? sv_mortalcopy (ERRSV) : 0;
160     SV *savedie = PL_diehook;
161    
162     PL_diehook = 0;
163    
164     PUSHSTACKi (PERLSI_SIGNAL);
165    
166     PUSHMARK (SP);
167     XPUSHs (sv_2mortal (newSViv (value)));
168     PUTBACK;
169     call_sv (async->cb, G_VOID | G_DISCARD | G_EVAL);
170    
171     if (SvTRUE (ERRSV))
172     {
173     SPAGAIN;
174    
175     PUSHMARK (SP);
176     PUTBACK;
177     call_sv (get_sv ("Async::Interrupt::DIED", 1), G_VOID | G_DISCARD | G_EVAL | G_KEEPERR);
178    
179     sv_setpvn (ERRSV, "", 0);
180     }
181    
182     if (saveerr)
183     sv_setsv (ERRSV, saveerr);
184 root 1.1
185 root 1.2 {
186     SV *oldhook = PL_diehook;
187     PL_diehook = savedie;
188     SvREFCNT_dec (oldhook);
189     }
190    
191     POPSTACK;
192 root 1.1 }
193    
194 root 1.2 errno = old_errno;
195 root 1.1 }
196    
197     static void
198 root 1.2 handle_asyncs (void)
199 root 1.1 {
200 root 1.2 int i;
201    
202 root 1.1 async_pending = 0;
203 root 1.2
204     for (i = AvFILLp (asyncs); i >= 0; --i)
205     {
206 root 1.6 async_t *async = SvASYNC_nrv (AvARRAY (asyncs)[i]);
207 root 1.2
208     if (async->pending && !async->blocked)
209     handle_async (async);
210     }
211 root 1.1 }
212    
213 root 1.5 #if HAS_SA_SIGINFO
214 root 1.1 static Signal_t async_sighandler (int signum, siginfo_t *si, void *sarg)
215     {
216     if (signum == 9)
217 root 1.2 handle_asyncs ();
218 root 1.1 else
219     old_sighandler (signum, si, sarg);
220     }
221     #else
222 root 1.3 static Signal_t async_sighandler (int signum)
223 root 1.1 {
224     if (signum == 9)
225 root 1.3 handle_asyncs ();
226 root 1.1 else
227     old_sighandler (signum);
228     }
229     #endif
230    
231 root 1.4 static void
232 root 1.6 async_sigsend (int signum)
233     {
234     async_signal (sig_async [signum], 0);
235     }
236    
/* enter a blocked section: bump the block nesting count of async */
#define block(async) (++(async)->blocked)
238    
239 root 1.6 static void
240 root 1.7 unblock (async_t *async)
241 root 1.4 {
242     --async->blocked;
243     if (async->pending && !async->blocked)
244     handle_async (async);
245 root 1.7 }
246 root 1.4
247 root 1.7 static void
248     scope_block_cb (pTHX_ void *async_sv)
249     {
250     async_t *async = SvASYNC_nrv ((SV *)async_sv);
251     unblock (async);
252 root 1.4 SvREFCNT_dec (async_sv);
253     }
254 root 1.1
255     MODULE = Async::Interrupt PACKAGE = Async::Interrupt
256    
BOOT:
        /* remember perl's signal dispatcher and put ours in front of it */
        old_sighandler = PL_sighandlerp;
        PL_sighandlerp = async_sighandler;
        /* local copies, async_signal has no interpreter context */
        psig_pend      = PL_psig_pend;
        sig_pending    = &PL_sig_pending;
        /* list of all allocated async objects */
        asyncs         = newAV ();
        CvNODEBUG_on (get_cv ("Async::Interrupt::scope_block", 0)); /* otherwise calling scope can be the debugger */
264 root 1.1
265 root 1.3 PROTOTYPES: DISABLE
266    
void
_alloc (SV *cb, void *c_cb, void *c_arg, SV *fh_r, SV *fh_w, SV *signl)
	PPCODE:
{
        /*
         * Allocates a new async object and returns a scalar holding its
         * pointer; the same scalar is also stored in the asyncs list.
         *
         * Everything that can croak (file handles, signal, callback) is
         * checked before anything is allocated or registered, so a
         * failure cannot leave a half-initialised object in asyncs or
         * leak the callback reference.
         */
        int fd_r = SvOK (fh_r) ? extract_fd (fh_r, 0) : -1;
        int fd_w = SvOK (fh_w) ? extract_fd (fh_w, 1) : -1;
        int signum;
        SV *cv;
        async_t *async;

        SvGETMAGIC (signl);
        signum = SvOK (signl) ? sv_signum (signl) : 0;

        if (signum < 0)
          croak ("Async::Interrupt::new got passed illegal signal name or number: %s", SvPV_nolen (signl));

        /* get_cb croaks before the reference count is touched */
        cv = SvOK (cb) ? SvREFCNT_inc (get_cb (cb)) : 0;

        Newz (0, async, 1, async_t);

        XPUSHs (sv_2mortal (newSViv (PTR2IV (async))));
        /* the list entry is the returned scalar itself; DESTROY removes it */
        av_push (asyncs, TOPs);

        async->fh_r      = fd_r >= 0 ? newSVsv (fh_r) : 0; async->fd_r = fd_r;
        async->fh_w      = fd_w >= 0 ? newSVsv (fh_w) : 0; async->fd_w = fd_w;
        async->fd_wlen   = 1;
        async->fd_enable = 1;
        async->cb        = cv;
        async->c_cb      = (void (*)(pTHX_ void *, int))c_cb;
        async->c_arg     = c_arg;
        async->signum    = signum;

        if (signum)
          {
            /* register before installing the handler, async_sigsend looks it up */
            sig_async [signum] = async;
#if _WIN32
            signal (signum, async_sigsend);
#else
            {
              struct sigaction sa = { 0 };
              sa.sa_handler = async_sigsend;
              /* block all other signals while our handler runs */
              sigfillset (&sa.sa_mask);
              sigaction (signum, &sa, 0);
            }
#endif
          }
}
309    
void
signal_func (async_t *async)
	PPCODE:
{
        /* returns two integers: the address of async_signal and its first argument */
        SV *func_iv = sv_2mortal (newSViv (PTR2IV (async_signal)));
        SV *arg_iv  = sv_2mortal (newSViv (PTR2IV (async)));

        EXTEND (SP, 2);
        PUSHs (func_iv);
        PUSHs (arg_iv);
}
316 root 1.1
void
signal (async_t *async, int value = 0)
	CODE:
{
        /* signal the object from perl code, value defaults to 0 */
        async_signal ((void *)async, value);
}
321 root 1.2
void
block (async_t *async)
	CODE:
{
        /* increments the block nesting count via the block macro */
        block (async);
}
326 root 1.2
void
unblock (async_t *async)
	CODE:
{
        /* decrements the block nesting count, may run the callbacks */
        unblock (async);
}
331 root 1.1
void
scope_block (SV *self)
	CODE:
{
        /*
         * Block the object now and register scope_block_cb so that it
         * is unblocked again when the enclosing scope is left.
         */
        SV *holder      = SvRV (self);
        async_t *target = SvASYNC_nrv (holder);

        block (target);

        /* unfortunately, perl sandwiches XS calls into ENTER/LEAVE, */
        /* so step out of that scope to register in the caller's scope */
        LEAVE;
        SAVEDESTRUCTOR_X (scope_block_cb, (void *)SvREFCNT_inc (holder));
        ENTER;
}
344    
void
pipe_enable (async_t *async)
	ALIAS:
        pipe_enable  = 1
        pipe_disable = 0
	CODE:
{
        /* ix is the alias value: 1 for pipe_enable, 0 for pipe_disable */
        async->fd_enable = ix;
}
352    
void
DESTROY (SV *self)
	CODE:
{
        /*
         * Unregisters the object from the asyncs list and from its
         * signal, releases the handle and callback references and
         * frees the object.
         */
        int i;
        SV *async_sv   = SvRV (self);
        async_t *async = SvASYNC_nrv (async_sv);

        for (i = AvFILLp (asyncs); i >= 0; --i)
          if (AvARRAY (asyncs)[i] == async_sv)
            {
              /* move the last entry into the hole, then shrink the list */
              AvARRAY (asyncs)[i] = AvARRAY (asyncs)[AvFILLp (asyncs)];

              /* must not live inside assert (), which can be compiled out */
              av_pop (asyncs);
              goto found;
            }

        if (!PL_dirty)
          warn ("Async::Interrupt::DESTROY could not find async object in list of asyncs, please report");

        found:

        /* only touch the signal if this object is still the one attached to it */
        if (async->signum && sig_async [async->signum] == async)
          {
#if _WIN32
            signal (async->signum, SIG_DFL);
#else
            {
              struct sigaction sa = { 0 };
              sa.sa_handler = SIG_DFL;
              sigaction (async->signum, &sa, 0);
            }
#endif
            /* the handler is gone, now drop the pointer to the freed object */
            sig_async [async->signum] = 0;
          }

        SvREFCNT_dec (async->fh_r);
        SvREFCNT_dec (async->fh_w);
        SvREFCNT_dec (async->cb);

        Safefree (async);
}
395