ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/Async-Interrupt/Interrupt.xs
(Generate patch)

Comparing Async-Interrupt/Interrupt.xs (file contents):
Revision 1.7 by root, Tue Jul 14 14:18:10 2009 UTC vs.
Revision 1.16 by root, Tue Sep 1 16:41:29 2009 UTC

1#include "EXTERN.h" 1#include "EXTERN.h"
2#include "perl.h" 2#include "perl.h"
3#include "XSUB.h" 3#include "XSUB.h"
4
5#include "schmorp.h"
4 6
5typedef volatile sig_atomic_t atomic_t; 7typedef volatile sig_atomic_t atomic_t;
6 8
7static int *sig_pending, *psig_pend; /* make local copies because of missing THX */ 9static int *sig_pending, *psig_pend; /* make local copies because of missing THX */
8static Sighandler_t old_sighandler; 10static Sighandler_t old_sighandler;
21#if !PERL_VERSION_ATLEAST(5,10,0) 23#if !PERL_VERSION_ATLEAST(5,10,0)
22# undef HAS_SA_SIGINFO 24# undef HAS_SA_SIGINFO
23#endif 25#endif
24 26
25/*****************************************************************************/ 27/*****************************************************************************/
26/* support stuff, copied from EV.xs mostly */
27
28static int
29extract_fd (SV *fh, int wr)
30{
31 int fd = PerlIO_fileno (wr ? IoOFP (sv_2io (fh)) : IoIFP (sv_2io (fh)));
32
33 if (fd < 0)
34 croak ("illegal fh argument, either not an OS file or read/write mode mismatch");
35
36 return fd;
37}
38
39static SV *
40get_cb (SV *cb_sv)
41{
42 HV *st;
43 GV *gvp;
44 CV *cv;
45
46 if (!SvOK (cb_sv))
47 return 0;
48
49 cv = sv_2cv (cb_sv, &st, &gvp, 0);
50
51 if (!cv)
52 croak ("Async::Interrupt callback must be undef or a CODE reference");
53
54 return (SV *)cv;
55}
56
57#ifndef SIG_SIZE
58/* kudos to Slaven Rezic for the idea */
59static char sig_size [] = { SIG_NUM };
60# define SIG_SIZE (sizeof (sig_size) + 1)
61#endif
62
63static int
64sv_signum (SV *sig)
65{
66 int signum;
67
68 SvGETMAGIC (sig);
69
70 for (signum = 1; signum < SIG_SIZE; ++signum)
71 if (strEQ (SvPV_nolen (sig), PL_sig_name [signum]))
72 return signum;
73
74 signum = SvIV (sig);
75
76 if (signum > 0 && signum < SIG_SIZE)
77 return signum;
78
79 return -1;
80}
81
82/*****************************************************************************/
83 28
84typedef struct { 29typedef struct {
85 SV *cb; 30 SV *cb;
86 void (*c_cb)(pTHX_ void *c_arg, int value); 31 void (*c_cb)(pTHX_ void *c_arg, int value);
87 void *c_arg; 32 void *c_arg;
88 SV *fh_r, *fh_w; 33 SV *fh_r, *fh_w;
34 SV *value;
89 int signum; 35 int signum;
36 int autodrain;
37 ANY *scope_savestack;
90 volatile int blocked; 38 volatile int blocked;
91 39
92 int fd_r; 40 s_epipe ep;
93 volatile int fd_w;
94 int fd_wlen; 41 int fd_wlen;
95 atomic_t fd_enable; 42 atomic_t fd_enable;
96 atomic_t value;
97 atomic_t pending; 43 atomic_t pending;
44 volatile IV *valuep;
45 atomic_t hysteresis;
98} async_t; 46} async_t;
99 47
100static AV *asyncs; 48static AV *asyncs;
101static async_t *sig_async [SIG_SIZE]; 49static async_t *sig_async [SIG_SIZE];
102 50
103#define SvASYNC_nrv(sv) INT2PTR (async_t *, SvIVX (sv)) 51#define SvASYNC_nrv(sv) INT2PTR (async_t *, SvIVX (sv))
104#define SvASYNC(rv) SvASYNC_nrv (SvRV (rv)) 52#define SvASYNC(rv) SvASYNC_nrv (SvRV (rv))
105 53
54static void async_signal (void *signal_arg, int value);
55
56static void
57setsig (int signum, void (*handler)(int))
58{
59#if _WIN32
60 signal (async->signum, handler);
61#else
62 struct sigaction sa;
63 sa.sa_handler = handler;
64 sigfillset (&sa.sa_mask);
65 sa.sa_flags = 0; /* if we interrupt a syscall, we might drain the pipe before it became ready */
66 sigaction (signum, &sa, 0);
67}
68
69static void
70async_sigsend (int signum)
71{
72 async_signal (sig_async [signum], 0);
73}
74
106/* the main workhorse to signal */ 75/* the main workhorse to signal */
107static void 76static void
108async_signal (void *signal_arg, int value) 77async_signal (void *signal_arg, int value)
109{ 78{
110 static char pipedata [8]; 79 static char pipedata [8];
111 80
112 async_t *async = (async_t *)signal_arg; 81 async_t *async = (async_t *)signal_arg;
113 int pending = async->pending; 82 int pending = async->pending;
114 83
115 async->value = value; 84 if (async->hysteresis)
85 setsig (async->signum, SIG_IGN);
86
87 *async->valuep = value ? value : 1;
116 async->pending = 1; 88 async->pending = 1;
117 async_pending = 1; 89 async_pending = 1;
118 psig_pend [9] = 1; 90 psig_pend [9] = 1;
119 *sig_pending = 1; 91 *sig_pending = 1;
120 92
121 { 93 if (!pending && async->fd_enable && async->ep.len)
122 int fd_w = async->fd_w; 94 s_epipe_signal (&async->ep);
123 int fd_enable = async->fd_enable;
124
125 if (!pending && fd_w >= 0 && fd_enable)
126 if (write (fd_w, pipedata, async->fd_wlen) < 0 && errno == EINVAL)
127 /* on EINVAL we assume it's an eventfd */
128 write (fd_w, pipedata, (async->fd_wlen = 8));
129 }
130} 95}
131 96
132static void 97static void
133handle_async (async_t *async) 98handle_async (async_t *async)
134{ 99{
135 int old_errno = errno; 100 int old_errno = errno;
136 int value = async->value; 101 int value = *async->valuep;
137 102
103 *async->valuep = 0;
138 async->pending = 0; 104 async->pending = 0;
139 105
106 /* restore signal */
107 if (async->hysteresis)
108 setsig (async->signum, async_sigsend);
109
140 /* drain pipe */ 110 /* drain pipe */
141 if (async->fd_r >= 0 && async->fd_enable) 111 if (async->fd_enable && async->ep.len && async->autodrain)
142 { 112 s_epipe_drain (&async->ep);
143 char dummy [9]; /* 9 is enough for eventfd and normal pipes */
144
145 while (read (async->fd_r, dummy, sizeof (dummy)) == sizeof (dummy))
146 ;
147 }
148 113
149 if (async->c_cb) 114 if (async->c_cb)
150 { 115 {
151 dTHX; 116 dTHX;
152 async->c_cb (aTHX_ async->c_arg, value); 117 async->c_cb (aTHX_ async->c_arg, value);
201 166
202 async_pending = 0; 167 async_pending = 0;
203 168
204 for (i = AvFILLp (asyncs); i >= 0; --i) 169 for (i = AvFILLp (asyncs); i >= 0; --i)
205 { 170 {
171 SV *async_sv = AvARRAY (asyncs)[i];
206 async_t *async = SvASYNC_nrv (AvARRAY (asyncs)[i]); 172 async_t *async = SvASYNC_nrv (async_sv);
207 173
208 if (async->pending && !async->blocked) 174 if (async->pending && !async->blocked)
175 {
176 /* temporarily keep a refcount */
177 SvREFCNT_inc (async_sv);
209 handle_async (async); 178 handle_async (async);
179 SvREFCNT_dec (async_sv);
180
181 /* the handler could have deleted any number of asyncs */
182 if (i > AvFILLp (asyncs))
183 i = AvFILLp (asyncs);
184 }
210 } 185 }
211} 186}
212 187
213#if HAS_SA_SIGINFO 188#if HAS_SA_SIGINFO
214static Signal_t async_sighandler (int signum, siginfo_t *si, void *sarg) 189static Signal_t async_sighandler (int signum, siginfo_t *si, void *sarg)
226 else 201 else
227 old_sighandler (signum); 202 old_sighandler (signum);
228} 203}
229#endif 204#endif
230 205
231static void
232async_sigsend (int signum)
233{
234 async_signal (sig_async [signum], 0);
235}
236
237#define block(async) ++(async)->blocked 206#define block(async) ++(async)->blocked
238 207
239static void 208static void
240unblock (async_t *async) 209unblock (async_t *async)
241{ 210{
246 215
247static void 216static void
248scope_block_cb (pTHX_ void *async_sv) 217scope_block_cb (pTHX_ void *async_sv)
249{ 218{
250 async_t *async = SvASYNC_nrv ((SV *)async_sv); 219 async_t *async = SvASYNC_nrv ((SV *)async_sv);
220
221 async->scope_savestack = 0;
251 unblock (async); 222 unblock (async);
252 SvREFCNT_dec (async_sv); 223 SvREFCNT_dec (async_sv);
253} 224}
254 225
226static void
227scope_block (SV *async_sv)
228{
229 async_t *async = SvASYNC_nrv (async_sv);
230
231 /* as a heuristic, we skip the scope block if we already are blocked */
232 /* and the existing scope block used the same savestack */
233
234 if (!async->scope_savestack || async->scope_savestack != PL_savestack)
235 {
236 async->scope_savestack = PL_savestack;
237 block (async);
238
239 LEAVE; /* unfortunately, perl sandwiches XS calls into ENTER/LEAVE */
240 SAVEDESTRUCTOR_X (scope_block_cb, (void *)SvREFCNT_inc (async_sv));
241 ENTER; /* unfortunately, perl sandwiches XS calls into ENTER/LEAVE */
242 }
243}
244
245#endif
255MODULE = Async::Interrupt PACKAGE = Async::Interrupt 246MODULE = Async::Interrupt PACKAGE = Async::Interrupt
256 247
257BOOT: 248BOOT:
258 old_sighandler = PL_sighandlerp; 249 old_sighandler = PL_sighandlerp;
259 PL_sighandlerp = async_sighandler; 250 PL_sighandlerp = async_sighandler;
263 CvNODEBUG_on (get_cv ("Async::Interrupt::scope_block", 0)); /* otherwise calling scope can be the debugger */ 254 CvNODEBUG_on (get_cv ("Async::Interrupt::scope_block", 0)); /* otherwise calling scope can be the debugger */
264 255
265PROTOTYPES: DISABLE 256PROTOTYPES: DISABLE
266 257
267void 258void
268_alloc (SV *cb, void *c_cb, void *c_arg, SV *fh_r, SV *fh_w, SV *signl) 259_alloc (SV *cb, void *c_cb, void *c_arg, SV *fh_r, SV *fh_w, SV *signl, SV *pvalue)
269 PPCODE: 260 PPCODE:
270{ 261{
271 SV *cv = SvOK (cb) ? SvREFCNT_inc (get_cb (cb)) : 0; 262 SV *cv = SvOK (cb) ? SvREFCNT_inc (s_get_cv_croak (cb)) : 0;
272 int fd_r = SvOK (fh_r) ? extract_fd (fh_r, 0) : -1;
273 int fd_w = SvOK (fh_w) ? extract_fd (fh_w, 1) : -1;
274 async_t *async; 263 async_t *async;
275 264
276 Newz (0, async, 1, async_t); 265 Newz (0, async, 1, async_t);
277 266
278 XPUSHs (sv_2mortal (newSViv (PTR2IV (async)))); 267 XPUSHs (sv_2mortal (newSViv (PTR2IV (async))));
268 /* TODO: need to bless right now to ensure deallocation */
279 av_push (asyncs, TOPs); 269 av_push (asyncs, TOPs);
280 270
281 async->fh_r = fd_r >= 0 ? newSVsv (fh_r) : 0; async->fd_r = fd_r; 271 SvGETMAGIC (fh_r); SvGETMAGIC (fh_w);
282 async->fh_w = fd_w >= 0 ? newSVsv (fh_w) : 0; async->fd_w = fd_w; 272 if (SvOK (fh_r) || SvOK (fh_w))
273 {
274 int fd_r = s_fileno_croak (fh_r, 0);
275 int fd_w = s_fileno_croak (fh_w, 1);
276
277 async->fh_r = newSVsv (fh_r);
278 async->fh_w = newSVsv (fh_w);
279 async->ep.fd [0] = fd_r;
280 async->ep.fd [1] = fd_w;
283 async->fd_wlen = 1; 281 async->ep.len = 1;
284 async->fd_enable = 1; 282 async->fd_enable = 1;
283 }
284
285 async->value = SvROK (pvalue)
286 ? SvREFCNT_inc_NN (SvRV (pvalue))
287 : NEWSV (0, 0);
288
289 sv_setiv (async->value, 0);
290 SvIOK_only (async->value); /* just to be sure */
291 SvREADONLY_on (async->value);
292
293 async->valuep = &(SvIVX (async->value));
294
295 async->autodrain = 1;
285 async->cb = cv; 296 async->cb = cv;
286 async->c_cb = c_cb; 297 async->c_cb = c_cb;
287 async->c_arg = c_arg; 298 async->c_arg = c_arg;
288 SvGETMAGIC (signl);
289 async->signum = SvOK (signl) ? sv_signum (signl) : 0; 299 async->signum = SvOK (signl) ? s_signum_croak (signl) : 0;
290 300
291 if (async->signum) 301 if (async->signum)
292 { 302 {
293 if (async->signum < 0) 303 if (async->signum < 0)
294 croak ("Async::Interrupt::new got passed illegal signal name or number: %s", SvPV_nolen (signl)); 304 croak ("Async::Interrupt::new got passed illegal signal name or number: %s", SvPV_nolen (signl));
295 305
296 sig_async [async->signum] = async; 306 sig_async [async->signum] = async;
297#if _WIN32
298 signal (async->signum, async_sigsend); 307 setsig (async->signum, async_sigsend);
299#else
300 {
301 struct sigaction sa = { };
302 sa.sa_handler = async_sigsend;
303 sigfillset (&sa.sa_mask);
304 sigaction (async->signum, &sa, 0);
305 }
306#endif
307 } 308 }
308} 309}
310
311void
312signal_hysteresis (async_t *async, int enable)
313 CODE:
314 async->hysteresis = enable;
309 315
310void 316void
311signal_func (async_t *async) 317signal_func (async_t *async)
312 PPCODE: 318 PPCODE:
313 EXTEND (SP, 2); 319 EXTEND (SP, 2);
314 PUSHs (sv_2mortal (newSViv (PTR2IV (async_signal)))); 320 PUSHs (sv_2mortal (newSViv (PTR2IV (async_signal))));
315 PUSHs (sv_2mortal (newSViv (PTR2IV (async)))); 321 PUSHs (sv_2mortal (newSViv (PTR2IV (async))));
316 322
317void 323void
324scope_block_func (SV *self)
325 PPCODE:
326 EXTEND (SP, 2);
327 PUSHs (sv_2mortal (newSViv (PTR2IV (scope_block))));
328 PUSHs (sv_2mortal (newSViv (PTR2IV (SvRV (self)))));
329
330IV
331c_var (async_t *async)
332 CODE:
333 RETVAL = PTR2IV (async->valuep);
334 OUTPUT:
335 RETVAL
336
337void
318signal (async_t *async, int value = 0) 338signal (async_t *async, int value = 1)
319 CODE: 339 CODE:
320 async_signal (async, value); 340 async_signal (async, value);
321 341
322void 342void
323block (async_t *async) 343block (async_t *async)
330 unblock (async); 350 unblock (async);
331 351
332void 352void
333scope_block (SV *self) 353scope_block (SV *self)
334 CODE: 354 CODE:
335{ 355 scope_block (SvRV (self));
336 SV *async_sv = SvRV (self);
337 async_t *async = SvASYNC_nrv (async_sv);
338 block (async);
339
340 LEAVE; /* unfortunately, perl sandwiches XS calls into ENTER/LEAVE */
341 SAVEDESTRUCTOR_X (scope_block_cb, (void *)SvREFCNT_inc (async_sv));
342 ENTER; /* unfortunately, perl sandwiches XS calls into ENTER/LEAVE */
343}
344 356
345void 357void
346pipe_enable (async_t *async) 358pipe_enable (async_t *async)
347 ALIAS: 359 ALIAS:
348 pipe_enable = 1 360 pipe_enable = 1
349 pipe_disable = 0 361 pipe_disable = 0
350 CODE: 362 CODE:
351 async->fd_enable = ix; 363 async->fd_enable = ix;
352 364
365int
366pipe_fileno (async_t *async)
367 CODE:
368 if (!async->ep.len)
369 {
370 int res;
371
372 /*block (async);*//*TODO*/
373 res = s_epipe_new (&async->ep);
374 async->fd_enable = 1;
375 /*unblock (async);*//*TODO*/
376
377 if (res < 0)
378 croak ("Async::Interrupt: unable to initialize event pipe");
379 }
380
381 RETVAL = async->ep.fd [0];
382 OUTPUT:
383 RETVAL
384
385int
386pipe_autodrain (async_t *async, int enable = -1)
387 CODE:
388 RETVAL = async->autodrain;
389 if (enable >= 0)
390 async->autodrain = enable;
391 OUTPUT:
392 RETVAL
393
394void
395post_fork (async_t *async)
396 CODE:
397 if (async->ep.len)
398 {
399 int res;
400
401 /*block (async);*//*TODO*/
402 res = s_epipe_renew (&async->ep);
403 /*unblock (async);*//*TODO*/
404
405 if (res < 0)
406 croak ("Async::Interrupt: unable to initialize event pipe after fork");
407 }
408
353void 409void
354DESTROY (SV *self) 410DESTROY (SV *self)
355 CODE: 411 CODE:
356{ 412{
357 int i; 413 int i;
359 async_t *async = SvASYNC_nrv (async_sv); 415 async_t *async = SvASYNC_nrv (async_sv);
360 416
361 for (i = AvFILLp (asyncs); i >= 0; --i) 417 for (i = AvFILLp (asyncs); i >= 0; --i)
362 if (AvARRAY (asyncs)[i] == async_sv) 418 if (AvARRAY (asyncs)[i] == async_sv)
363 { 419 {
364 if (i < AvFILLp (asyncs))
365 AvARRAY (asyncs)[i] = AvARRAY (asyncs)[AvFILLp (asyncs)]; 420 AvARRAY (asyncs)[i] = AvARRAY (asyncs)[AvFILLp (asyncs)];
366 421 av_pop (asyncs);
367 assert (av_pop (asyncs) == async_sv);
368 goto found; 422 goto found;
369 } 423 }
370 424
371 if (!PL_dirty) 425 if (!PL_dirty)
372 warn ("Async::Interrupt::DESTROY could not find async object in list of asyncs, please report"); 426 warn ("Async::Interrupt::DESTROY could not find async object in list of asyncs, please report");
373 427
374 found: 428 found:
375 429
376 if (async->signum) 430 if (async->signum)
377 {
378#if _WIN32
379 signal (async->signum, SIG_DFL); 431 setsig (async->signum, SIG_DFL);
380#else 432
381 { 433 if (!async->fh_r && async->ep.len)
382 struct sigaction sa = { }; 434 s_epipe_destroy (&async->ep);
383 sa.sa_handler = SIG_DFL;
384 sigaction (async->signum, &sa, 0);
385 }
386#endif
387 }
388 435
389 SvREFCNT_dec (async->fh_r); 436 SvREFCNT_dec (async->fh_r);
390 SvREFCNT_dec (async->fh_w); 437 SvREFCNT_dec (async->fh_w);
391 SvREFCNT_dec (async->cb); 438 SvREFCNT_dec (async->cb);
439 SvREFCNT_dec (async->value);
392 440
393 Safefree (async); 441 Safefree (async);
394} 442}
395 443
444SV *
445sig2num (SV *signame_or_number)
446 ALIAS:
447 sig2num = 0
448 sig2name = 1
449 PROTOTYPE: $
450 CODE:
451{
452 int signum = s_signum (signame_or_number);
453
454 if (signum < 0)
455 RETVAL = &PL_sv_undef;
456 else if (ix)
457 RETVAL = newSVpv (PL_sig_name [signum], 0);
458 else
459 RETVAL = newSViv (signum);
460}
461 OUTPUT:
462 RETVAL
463
464MODULE = Async::Interrupt PACKAGE = Async::Interrupt::EventPipe PREFIX = s_epipe_
465
466void
467new (const char *klass)
468 PPCODE:
469{
470 s_epipe *epp;
471 SV *self;
472
473 Newz (0, epp, 1, s_epipe);
474 XPUSHs (sv_setref_iv (sv_newmortal (), klass, PTR2IV (epp)));
475
476 if (s_epipe_new (epp) < 0)
477 croak ("Async::Interrupt::EventPipe: unable to create new event pipe");
478}
479
480void
481filenos (s_epipe *epp)
482 PPCODE:
483 EXTEND (SP, 2);
484 PUSHs (sv_2mortal (newSViv (epp->fd [0])));
485 PUSHs (sv_2mortal (newSViv (epp->fd [1])));
486
487int
488fileno (s_epipe *epp)
489 ALIAS:
490 fileno = 0
491 fileno_r = 0
492 fileno_w = 1
493 CODE:
494 RETVAL = epp->fd [ix];
495 OUTPUT:
496 RETVAL
497
498int
499type (s_epipe *epp)
500 CODE:
501 RETVAL = epp->len;
502 OUTPUT:
503 RETVAL
504
505void
506s_epipe_signal (s_epipe *epp)
507
508void
509s_epipe_drain (s_epipe *epp)
510
511void
512drain_func (s_epipe *epp)
513 PPCODE:
514 EXTEND (SP, 2);
515 PUSHs (sv_2mortal (newSViv (PTR2IV (s_epipe_drain))));
516 PUSHs (sv_2mortal (newSViv (PTR2IV (epp))));
517
518void
519s_epipe_wait (s_epipe *epp)
520
521void
522DESTROY (s_epipe *epp)
523 CODE:
524 s_epipe_destroy (epp);
525

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines