ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/Async-Interrupt/Interrupt.xs
Revision: 1.6
Committed: Sat Jul 11 22:16:50 2009 UTC (15 years ago) by root
Branch: MAIN
CVS Tags: rel-0_041, rel-0_04
Changes since 1.5: +122 -41 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.1 #include "EXTERN.h"
2     #include "perl.h"
3     #include "XSUB.h"
4    
5     typedef volatile sig_atomic_t atomic_t;
6    
7     static int *sig_pending, *psig_pend; /* make local copies because of missing THX */
8     static Sighandler_t old_sighandler;
9     static atomic_t async_pending;
10    
/*
 * True when the perl being compiled against is at least version a.b.c.
 * The three components are compared lexicographically: revision first,
 * then version, then subversion.
 */
#define PERL_VERSION_ATLEAST(a,b,c)                    \
  (PERL_REVISION != (a) ? PERL_REVISION > (a)          \
   : PERL_VERSION != (b) ? PERL_VERSION > (b)          \
   : PERL_SUBVERSION >= (c))
16    
17     #if defined(HAS_SIGACTION) && defined(SA_SIGINFO)
18     # define HAS_SA_SIGINFO 1
19     #endif
20    
21     #if !PERL_VERSION_ATLEAST(5,10,0)
22     # undef HAS_SA_SIGINFO
23     #endif
24    
25 root 1.6 /*****************************************************************************/
26     /* support stuff, copied from EV.xs mostly */
27    
28 root 1.1 static int
29     extract_fd (SV *fh, int wr)
30     {
31     int fd = PerlIO_fileno (wr ? IoOFP (sv_2io (fh)) : IoIFP (sv_2io (fh)));
32    
33     if (fd < 0)
34     croak ("illegal fh argument, either not an OS file or read/write mode mismatch");
35    
36     return fd;
37     }
38    
39     static SV *
40     get_cb (SV *cb_sv)
41     {
42     HV *st;
43     GV *gvp;
44     CV *cv;
45    
46     if (!SvOK (cb_sv))
47     return 0;
48    
49     cv = sv_2cv (cb_sv, &st, &gvp, 0);
50    
51     if (!cv)
52     croak ("Async::Interrupt callback must be undef or a CODE reference");
53    
54     return (SV *)cv;
55     }
56    
57 root 1.6 #ifndef SIG_SIZE
58     /* kudos to Slaven Rezic for the idea */
59     static char sig_size [] = { SIG_NUM };
60     # define SIG_SIZE (sizeof (sig_size) + 1)
61     #endif
62    
63     static int
64     sv_signum (SV *sig)
65     {
66     int signum;
67    
68     SvGETMAGIC (sig);
69    
70     for (signum = 1; signum < SIG_SIZE; ++signum)
71     if (strEQ (SvPV_nolen (sig), PL_sig_name [signum]))
72     return signum;
73    
74     signum = SvIV (sig);
75    
76     if (signum > 0 && signum < SIG_SIZE)
77     return signum;
78    
79     return -1;
80     }
81    
82     /*****************************************************************************/
83 root 1.1
84 root 1.6 typedef struct {
85 root 1.1 SV *cb;
86 root 1.2 void (*c_cb)(pTHX_ void *c_arg, int value);
87     void *c_arg;
88     SV *fh_r, *fh_w;
89     int blocked;
90 root 1.6 int signum;
91 root 1.1
92 root 1.2 int fd_r, fd_w;
93 root 1.6 int fd_wlen;
94     atomic_t fd_enable;
95 root 1.1 atomic_t value;
96 root 1.2 atomic_t pending;
97 root 1.6 } async_t;
98    
99     static AV *asyncs;
100     static async_t *sig_async [SIG_SIZE];
101    
102     #define SvASYNC_nrv(sv) INT2PTR (async_t *, SvIVX (sv))
103     #define SvASYNC(rv) SvASYNC_nrv (SvRV (rv))
104 root 1.1
105     /* the main workhorse to signal */
106     static void
107     async_signal (void *signal_arg, int value)
108     {
109 root 1.6 static char pipedata [8];
110    
111     async_t *async = (async_t *)signal_arg;
112 root 1.2 int pending = async->pending;
113 root 1.1
114 root 1.2 async->value = value;
115     async->pending = 1;
116     async_pending = 1;
117     psig_pend [9] = 1;
118     *sig_pending = 1;
119 root 1.1
120 root 1.6 if (!pending && async->fd_w >= 0 && async->fd_enable)
121     if (write (async->fd_w, pipedata, async->fd_wlen) < 0 && errno == EINVAL)
122     /* on EINVAL we assume it's an eventfd */
123     write (async->fd_w, pipedata, (async->fd_wlen = 8));
124 root 1.2 }
125    
126     static void
127 root 1.6 handle_async (async_t *async)
128 root 1.2 {
129     int old_errno = errno;
130     int value = async->value;
131    
132     async->pending = 0;
133    
134     /* drain pipe */
135 root 1.6 if (async->fd_r >= 0 && async->fd_enable)
136 root 1.1 {
137 root 1.6 char dummy [9]; /* 9 is enough for eventfd and normal pipes */
138 root 1.2
139     while (read (async->fd_r, dummy, sizeof (dummy)) == sizeof (dummy))
140     ;
141     }
142    
143     if (async->c_cb)
144     {
145     dTHX;
146     async->c_cb (aTHX_ async->c_arg, value);
147     }
148    
149     if (async->cb)
150     {
151     dSP;
152    
153     SV *saveerr = SvOK (ERRSV) ? sv_mortalcopy (ERRSV) : 0;
154     SV *savedie = PL_diehook;
155    
156     PL_diehook = 0;
157    
158     PUSHSTACKi (PERLSI_SIGNAL);
159    
160     PUSHMARK (SP);
161     XPUSHs (sv_2mortal (newSViv (value)));
162     PUTBACK;
163     call_sv (async->cb, G_VOID | G_DISCARD | G_EVAL);
164    
165     if (SvTRUE (ERRSV))
166     {
167     SPAGAIN;
168    
169     PUSHMARK (SP);
170     PUTBACK;
171     call_sv (get_sv ("Async::Interrupt::DIED", 1), G_VOID | G_DISCARD | G_EVAL | G_KEEPERR);
172    
173     sv_setpvn (ERRSV, "", 0);
174     }
175    
176     if (saveerr)
177     sv_setsv (ERRSV, saveerr);
178 root 1.1
179 root 1.2 {
180     SV *oldhook = PL_diehook;
181     PL_diehook = savedie;
182     SvREFCNT_dec (oldhook);
183     }
184    
185     POPSTACK;
186 root 1.1 }
187    
188 root 1.2 errno = old_errno;
189 root 1.1 }
190    
191     static void
192 root 1.2 handle_asyncs (void)
193 root 1.1 {
194 root 1.2 int i;
195    
196 root 1.1 async_pending = 0;
197 root 1.2
198     for (i = AvFILLp (asyncs); i >= 0; --i)
199     {
200 root 1.6 async_t *async = SvASYNC_nrv (AvARRAY (asyncs)[i]);
201 root 1.2
202     if (async->pending && !async->blocked)
203     handle_async (async);
204     }
205 root 1.1 }
206    
207 root 1.5 #if HAS_SA_SIGINFO
208 root 1.1 static Signal_t async_sighandler (int signum, siginfo_t *si, void *sarg)
209     {
210     if (signum == 9)
211 root 1.2 handle_asyncs ();
212 root 1.1 else
213     old_sighandler (signum, si, sarg);
214     }
215     #else
216 root 1.3 static Signal_t async_sighandler (int signum)
217 root 1.1 {
218     if (signum == 9)
219 root 1.3 handle_asyncs ();
220 root 1.1 else
221     old_sighandler (signum);
222     }
223     #endif
224    
225 root 1.4 static void
226 root 1.6 async_sigsend (int signum)
227     {
228     async_signal (sig_async [signum], 0);
229     }
230    
231     static void
232 root 1.4 scope_block_cb (pTHX_ void *async_sv)
233     {
234 root 1.6 async_t *async = SvASYNC_nrv ((SV *)async_sv);
235 root 1.4
236     --async->blocked;
237     if (async->pending && !async->blocked)
238     handle_async (async);
239    
240     SvREFCNT_dec (async_sv);
241     }
242 root 1.1
MODULE = Async::Interrupt		PACKAGE = Async::Interrupt

BOOT:
	/* set up module state first, then hook into perl's signal dispatch */
	asyncs = newAV ();
	sig_pending = &PL_sig_pending;
	psig_pend = PL_psig_pend;
	/* remember the previous handler so async_sighandler can forward to it */
	old_sighandler = PL_sighandlerp;
	PL_sighandlerp = async_sighandler;
	CvNODEBUG_on (get_cv ("Async::Interrupt::scope_block", 0)); /* otherwise calling scope can be the debugger */

PROTOTYPES: DISABLE
254    
void
_alloc (SV *cb, void *c_cb, void *c_arg, SV *fh_r, SV *fh_w, SV *signl)
	PPCODE:
{
	/*
	 * Allocate a new async, register it in asyncs and return an SV holding
	 * its address. cb is a perl callback (or undef), c_cb/c_arg a C callback
	 * and its argument, fh_r/fh_w optional file handles, signl an optional
	 * signal name or number to attach.
	 *
	 * All argument checks that can croak run before anything is allocated,
	 * registered or reference-counted: the SV pushed onto asyncs is mortal
	 * and not reference-counted by the array, so croaking after the push
	 * would leave a dangling entry there (and leak the struct and callback).
	 */
	int fd_r = SvOK (fh_r) ? extract_fd (fh_r, 0) : -1;
	int fd_w = SvOK (fh_w) ? extract_fd (fh_w, 1) : -1;
	int signum;
	SV *cv;
	async_t *async;

	SvGETMAGIC (signl);
	signum = SvOK (signl) ? sv_signum (signl) : 0;

	if (signum < 0)
	  croak ("Async::Interrupt::new got passed illegal signal name or number: %s", SvPV_nolen (signl));

	/* get_cb may croak as well, but does so before the reference is taken */
	cv = SvOK (cb) ? SvREFCNT_inc (get_cb (cb)) : 0;

	Newz (0, async, 1, async_t);

	XPUSHs (sv_2mortal (newSViv (PTR2IV (async))));
	av_push (asyncs, TOPs);

	async->fh_r = fd_r >= 0 ? newSVsv (fh_r) : 0; async->fd_r = fd_r;
	async->fh_w = fd_w >= 0 ? newSVsv (fh_w) : 0; async->fd_w = fd_w;
	async->fd_wlen = 1;
	async->fd_enable = 1;
	async->cb = cv;
	async->c_cb = c_cb;
	async->c_arg = c_arg;
	async->signum = signum;

	if (async->signum)
	  {
	    /* the struct is fully initialised before the handler can fire */
	    sig_async [async->signum] = async;
#if _WIN32
	    signal (async->signum, async_sigsend);
#else
	    {
	      struct sigaction sa = { 0 };
	      sa.sa_handler = async_sigsend;
	      sigfillset (&sa.sa_mask);
	      sigaction (async->signum, &sa, 0);
	    }
#endif
	  }
}
297    
void
signal_func (async_t *async)
	PPCODE:
{
	/* returns two integers: the address of async_signal and the address
	   of this async, i.e. the function and its first argument */
	IV func_iv = PTR2IV (async_signal);
	IV arg_iv = PTR2IV (async);
	EXTEND (SP, 2);
	PUSHs (sv_2mortal (newSViv (func_iv)));
	PUSHs (sv_2mortal (newSViv (arg_iv)));
}
304 root 1.1
void
signal (async_t *async, int value = 0)
	CODE:
{
	/* signal this async from perl space; value defaults to 0 */
	void *signal_arg = async;
	async_signal (signal_arg, value);
}
309 root 1.2
void
block (async_t *async)
	CODE:
{
	/* one more level of blocking; callbacks are held back while non-zero */
	async->blocked += 1;
}
314    
void
unblock (async_t *async)
	CODE:
{
	/* drop one level of blocking and run the async right away if it is
	   pending and no longer blocked */
	async->blocked -= 1;
	if (async->pending)
	  if (!async->blocked)
	    handle_async (async);
}
321 root 1.1
void
scope_block (SV *self)
	CODE:
{
	/* block the async until the calling perl scope is left:
	   scope_block_cb then unblocks it and releases the reference */
	SV *inner = SvRV (self);
	async_t *async = SvASYNC_nrv (inner);
	void *held;
	async->blocked += 1;
	/* unfortunately, perl sandwiches XS calls into ENTER/LEAVE, so step
	   out of that scope to register the destructor in the caller's one */
	LEAVE;
	held = (void *)SvREFCNT_inc (inner);
	SAVEDESTRUCTOR_X (scope_block_cb, held);
	ENTER;
}
334    
335     void
336 root 1.6 pipe_enable (async_t *async)
337     ALIAS:
338     pipe_enable = 1
339     pipe_disable = 0
340     CODE:
341     async->fd_enable = ix;
342    
343     void
344 root 1.1 DESTROY (SV *self)
345     CODE:
346     {
347     int i;
348     SV *async_sv = SvRV (self);
349 root 1.6 async_t *async = SvASYNC_nrv (async_sv);
350 root 1.1
351 root 1.2 for (i = AvFILLp (asyncs); i >= 0; --i)
352 root 1.1 if (AvARRAY (asyncs)[i] == async_sv)
353     {
354     if (i < AvFILLp (asyncs))
355 root 1.3 AvARRAY (asyncs)[i] = AvARRAY (asyncs)[AvFILLp (asyncs)];
356 root 1.1
357     assert (av_pop (asyncs) == async_sv);
358     goto found;
359     }
360    
361     if (!PL_dirty)
362 root 1.2 warn ("Async::Interrupt::DESTROY could not find async object in list of asyncs, please report");
363 root 1.1
364     found:
365 root 1.6
366     if (async->signum)
367     {
368     #if _WIN32
369     signal (async->signum, SIG_DFL);
370     #else
371     {
372     struct sigaction sa = { };
373     sa.sa_handler = SIG_DFL;
374     sigaction (async->signum, &sa, 0);
375     }
376     #endif
377     }
378    
379 root 1.2 SvREFCNT_dec (async->fh_r);
380     SvREFCNT_dec (async->fh_w);
381 root 1.1 SvREFCNT_dec (async->cb);
382    
383     Safefree (async);
384     }
385