ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/Async-Interrupt/Interrupt.xs
Revision: 1.15
Committed: Thu Jul 30 03:59:47 2009 UTC (14 years, 10 months ago) by root
Branch: MAIN
CVS Tags: rel-1_0, rel-1_01
Changes since 1.14: +10 -0 lines
Log Message:
1.0

File Contents

# Content
1 #include "EXTERN.h"
2 #include "perl.h"
3 #include "XSUB.h"
4
5 #include "schmorp.h"
6
7 typedef volatile sig_atomic_t atomic_t;
8
9 static int *sig_pending, *psig_pend; /* make local copies because of missing THX */
10 static Sighandler_t old_sighandler;
11 static atomic_t async_pending;
12
13 #define PERL_VERSION_ATLEAST(a,b,c) \
14 (PERL_REVISION > (a) \
15 || (PERL_REVISION == (a) \
16 && (PERL_VERSION > (b) \
17 || (PERL_VERSION == (b) && PERL_SUBVERSION >= (c)))))
18
19 #if defined(HAS_SIGACTION) && defined(SA_SIGINFO)
20 # define HAS_SA_SIGINFO 1
21 #endif
22
23 #if !PERL_VERSION_ATLEAST(5,10,0)
24 # undef HAS_SA_SIGINFO
25 #endif
26
27 /*****************************************************************************/
28
29 typedef struct {
30 SV *cb;
31 void (*c_cb)(pTHX_ void *c_arg, int value);
32 void *c_arg;
33 SV *fh_r, *fh_w;
34 SV *value;
35 int signum;
36 int autodrain;
37 ANY *scope_savestack;
38 volatile int blocked;
39
40 s_epipe ep;
41 int fd_wlen;
42 atomic_t fd_enable;
43 atomic_t pending;
44 volatile IV *valuep;
45 atomic_t hysteresis;
46 } async_t;
47
48 static AV *asyncs;
49 static async_t *sig_async [SIG_SIZE];
50
51 #define SvASYNC_nrv(sv) INT2PTR (async_t *, SvIVX (sv))
52 #define SvASYNC(rv) SvASYNC_nrv (SvRV (rv))
53
54 static void async_signal (void *signal_arg, int value);
55
56 static void
57 setsig (int signum, void (*handler)(int))
58 {
59 #if _WIN32
60 signal (async->signum, handler);
61 #else
62 struct sigaction sa;
63 sa.sa_handler = handler;
64 sigfillset (&sa.sa_mask);
65 sa.sa_flags = 0; /* if we interrupt a syscall, we might drain the pipe before it became ready */
66 sigaction (signum, &sa, 0);
67 }
68
69 static void
70 async_sigsend (int signum)
71 {
72 async_signal (sig_async [signum], 0);
73 }
74
75 /* the main workhorse to signal */
76 static void
77 async_signal (void *signal_arg, int value)
78 {
79 static char pipedata [8];
80
81 async_t *async = (async_t *)signal_arg;
82 int pending = async->pending;
83
84 if (async->hysteresis)
85 setsig (async->signum, SIG_IGN);
86
87 *async->valuep = value ? value : 1;
88 async->pending = 1;
89 async_pending = 1;
90 psig_pend [9] = 1;
91 *sig_pending = 1;
92
93 if (!pending && async->fd_enable && async->ep.len)
94 s_epipe_signal (&async->ep);
95 }
96
97 static void
98 handle_async (async_t *async)
99 {
100 int old_errno = errno;
101 int value = *async->valuep;
102
103 *async->valuep = 0;
104 async->pending = 0;
105
106 /* restore signal */
107 if (async->hysteresis)
108 setsig (async->signum, async_sigsend);
109
110 /* drain pipe */
111 if (async->fd_enable && async->ep.len && async->autodrain)
112 s_epipe_drain (&async->ep);
113
114 if (async->c_cb)
115 {
116 dTHX;
117 async->c_cb (aTHX_ async->c_arg, value);
118 }
119
120 if (async->cb)
121 {
122 dSP;
123
124 SV *saveerr = SvOK (ERRSV) ? sv_mortalcopy (ERRSV) : 0;
125 SV *savedie = PL_diehook;
126
127 PL_diehook = 0;
128
129 PUSHSTACKi (PERLSI_SIGNAL);
130
131 PUSHMARK (SP);
132 XPUSHs (sv_2mortal (newSViv (value)));
133 PUTBACK;
134 call_sv (async->cb, G_VOID | G_DISCARD | G_EVAL);
135
136 if (SvTRUE (ERRSV))
137 {
138 SPAGAIN;
139
140 PUSHMARK (SP);
141 PUTBACK;
142 call_sv (get_sv ("Async::Interrupt::DIED", 1), G_VOID | G_DISCARD | G_EVAL | G_KEEPERR);
143
144 sv_setpvn (ERRSV, "", 0);
145 }
146
147 if (saveerr)
148 sv_setsv (ERRSV, saveerr);
149
150 {
151 SV *oldhook = PL_diehook;
152 PL_diehook = savedie;
153 SvREFCNT_dec (oldhook);
154 }
155
156 POPSTACK;
157 }
158
159 errno = old_errno;
160 }
161
162 static void
163 handle_asyncs (void)
164 {
165 int i;
166
167 async_pending = 0;
168
169 for (i = AvFILLp (asyncs); i >= 0; --i)
170 {
171 SV *async_sv = AvARRAY (asyncs)[i];
172 async_t *async = SvASYNC_nrv (async_sv);
173
174 if (async->pending && !async->blocked)
175 {
176 /* temporarily keep a refcount */
177 SvREFCNT_inc (async_sv);
178 handle_async (async);
179 SvREFCNT_dec (async_sv);
180
181 /* the handler could have deleted any number of asyncs */
182 if (i > AvFILLp (asyncs))
183 i = AvFILLp (asyncs);
184 }
185 }
186 }
187
188 #if HAS_SA_SIGINFO
189 static Signal_t async_sighandler (int signum, siginfo_t *si, void *sarg)
190 {
191 if (signum == 9)
192 handle_asyncs ();
193 else
194 old_sighandler (signum, si, sarg);
195 }
196 #else
197 static Signal_t async_sighandler (int signum)
198 {
199 if (signum == 9)
200 handle_asyncs ();
201 else
202 old_sighandler (signum);
203 }
204 #endif
205
206 #define block(async) ++(async)->blocked
207
208 static void
209 unblock (async_t *async)
210 {
211 --async->blocked;
212 if (async->pending && !async->blocked)
213 handle_async (async);
214 }
215
216 static void
217 scope_block_cb (pTHX_ void *async_sv)
218 {
219 async_t *async = SvASYNC_nrv ((SV *)async_sv);
220
221 async->scope_savestack = 0;
222 unblock (async);
223 SvREFCNT_dec (async_sv);
224 }
225
226 static void
227 scope_block (SV *async_sv)
228 {
229 async_t *async = SvASYNC_nrv (async_sv);
230
231 /* as a heuristic, we skip the scope block if we already are blocked */
232 /* and the existing scope block used the same savestack */
233
234 if (!async->scope_savestack || async->scope_savestack != PL_savestack)
235 {
236 async->scope_savestack = PL_savestack;
237 block (async);
238
239 LEAVE; /* unfortunately, perl sandwiches XS calls into ENTER/LEAVE */
240 SAVEDESTRUCTOR_X (scope_block_cb, (void *)SvREFCNT_inc (async_sv));
241 ENTER; /* unfortunately, perl sandwiches XS calls into ENTER/LEAVE */
242 }
243 }
244
245 #endif
246 MODULE = Async::Interrupt PACKAGE = Async::Interrupt
247
248 BOOT:
249 old_sighandler = PL_sighandlerp;
250 PL_sighandlerp = async_sighandler;
251 sig_pending = &PL_sig_pending;
252 psig_pend = PL_psig_pend;
253 asyncs = newAV ();
254 CvNODEBUG_on (get_cv ("Async::Interrupt::scope_block", 0)); /* otherwise calling scope can be the debugger */
255
256 PROTOTYPES: DISABLE
257
258 void
259 _alloc (SV *cb, void *c_cb, void *c_arg, SV *fh_r, SV *fh_w, SV *signl, SV *pvalue)
260 PPCODE:
261 {
262 SV *cv = SvOK (cb) ? SvREFCNT_inc (s_get_cv_croak (cb)) : 0;
263 async_t *async;
264
265 Newz (0, async, 1, async_t);
266
267 XPUSHs (sv_2mortal (newSViv (PTR2IV (async))));
268 /* TODO: need to bless right now to ensure deallocation */
269 av_push (asyncs, TOPs);
270
271 SvGETMAGIC (fh_r); SvGETMAGIC (fh_w);
272 if (SvOK (fh_r) || SvOK (fh_w))
273 {
274 int fd_r = s_fileno_croak (fh_r, 0);
275 int fd_w = s_fileno_croak (fh_w, 1);
276
277 async->fh_r = newSVsv (fh_r);
278 async->fh_w = newSVsv (fh_w);
279 async->ep.fd [0] = fd_r;
280 async->ep.fd [1] = fd_w;
281 async->ep.len = 1;
282 async->fd_enable = 1;
283 }
284
285 async->value = SvROK (pvalue)
286 ? SvREFCNT_inc_NN (SvRV (pvalue))
287 : NEWSV (0, 0);
288
289 sv_setiv (async->value, 0);
290 SvIOK_only (async->value); /* just to be sure */
291 SvREADONLY_on (async->value);
292
293 async->valuep = &(SvIVX (async->value));
294
295 async->autodrain = 1;
296 async->cb = cv;
297 async->c_cb = c_cb;
298 async->c_arg = c_arg;
299 async->signum = SvOK (signl) ? s_signum_croak (signl) : 0;
300
301 if (async->signum)
302 {
303 if (async->signum < 0)
304 croak ("Async::Interrupt::new got passed illegal signal name or number: %s", SvPV_nolen (signl));
305
306 sig_async [async->signum] = async;
307 setsig (async->signum, async_sigsend);
308 }
309 }
310
311 void
312 signal_hysteresis (async_t *async, int enable)
313 CODE:
314 async->hysteresis = enable;
315
316 void
317 signal_func (async_t *async)
318 PPCODE:
319 EXTEND (SP, 2);
320 PUSHs (sv_2mortal (newSViv (PTR2IV (async_signal))));
321 PUSHs (sv_2mortal (newSViv (PTR2IV (async))));
322
323 void
324 scope_block_func (SV *self)
325 PPCODE:
326 EXTEND (SP, 2);
327 PUSHs (sv_2mortal (newSViv (PTR2IV (scope_block))));
328 PUSHs (sv_2mortal (newSViv (PTR2IV (SvRV (self)))));
329
330 IV
331 c_var (async_t *async)
332 CODE:
333 RETVAL = PTR2IV (async->valuep);
334 OUTPUT:
335 RETVAL
336
337 void
338 signal (async_t *async, int value = 1)
339 CODE:
340 async_signal (async, value);
341
342 void
343 block (async_t *async)
344 CODE:
345 block (async);
346
347 void
348 unblock (async_t *async)
349 CODE:
350 unblock (async);
351
352 void
353 scope_block (SV *self)
354 CODE:
355 scope_block (SvRV (self));
356
357 void
358 pipe_enable (async_t *async)
359 ALIAS:
360 pipe_enable = 1
361 pipe_disable = 0
362 CODE:
363 async->fd_enable = ix;
364
365 int
366 pipe_fileno (async_t *async)
367 CODE:
368 if (!async->ep.len)
369 {
370 int res;
371
372 /*block (async);*//*TODO*/
373 res = s_epipe_new (&async->ep);
374 async->fd_enable = 1;
375 /*unblock (async);*//*TODO*/
376
377 if (res < 0)
378 croak ("Async::Interrupt: unable to initialize event pipe");
379 }
380
381 RETVAL = async->ep.fd [0];
382 OUTPUT:
383 RETVAL
384
385 int
386 pipe_autodrain (async_t *async, int enable = -1)
387 CODE:
388 RETVAL = async->autodrain;
389 if (enable >= 0)
390 async->autodrain = enable;
391 OUTPUT:
392 RETVAL
393
394 void
395 post_fork (async_t *async)
396 CODE:
397 if (async->ep.len)
398 {
399 int res;
400
401 /*block (async);*//*TODO*/
402 res = s_epipe_renew (&async->ep);
403 /*unblock (async);*//*TODO*/
404
405 if (res < 0)
406 croak ("Async::Interrupt: unable to initialize event pipe after fork");
407 }
408
409 void
410 DESTROY (SV *self)
411 CODE:
412 {
413 int i;
414 SV *async_sv = SvRV (self);
415 async_t *async = SvASYNC_nrv (async_sv);
416
417 for (i = AvFILLp (asyncs); i >= 0; --i)
418 if (AvARRAY (asyncs)[i] == async_sv)
419 {
420 AvARRAY (asyncs)[i] = AvARRAY (asyncs)[AvFILLp (asyncs)];
421 av_pop (asyncs);
422 goto found;
423 }
424
425 if (!PL_dirty)
426 warn ("Async::Interrupt::DESTROY could not find async object in list of asyncs, please report");
427
428 found:
429
430 if (async->signum)
431 setsig (async->signum, SIG_DFL);
432
433 if (!async->fh_r && async->ep.len)
434 s_epipe_destroy (&async->ep);
435
436 SvREFCNT_dec (async->fh_r);
437 SvREFCNT_dec (async->fh_w);
438 SvREFCNT_dec (async->cb);
439 SvREFCNT_dec (async->value);
440
441 Safefree (async);
442 }
443
444 SV *
445 sig2num (SV *signame_or_number)
446 ALIAS:
447 sig2num = 0
448 sig2name = 1
449 CODE:
450 {
451 int signum = s_signum (signame_or_number);
452
453 if (signum < 0)
454 RETVAL = &PL_sv_undef;
455 else if (ix)
456 RETVAL = newSVpv (PL_sig_name [signum], 0);
457 else
458 RETVAL = newSViv (signum);
459 }
460 OUTPUT:
461 RETVAL
462
463 MODULE = Async::Interrupt PACKAGE = Async::Interrupt::EventPipe PREFIX = s_epipe_
464
465 void
466 new (const char *klass)
467 PPCODE:
468 {
469 s_epipe *epp;
470 SV *self;
471
472 Newz (0, epp, 1, s_epipe);
473 XPUSHs (sv_setref_iv (sv_newmortal (), klass, PTR2IV (epp)));
474
475 if (s_epipe_new (epp) < 0)
476 croak ("Async::Interrupt::EventPipe: unable to create new event pipe");
477 }
478
479 void
480 filenos (s_epipe *epp)
481 PPCODE:
482 EXTEND (SP, 2);
483 PUSHs (sv_2mortal (newSViv (epp->fd [0])));
484 PUSHs (sv_2mortal (newSViv (epp->fd [1])));
485
486 int
487 fileno (s_epipe *epp)
488 ALIAS:
489 fileno = 0
490 fileno_r = 0
491 fileno_w = 1
492 CODE:
493 RETVAL = epp->fd [ix];
494 OUTPUT:
495 RETVAL
496
497 int
498 type (s_epipe *epp)
499 CODE:
500 RETVAL = epp->len;
501 OUTPUT:
502 RETVAL
503
504 void
505 s_epipe_signal (s_epipe *epp)
506
507 void
508 s_epipe_drain (s_epipe *epp)
509
510 void
511 drain_func (s_epipe *epp)
512 PPCODE:
513 EXTEND (SP, 2);
514 PUSHs (sv_2mortal (newSViv (PTR2IV (s_epipe_drain))));
515 PUSHs (sv_2mortal (newSViv (PTR2IV (epp))));
516
517 void
518 s_epipe_wait (s_epipe *epp)
519
520 void
521 DESTROY (s_epipe *epp)
522 CODE:
523 s_epipe_destroy (epp);
524