ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/Async-Interrupt/Interrupt.xs
Revision: 1.19
Committed: Tue Apr 24 22:01:59 2012 UTC (12 years, 1 month ago) by root
Branch: MAIN
CVS Tags: rel-1_1
Changes since 1.18: +31 -8 lines
Log Message:
*** empty log message ***

File Contents

# Content
1 #include "EXTERN.h"
2 #include "perl.h"
3 #include "XSUB.h"
4
5 #include "ecb.h"
6 #include "schmorp.h"
7
8 typedef volatile sig_atomic_t atomic_t;
9
10 static int *sig_pending, *psig_pend; /* make local copies because of missing THX */
11 static Sighandler_t old_sighandler;
12 static atomic_t async_pending;
13
14 #define PERL_VERSION_ATLEAST(a,b,c) \
15 (PERL_REVISION > (a) \
16 || (PERL_REVISION == (a) \
17 && (PERL_VERSION > (b) \
18 || (PERL_VERSION == (b) && PERL_SUBVERSION >= (c)))))
19
20 #if defined(HAS_SIGACTION) && defined(SA_SIGINFO)
21 # define HAS_SA_SIGINFO 1
22 #endif
23
24 #if !PERL_VERSION_ATLEAST(5,10,0)
25 # undef HAS_SA_SIGINFO
26 #endif
27
28 /*****************************************************************************/
29
30 typedef struct {
31 SV *cb;
32 void (*c_cb)(pTHX_ void *c_arg, int value);
33 void *c_arg;
34 SV *fh_r, *fh_w;
35 SV *value;
36 int signum;
37 int autodrain;
38 ANY *scope_savestack;
39 volatile int blocked;
40
41 s_epipe ep;
42 int fd_wlen;
43 atomic_t fd_enable;
44 atomic_t pending;
45 volatile IV *valuep;
46 atomic_t hysteresis;
47 } async_t;
48
49 static AV *asyncs;
50 static async_t *sig_async [SIG_SIZE];
51
52 #define SvASYNC_nrv(sv) INT2PTR (async_t *, SvIVX (sv))
53 #define SvASYNC(rv) SvASYNC_nrv (SvRV (rv))
54
55 static void async_signal (void *signal_arg, int value);
56
57 static void
58 setsig (int signum, void (*handler)(int))
59 {
60 #if _WIN32
61 signal (signum, handler);
62 #else
63 struct sigaction sa;
64 sa.sa_handler = handler;
65 sigfillset (&sa.sa_mask);
66 sa.sa_flags = 0; /* if we interrupt a syscall, we might drain the pipe before it became ready */
67 sigaction (signum, &sa, 0);
68 #endif
69 }
70
71 static void
72 async_sigsend (int signum)
73 {
74 async_signal (sig_async [signum], 0);
75 }
76
77 /* the main workhorse to signal */
78 static void
79 async_signal (void *signal_arg, int value)
80 {
81 static char pipedata [8];
82
83 async_t *async = (async_t *)signal_arg;
84 int pending = async->pending;
85
86 if (async->hysteresis)
87 setsig (async->signum, SIG_IGN);
88
89 *async->valuep = value ? value : 1;
90 ECB_MEMORY_FENCE_RELEASE;
91 async->pending = 1;
92 ECB_MEMORY_FENCE_RELEASE;
93 async_pending = 1;
94 ECB_MEMORY_FENCE_RELEASE;
95
96 if (!async->blocked)
97 {
98 psig_pend [9] = 1;
99 ECB_MEMORY_FENCE_RELEASE;
100 *sig_pending = 1;
101 ECB_MEMORY_FENCE_RELEASE;
102 }
103
104 if (!pending && async->fd_enable && async->ep.len)
105 s_epipe_signal (&async->ep);
106 }
107
108 static void
109 handle_async (async_t *async)
110 {
111 int old_errno = errno;
112 int value = *async->valuep;
113
114 *async->valuep = 0;
115 async->pending = 0;
116
117 /* restore signal */
118 if (async->hysteresis)
119 setsig (async->signum, async_sigsend);
120
121 /* drain pipe */
122 if (async->fd_enable && async->ep.len && async->autodrain)
123 s_epipe_drain (&async->ep);
124
125 if (async->c_cb)
126 {
127 dTHX;
128 async->c_cb (aTHX_ async->c_arg, value);
129 }
130
131 if (async->cb)
132 {
133 dSP;
134
135 SV *saveerr = SvOK (ERRSV) ? sv_mortalcopy (ERRSV) : 0;
136 SV *savedie = PL_diehook;
137
138 PL_diehook = 0;
139
140 PUSHSTACKi (PERLSI_SIGNAL);
141
142 PUSHMARK (SP);
143 XPUSHs (sv_2mortal (newSViv (value)));
144 PUTBACK;
145 call_sv (async->cb, G_VOID | G_DISCARD | G_EVAL);
146
147 if (SvTRUE (ERRSV))
148 {
149 SPAGAIN;
150
151 PUSHMARK (SP);
152 PUTBACK;
153 call_sv (get_sv ("Async::Interrupt::DIED", 1), G_VOID | G_DISCARD | G_EVAL | G_KEEPERR);
154
155 sv_setpvn (ERRSV, "", 0);
156 }
157
158 if (saveerr)
159 sv_setsv (ERRSV, saveerr);
160
161 {
162 SV *oldhook = PL_diehook;
163 PL_diehook = savedie;
164 SvREFCNT_dec (oldhook);
165 }
166
167 POPSTACK;
168 }
169
170 errno = old_errno;
171 }
172
173 static void
174 handle_asyncs (void)
175 {
176 int i;
177
178 ECB_MEMORY_FENCE_ACQUIRE;
179
180 async_pending = 0;
181
182 for (i = AvFILLp (asyncs); i >= 0; --i)
183 {
184 SV *async_sv = AvARRAY (asyncs)[i];
185 async_t *async = SvASYNC_nrv (async_sv);
186
187 if (async->pending && !async->blocked)
188 {
189 /* temporarily keep a refcount */
190 SvREFCNT_inc (async_sv);
191 handle_async (async);
192 SvREFCNT_dec (async_sv);
193
194 /* the handler could have deleted any number of asyncs */
195 if (i > AvFILLp (asyncs))
196 i = AvFILLp (asyncs);
197 }
198 }
199 }
200
201 #if HAS_SA_SIGINFO
202 static Signal_t async_sighandler (int signum, siginfo_t *si, void *sarg)
203 {
204 if (signum == 9)
205 handle_asyncs ();
206 else
207 old_sighandler (signum, si, sarg);
208 }
209 #else
210 static Signal_t async_sighandler (int signum)
211 {
212 if (signum == 9)
213 handle_asyncs ();
214 else
215 old_sighandler (signum);
216 }
217 #endif
218
219 #define block(async) ++(async)->blocked
220
221 static void
222 unblock (async_t *async)
223 {
224 --async->blocked;
225 if (async->pending && !async->blocked)
226 handle_async (async);
227 }
228
229 static void
230 scope_block_cb (pTHX_ void *async_sv)
231 {
232 async_t *async = SvASYNC_nrv ((SV *)async_sv);
233
234 async->scope_savestack = 0;
235 unblock (async);
236 SvREFCNT_dec (async_sv);
237 }
238
239 static void
240 scope_block (SV *async_sv)
241 {
242 async_t *async = SvASYNC_nrv (async_sv);
243
244 /* as a heuristic, we skip the scope block if we already are blocked */
245 /* and the existing scope block used the same savestack */
246
247 if (!async->scope_savestack || async->scope_savestack != PL_savestack)
248 {
249 async->scope_savestack = PL_savestack;
250 block (async);
251
252 LEAVE; /* unfortunately, perl sandwiches XS calls into ENTER/LEAVE */
253 SAVEDESTRUCTOR_X (scope_block_cb, (void *)SvREFCNT_inc (async_sv));
254 ENTER; /* unfortunately, perl sandwiches XS calls into ENTER/LEAVE */
255 }
256 }
257
258 MODULE = Async::Interrupt PACKAGE = Async::Interrupt
259
260 BOOT:
261 old_sighandler = PL_sighandlerp;
262 PL_sighandlerp = async_sighandler;
263 sig_pending = &PL_sig_pending;
264 psig_pend = PL_psig_pend;
265 asyncs = newAV ();
266 CvNODEBUG_on (get_cv ("Async::Interrupt::scope_block", 0)); /* otherwise calling scope can be the debugger */
267
268 PROTOTYPES: DISABLE
269
270 void
271 _alloc (SV *cb, void *c_cb, void *c_arg, SV *fh_r, SV *fh_w, SV *signl, SV *pvalue)
272 PPCODE:
273 {
274 SV *cv = SvOK (cb) ? SvREFCNT_inc (s_get_cv_croak (cb)) : 0;
275 async_t *async;
276
277 Newz (0, async, 1, async_t);
278
279 XPUSHs (sv_2mortal (newSViv (PTR2IV (async))));
280 /* TODO: need to bless right now to ensure deallocation */
281 av_push (asyncs, TOPs);
282
283 SvGETMAGIC (fh_r); SvGETMAGIC (fh_w);
284 if (SvOK (fh_r) || SvOK (fh_w))
285 {
286 int fd_r = s_fileno_croak (fh_r, 0);
287 int fd_w = s_fileno_croak (fh_w, 1);
288
289 async->fh_r = newSVsv (fh_r);
290 async->fh_w = newSVsv (fh_w);
291 async->ep.fd [0] = fd_r;
292 async->ep.fd [1] = fd_w;
293 async->ep.len = 1;
294 async->fd_enable = 1;
295 }
296
297 async->value = SvROK (pvalue)
298 ? SvREFCNT_inc_NN (SvRV (pvalue))
299 : NEWSV (0, 0);
300
301 sv_setiv (async->value, 0);
302 SvIOK_only (async->value); /* just to be sure */
303 SvREADONLY_on (async->value);
304
305 async->valuep = &(SvIVX (async->value));
306
307 async->autodrain = 1;
308 async->cb = cv;
309 async->c_cb = c_cb;
310 async->c_arg = c_arg;
311 async->signum = SvOK (signl) ? s_signum_croak (signl) : 0;
312
313 if (async->signum)
314 {
315 if (async->signum < 0)
316 croak ("Async::Interrupt::new got passed illegal signal name or number: %s", SvPV_nolen (signl));
317
318 sig_async [async->signum] = async;
319 setsig (async->signum, async_sigsend);
320 }
321 }
322
323 void
324 signal_hysteresis (async_t *async, int enable)
325 CODE:
326 async->hysteresis = enable;
327
328 void
329 signal_func (async_t *async)
330 PPCODE:
331 EXTEND (SP, 2);
332 PUSHs (sv_2mortal (newSViv (PTR2IV (async_signal))));
333 PUSHs (sv_2mortal (newSViv (PTR2IV (async))));
334
335 void
336 scope_block_func (SV *self)
337 PPCODE:
338 EXTEND (SP, 2);
339 PUSHs (sv_2mortal (newSViv (PTR2IV (scope_block))));
340 PUSHs (sv_2mortal (newSViv (PTR2IV (SvRV (self)))));
341
342 IV
343 c_var (async_t *async)
344 CODE:
345 RETVAL = PTR2IV (async->valuep);
346 OUTPUT:
347 RETVAL
348
349 void
350 handle (async_t *async)
351 CODE:
352 handle_async (async);
353
354 void
355 signal (async_t *async, int value = 1)
356 CODE:
357 async_signal (async, value);
358
359 void
360 block (async_t *async)
361 CODE:
362 block (async);
363
364 void
365 unblock (async_t *async)
366 CODE:
367 unblock (async);
368
369 void
370 scope_block (SV *self)
371 CODE:
372 scope_block (SvRV (self));
373
374 void
375 pipe_enable (async_t *async)
376 ALIAS:
377 pipe_enable = 1
378 pipe_disable = 0
379 CODE:
380 async->fd_enable = ix;
381
382 int
383 pipe_fileno (async_t *async)
384 CODE:
385 if (!async->ep.len)
386 {
387 int res;
388
389 /*block (async);*//*TODO*/
390 res = s_epipe_new (&async->ep);
391 async->fd_enable = 1;
392 /*unblock (async);*//*TODO*/
393
394 if (res < 0)
395 croak ("Async::Interrupt: unable to initialize event pipe");
396 }
397
398 RETVAL = async->ep.fd [0];
399 OUTPUT:
400 RETVAL
401
402 int
403 pipe_autodrain (async_t *async, int enable = -1)
404 CODE:
405 RETVAL = async->autodrain;
406 if (enable >= 0)
407 async->autodrain = enable;
408 OUTPUT:
409 RETVAL
410
411 void
412 pipe_drain (async_t *async)
413 CODE:
414 if (async->ep.len)
415 s_epipe_drain (&async->ep);
416
417 void
418 post_fork (async_t *async)
419 CODE:
420 if (async->ep.len)
421 {
422 int res;
423
424 /*block (async);*//*TODO*/
425 res = s_epipe_renew (&async->ep);
426 /*unblock (async);*//*TODO*/
427
428 if (res < 0)
429 croak ("Async::Interrupt: unable to initialize event pipe after fork");
430 }
431
432 void
433 DESTROY (SV *self)
434 CODE:
435 {
436 int i;
437 SV *async_sv = SvRV (self);
438 async_t *async = SvASYNC_nrv (async_sv);
439
440 for (i = AvFILLp (asyncs); i >= 0; --i)
441 if (AvARRAY (asyncs)[i] == async_sv)
442 {
443 AvARRAY (asyncs)[i] = AvARRAY (asyncs)[AvFILLp (asyncs)];
444 av_pop (asyncs);
445 goto found;
446 }
447
448 if (!PL_dirty)
449 warn ("Async::Interrupt::DESTROY could not find async object in list of asyncs, please report");
450
451 found:
452
453 if (async->signum)
454 setsig (async->signum, SIG_DFL);
455
456 if (!async->fh_r && async->ep.len)
457 s_epipe_destroy (&async->ep);
458
459 SvREFCNT_dec (async->fh_r);
460 SvREFCNT_dec (async->fh_w);
461 SvREFCNT_dec (async->cb);
462 SvREFCNT_dec (async->value);
463
464 Safefree (async);
465 }
466
467 SV *
468 sig2num (SV *signame_or_number)
469 ALIAS:
470 sig2num = 0
471 sig2name = 1
472 PROTOTYPE: $
473 CODE:
474 {
475 int signum = s_signum (signame_or_number);
476
477 if (signum < 0)
478 RETVAL = &PL_sv_undef;
479 else if (ix)
480 RETVAL = newSVpv (PL_sig_name [signum], 0);
481 else
482 RETVAL = newSViv (signum);
483 }
484 OUTPUT:
485 RETVAL
486
487 MODULE = Async::Interrupt PACKAGE = Async::Interrupt::EventPipe PREFIX = s_epipe_
488
489 void
490 new (const char *klass)
491 PPCODE:
492 {
493 s_epipe *epp;
494
495 Newz (0, epp, 1, s_epipe);
496 XPUSHs (sv_setref_iv (sv_newmortal (), klass, PTR2IV (epp)));
497
498 if (s_epipe_new (epp) < 0)
499 croak ("Async::Interrupt::EventPipe: unable to create new event pipe");
500 }
501
502 void
503 filenos (s_epipe *epp)
504 PPCODE:
505 EXTEND (SP, 2);
506 PUSHs (sv_2mortal (newSViv (epp->fd [0])));
507 PUSHs (sv_2mortal (newSViv (epp->fd [1])));
508
509 int
510 fileno (s_epipe *epp)
511 ALIAS:
512 fileno = 0
513 fileno_r = 0
514 fileno_w = 1
515 CODE:
516 RETVAL = epp->fd [ix];
517 OUTPUT:
518 RETVAL
519
520 int
521 type (s_epipe *epp)
522 CODE:
523 RETVAL = epp->len;
524 OUTPUT:
525 RETVAL
526
527 void
528 s_epipe_signal (s_epipe *epp)
529
530 void
531 s_epipe_drain (s_epipe *epp)
532
533 void
534 signal_func (s_epipe *epp)
535 ALIAS:
536 drain_func = 1
537 PPCODE:
538 EXTEND (SP, 2);
539 PUSHs (sv_2mortal (newSViv (PTR2IV (ix ? s_epipe_drain : s_epipe_signal))));
540 PUSHs (sv_2mortal (newSViv (PTR2IV (epp))));
541
542 void
543 s_epipe_wait (s_epipe *epp)
544
545 void
546 DESTROY (s_epipe *epp)
547 CODE:
548 s_epipe_destroy (epp);
549