ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/Async-Interrupt/Interrupt.xs
Revision: 1.6
Committed: Sat Jul 11 22:16:50 2009 UTC (14 years, 11 months ago) by root
Branch: MAIN
CVS Tags: rel-0_041, rel-0_04
Changes since 1.5: +122 -41 lines
Log Message:
*** empty log message ***

File Contents

# Content
1 #include "EXTERN.h"
2 #include "perl.h"
3 #include "XSUB.h"
4
5 typedef volatile sig_atomic_t atomic_t;
6
7 static int *sig_pending, *psig_pend; /* make local copies because of missing THX */
8 static Sighandler_t old_sighandler;
9 static atomic_t async_pending;
10
11 #define PERL_VERSION_ATLEAST(a,b,c) \
12 (PERL_REVISION > (a) \
13 || (PERL_REVISION == (a) \
14 && (PERL_VERSION > (b) \
15 || (PERL_VERSION == (b) && PERL_SUBVERSION >= (c)))))
16
17 #if defined(HAS_SIGACTION) && defined(SA_SIGINFO)
18 # define HAS_SA_SIGINFO 1
19 #endif
20
21 #if !PERL_VERSION_ATLEAST(5,10,0)
22 # undef HAS_SA_SIGINFO
23 #endif
24
25 /*****************************************************************************/
26 /* support stuff, copied from EV.xs mostly */
27
28 static int
29 extract_fd (SV *fh, int wr)
30 {
31 int fd = PerlIO_fileno (wr ? IoOFP (sv_2io (fh)) : IoIFP (sv_2io (fh)));
32
33 if (fd < 0)
34 croak ("illegal fh argument, either not an OS file or read/write mode mismatch");
35
36 return fd;
37 }
38
39 static SV *
40 get_cb (SV *cb_sv)
41 {
42 HV *st;
43 GV *gvp;
44 CV *cv;
45
46 if (!SvOK (cb_sv))
47 return 0;
48
49 cv = sv_2cv (cb_sv, &st, &gvp, 0);
50
51 if (!cv)
52 croak ("Async::Interrupt callback must be undef or a CODE reference");
53
54 return (SV *)cv;
55 }
56
57 #ifndef SIG_SIZE
58 /* kudos to Slaven Rezic for the idea */
59 static char sig_size [] = { SIG_NUM };
60 # define SIG_SIZE (sizeof (sig_size) + 1)
61 #endif
62
63 static int
64 sv_signum (SV *sig)
65 {
66 int signum;
67
68 SvGETMAGIC (sig);
69
70 for (signum = 1; signum < SIG_SIZE; ++signum)
71 if (strEQ (SvPV_nolen (sig), PL_sig_name [signum]))
72 return signum;
73
74 signum = SvIV (sig);
75
76 if (signum > 0 && signum < SIG_SIZE)
77 return signum;
78
79 return -1;
80 }
81
82 /*****************************************************************************/
83
84 typedef struct {
85 SV *cb;
86 void (*c_cb)(pTHX_ void *c_arg, int value);
87 void *c_arg;
88 SV *fh_r, *fh_w;
89 int blocked;
90 int signum;
91
92 int fd_r, fd_w;
93 int fd_wlen;
94 atomic_t fd_enable;
95 atomic_t value;
96 atomic_t pending;
97 } async_t;
98
99 static AV *asyncs;
100 static async_t *sig_async [SIG_SIZE];
101
102 #define SvASYNC_nrv(sv) INT2PTR (async_t *, SvIVX (sv))
103 #define SvASYNC(rv) SvASYNC_nrv (SvRV (rv))
104
105 /* the main workhorse to signal */
106 static void
107 async_signal (void *signal_arg, int value)
108 {
109 static char pipedata [8];
110
111 async_t *async = (async_t *)signal_arg;
112 int pending = async->pending;
113
114 async->value = value;
115 async->pending = 1;
116 async_pending = 1;
117 psig_pend [9] = 1;
118 *sig_pending = 1;
119
120 if (!pending && async->fd_w >= 0 && async->fd_enable)
121 if (write (async->fd_w, pipedata, async->fd_wlen) < 0 && errno == EINVAL)
122 /* on EINVAL we assume it's an eventfd */
123 write (async->fd_w, pipedata, (async->fd_wlen = 8));
124 }
125
126 static void
127 handle_async (async_t *async)
128 {
129 int old_errno = errno;
130 int value = async->value;
131
132 async->pending = 0;
133
134 /* drain pipe */
135 if (async->fd_r >= 0 && async->fd_enable)
136 {
137 char dummy [9]; /* 9 is enough for eventfd and normal pipes */
138
139 while (read (async->fd_r, dummy, sizeof (dummy)) == sizeof (dummy))
140 ;
141 }
142
143 if (async->c_cb)
144 {
145 dTHX;
146 async->c_cb (aTHX_ async->c_arg, value);
147 }
148
149 if (async->cb)
150 {
151 dSP;
152
153 SV *saveerr = SvOK (ERRSV) ? sv_mortalcopy (ERRSV) : 0;
154 SV *savedie = PL_diehook;
155
156 PL_diehook = 0;
157
158 PUSHSTACKi (PERLSI_SIGNAL);
159
160 PUSHMARK (SP);
161 XPUSHs (sv_2mortal (newSViv (value)));
162 PUTBACK;
163 call_sv (async->cb, G_VOID | G_DISCARD | G_EVAL);
164
165 if (SvTRUE (ERRSV))
166 {
167 SPAGAIN;
168
169 PUSHMARK (SP);
170 PUTBACK;
171 call_sv (get_sv ("Async::Interrupt::DIED", 1), G_VOID | G_DISCARD | G_EVAL | G_KEEPERR);
172
173 sv_setpvn (ERRSV, "", 0);
174 }
175
176 if (saveerr)
177 sv_setsv (ERRSV, saveerr);
178
179 {
180 SV *oldhook = PL_diehook;
181 PL_diehook = savedie;
182 SvREFCNT_dec (oldhook);
183 }
184
185 POPSTACK;
186 }
187
188 errno = old_errno;
189 }
190
191 static void
192 handle_asyncs (void)
193 {
194 int i;
195
196 async_pending = 0;
197
198 for (i = AvFILLp (asyncs); i >= 0; --i)
199 {
200 async_t *async = SvASYNC_nrv (AvARRAY (asyncs)[i]);
201
202 if (async->pending && !async->blocked)
203 handle_async (async);
204 }
205 }
206
207 #if HAS_SA_SIGINFO
208 static Signal_t async_sighandler (int signum, siginfo_t *si, void *sarg)
209 {
210 if (signum == 9)
211 handle_asyncs ();
212 else
213 old_sighandler (signum, si, sarg);
214 }
215 #else
216 static Signal_t async_sighandler (int signum)
217 {
218 if (signum == 9)
219 handle_asyncs ();
220 else
221 old_sighandler (signum);
222 }
223 #endif
224
225 static void
226 async_sigsend (int signum)
227 {
228 async_signal (sig_async [signum], 0);
229 }
230
231 static void
232 scope_block_cb (pTHX_ void *async_sv)
233 {
234 async_t *async = SvASYNC_nrv ((SV *)async_sv);
235
236 --async->blocked;
237 if (async->pending && !async->blocked)
238 handle_async (async);
239
240 SvREFCNT_dec (async_sv);
241 }
242
243 MODULE = Async::Interrupt PACKAGE = Async::Interrupt
244
245 BOOT:
246 old_sighandler = PL_sighandlerp;
247 PL_sighandlerp = async_sighandler;
248 sig_pending = &PL_sig_pending;
249 psig_pend = PL_psig_pend;
250 asyncs = newAV ();
251 CvNODEBUG_on (get_cv ("Async::Interrupt::scope_block", 0)); /* otherwise calling scope can be the debugger */
252
253 PROTOTYPES: DISABLE
254
255 void
256 _alloc (SV *cb, void *c_cb, void *c_arg, SV *fh_r, SV *fh_w, SV *signl)
257 PPCODE:
258 {
259 SV *cv = SvOK (cb) ? SvREFCNT_inc (get_cb (cb)) : 0;
260 int fd_r = SvOK (fh_r) ? extract_fd (fh_r, 0) : -1;
261 int fd_w = SvOK (fh_w) ? extract_fd (fh_w, 1) : -1;
262 async_t *async;
263
264 Newz (0, async, 1, async_t);
265
266 XPUSHs (sv_2mortal (newSViv (PTR2IV (async))));
267 av_push (asyncs, TOPs);
268
269 async->fh_r = fd_r >= 0 ? newSVsv (fh_r) : 0; async->fd_r = fd_r;
270 async->fh_w = fd_w >= 0 ? newSVsv (fh_w) : 0; async->fd_w = fd_w;
271 async->fd_wlen = 1;
272 async->fd_enable = 1;
273 async->cb = cv;
274 async->c_cb = c_cb;
275 async->c_arg = c_arg;
276 SvGETMAGIC (signl);
277 async->signum = SvOK (signl) ? sv_signum (signl) : 0;
278
279 if (async->signum)
280 {
281 if (async->signum < 0)
282 croak ("Async::Interrupt::new got passed illegal signal name or number: %s", SvPV_nolen (signl));
283
284 sig_async [async->signum] = async;
285 #if _WIN32
286 signal (async->signum, async_sigsend);
287 #else
288 {
289 struct sigaction sa = { };
290 sa.sa_handler = async_sigsend;
291 sigfillset (&sa.sa_mask);
292 sigaction (async->signum, &sa, 0);
293 }
294 #endif
295 }
296 }
297
298 void
299 signal_func (async_t *async)
300 PPCODE:
301 EXTEND (SP, 2);
302 PUSHs (sv_2mortal (newSViv (PTR2IV (async_signal))));
303 PUSHs (sv_2mortal (newSViv (PTR2IV (async))));
304
305 void
306 signal (async_t *async, int value = 0)
307 CODE:
308 async_signal (async, value);
309
310 void
311 block (async_t *async)
312 CODE:
313 ++async->blocked;
314
315 void
316 unblock (async_t *async)
317 CODE:
318 --async->blocked;
319 if (async->pending && !async->blocked)
320 handle_async (async);
321
322 void
323 scope_block (SV *self)
324 CODE:
325 {
326 SV *async_sv = SvRV (self);
327 async_t *async = SvASYNC_nrv (async_sv);
328 ++async->blocked;
329
330 LEAVE; /* unfortunately, perl sandwiches XS calls into ENTER/LEAVE */
331 SAVEDESTRUCTOR_X (scope_block_cb, (void *)SvREFCNT_inc (async_sv));
332 ENTER; /* unfortunately, perl sandwiches XS calls into ENTER/LEAVE */
333 }
334
335 void
336 pipe_enable (async_t *async)
337 ALIAS:
338 pipe_enable = 1
339 pipe_disable = 0
340 CODE:
341 async->fd_enable = ix;
342
343 void
344 DESTROY (SV *self)
345 CODE:
346 {
347 int i;
348 SV *async_sv = SvRV (self);
349 async_t *async = SvASYNC_nrv (async_sv);
350
351 for (i = AvFILLp (asyncs); i >= 0; --i)
352 if (AvARRAY (asyncs)[i] == async_sv)
353 {
354 if (i < AvFILLp (asyncs))
355 AvARRAY (asyncs)[i] = AvARRAY (asyncs)[AvFILLp (asyncs)];
356
357 assert (av_pop (asyncs) == async_sv);
358 goto found;
359 }
360
361 if (!PL_dirty)
362 warn ("Async::Interrupt::DESTROY could not find async object in list of asyncs, please report");
363
364 found:
365
366 if (async->signum)
367 {
368 #if _WIN32
369 signal (async->signum, SIG_DFL);
370 #else
371 {
372 struct sigaction sa = { };
373 sa.sa_handler = SIG_DFL;
374 sigaction (async->signum, &sa, 0);
375 }
376 #endif
377 }
378
379 SvREFCNT_dec (async->fh_r);
380 SvREFCNT_dec (async->fh_w);
381 SvREFCNT_dec (async->cb);
382
383 Safefree (async);
384 }
385