ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/Async-Interrupt/Interrupt.xs
(Generate patch)

Comparing Async-Interrupt/Interrupt.xs (file contents):
Revision 1.7 by root, Tue Jul 14 14:18:10 2009 UTC vs.
Revision 1.12 by root, Sat Jul 18 05:14:19 2009 UTC

1#include "EXTERN.h" 1#include "EXTERN.h"
2#include "perl.h" 2#include "perl.h"
3#include "XSUB.h" 3#include "XSUB.h"
4
5#include "schmorp.h"
4 6
5typedef volatile sig_atomic_t atomic_t; 7typedef volatile sig_atomic_t atomic_t;
6 8
7static int *sig_pending, *psig_pend; /* make local copies because of missing THX */ 9static int *sig_pending, *psig_pend; /* make local copies because of missing THX */
8static Sighandler_t old_sighandler; 10static Sighandler_t old_sighandler;
21#if !PERL_VERSION_ATLEAST(5,10,0) 23#if !PERL_VERSION_ATLEAST(5,10,0)
22# undef HAS_SA_SIGINFO 24# undef HAS_SA_SIGINFO
23#endif 25#endif
24 26
25/*****************************************************************************/ 27/*****************************************************************************/
26/* support stuff, copied from EV.xs mostly */
27
28static int
29extract_fd (SV *fh, int wr)
30{
31 int fd = PerlIO_fileno (wr ? IoOFP (sv_2io (fh)) : IoIFP (sv_2io (fh)));
32
33 if (fd < 0)
34 croak ("illegal fh argument, either not an OS file or read/write mode mismatch");
35
36 return fd;
37}
38
39static SV *
40get_cb (SV *cb_sv)
41{
42 HV *st;
43 GV *gvp;
44 CV *cv;
45
46 if (!SvOK (cb_sv))
47 return 0;
48
49 cv = sv_2cv (cb_sv, &st, &gvp, 0);
50
51 if (!cv)
52 croak ("Async::Interrupt callback must be undef or a CODE reference");
53
54 return (SV *)cv;
55}
56
57#ifndef SIG_SIZE
58/* kudos to Slaven Rezic for the idea */
59static char sig_size [] = { SIG_NUM };
60# define SIG_SIZE (sizeof (sig_size) + 1)
61#endif
62
63static int
64sv_signum (SV *sig)
65{
66 int signum;
67
68 SvGETMAGIC (sig);
69
70 for (signum = 1; signum < SIG_SIZE; ++signum)
71 if (strEQ (SvPV_nolen (sig), PL_sig_name [signum]))
72 return signum;
73
74 signum = SvIV (sig);
75
76 if (signum > 0 && signum < SIG_SIZE)
77 return signum;
78
79 return -1;
80}
81
82/*****************************************************************************/
83 28
84typedef struct { 29typedef struct {
85 SV *cb; 30 SV *cb;
86 void (*c_cb)(pTHX_ void *c_arg, int value); 31 void (*c_cb)(pTHX_ void *c_arg, int value);
87 void *c_arg; 32 void *c_arg;
88 SV *fh_r, *fh_w; 33 SV *fh_r, *fh_w;
34 SV *value;
89 int signum; 35 int signum;
36 int autodrain;
90 volatile int blocked; 37 volatile int blocked;
91 38
92 int fd_r; 39 s_epipe ep;
93 volatile int fd_w;
94 int fd_wlen; 40 int fd_wlen;
95 atomic_t fd_enable; 41 atomic_t fd_enable;
96 atomic_t value;
97 atomic_t pending; 42 atomic_t pending;
43 volatile IV *valuep;
98} async_t; 44} async_t;
99 45
100static AV *asyncs; 46static AV *asyncs;
101static async_t *sig_async [SIG_SIZE]; 47static async_t *sig_async [SIG_SIZE];
102 48
110 static char pipedata [8]; 56 static char pipedata [8];
111 57
112 async_t *async = (async_t *)signal_arg; 58 async_t *async = (async_t *)signal_arg;
113 int pending = async->pending; 59 int pending = async->pending;
114 60
115 async->value = value; 61 *async->valuep = value ? value : 1;
116 async->pending = 1; 62 async->pending = 1;
117 async_pending = 1; 63 async_pending = 1;
118 psig_pend [9] = 1; 64 psig_pend [9] = 1;
119 *sig_pending = 1; 65 *sig_pending = 1;
120 66
121 { 67 if (!pending && async->fd_enable && async->ep.len)
122 int fd_w = async->fd_w; 68 s_epipe_signal (&async->ep);
123 int fd_enable = async->fd_enable;
124
125 if (!pending && fd_w >= 0 && fd_enable)
126 if (write (fd_w, pipedata, async->fd_wlen) < 0 && errno == EINVAL)
127 /* on EINVAL we assume it's an eventfd */
128 write (fd_w, pipedata, (async->fd_wlen = 8));
129 }
130} 69}
131 70
132static void 71static void
133handle_async (async_t *async) 72handle_async (async_t *async)
134{ 73{
135 int old_errno = errno; 74 int old_errno = errno;
136 int value = async->value; 75 int value = *async->valuep;
137 76
77 *async->valuep = 0;
138 async->pending = 0; 78 async->pending = 0;
139 79
140 /* drain pipe */ 80 /* drain pipe */
141 if (async->fd_r >= 0 && async->fd_enable) 81 if (async->fd_enable && async->ep.len && async->autodrain)
142 { 82 s_epipe_drain (&async->ep);
143 char dummy [9]; /* 9 is enough for eventfd and normal pipes */
144
145 while (read (async->fd_r, dummy, sizeof (dummy)) == sizeof (dummy))
146 ;
147 }
148 83
149 if (async->c_cb) 84 if (async->c_cb)
150 { 85 {
151 dTHX; 86 dTHX;
152 async->c_cb (aTHX_ async->c_arg, value); 87 async->c_cb (aTHX_ async->c_arg, value);
201 136
202 async_pending = 0; 137 async_pending = 0;
203 138
204 for (i = AvFILLp (asyncs); i >= 0; --i) 139 for (i = AvFILLp (asyncs); i >= 0; --i)
205 { 140 {
141 SV *async_sv = AvARRAY (asyncs)[i];
206 async_t *async = SvASYNC_nrv (AvARRAY (asyncs)[i]); 142 async_t *async = SvASYNC_nrv (async_sv);
207 143
208 if (async->pending && !async->blocked) 144 if (async->pending && !async->blocked)
145 {
146 /* temporarily keep a refcount */
147 SvREFCNT_inc (async_sv);
209 handle_async (async); 148 handle_async (async);
149 SvREFCNT_dec (async_sv);
150
151 /* the handler could have deleted any number of asyncs */
152 if (i > AvFILLp (asyncs))
153 i = AvFILLp (asyncs);
154 }
210 } 155 }
211} 156}
212 157
213#if HAS_SA_SIGINFO 158#if HAS_SA_SIGINFO
214static Signal_t async_sighandler (int signum, siginfo_t *si, void *sarg) 159static Signal_t async_sighandler (int signum, siginfo_t *si, void *sarg)
263 CvNODEBUG_on (get_cv ("Async::Interrupt::scope_block", 0)); /* otherwise calling scope can be the debugger */ 208 CvNODEBUG_on (get_cv ("Async::Interrupt::scope_block", 0)); /* otherwise calling scope can be the debugger */
264 209
265PROTOTYPES: DISABLE 210PROTOTYPES: DISABLE
266 211
267void 212void
268_alloc (SV *cb, void *c_cb, void *c_arg, SV *fh_r, SV *fh_w, SV *signl) 213_alloc (SV *cb, void *c_cb, void *c_arg, SV *fh_r, SV *fh_w, SV *signl, SV *pvalue)
269 PPCODE: 214 PPCODE:
270{ 215{
271 SV *cv = SvOK (cb) ? SvREFCNT_inc (get_cb (cb)) : 0; 216 SV *cv = SvOK (cb) ? SvREFCNT_inc (s_get_cv_croak (cb)) : 0;
272 int fd_r = SvOK (fh_r) ? extract_fd (fh_r, 0) : -1;
273 int fd_w = SvOK (fh_w) ? extract_fd (fh_w, 1) : -1;
274 async_t *async; 217 async_t *async;
275 218
276 Newz (0, async, 1, async_t); 219 Newz (0, async, 1, async_t);
277 220
278 XPUSHs (sv_2mortal (newSViv (PTR2IV (async)))); 221 XPUSHs (sv_2mortal (newSViv (PTR2IV (async))));
222 /* TODO: need to bless right now to ensure deallocation */
279 av_push (asyncs, TOPs); 223 av_push (asyncs, TOPs);
280 224
281 async->fh_r = fd_r >= 0 ? newSVsv (fh_r) : 0; async->fd_r = fd_r; 225 SvGETMAGIC (fh_r); SvGETMAGIC (fh_w);
282 async->fh_w = fd_w >= 0 ? newSVsv (fh_w) : 0; async->fd_w = fd_w; 226 if (SvOK (fh_r) || SvOK (fh_w))
227 {
228 int fd_r = s_fileno_croak (fh_r, 0);
229 int fd_w = s_fileno_croak (fh_w, 1);
230
231 async->fh_r = newSVsv (fh_r);
232 async->fh_w = newSVsv (fh_w);
233 async->ep.fd [0] = fd_r;
234 async->ep.fd [1] = fd_w;
283 async->fd_wlen = 1; 235 async->ep.len = 1;
284 async->fd_enable = 1; 236 async->fd_enable = 1;
237 }
238
239 async->value = SvROK (pvalue)
240 ? SvREFCNT_inc_NN (SvRV (pvalue))
241 : NEWSV (0, 0);
242
243 sv_setiv (async->value, 0);
244 SvIOK_only (async->value); /* just to be sure */
245 SvREADONLY_on (async->value);
246
247 async->valuep = &(SvIVX (async->value));
248
249 async->autodrain = 1;
285 async->cb = cv; 250 async->cb = cv;
286 async->c_cb = c_cb; 251 async->c_cb = c_cb;
287 async->c_arg = c_arg; 252 async->c_arg = c_arg;
288 SvGETMAGIC (signl);
289 async->signum = SvOK (signl) ? sv_signum (signl) : 0; 253 async->signum = SvOK (signl) ? s_signum_croak (signl) : 0;
290 254
291 if (async->signum) 255 if (async->signum)
292 { 256 {
293 if (async->signum < 0) 257 if (async->signum < 0)
294 croak ("Async::Interrupt::new got passed illegal signal name or number: %s", SvPV_nolen (signl)); 258 croak ("Async::Interrupt::new got passed illegal signal name or number: %s", SvPV_nolen (signl));
296 sig_async [async->signum] = async; 260 sig_async [async->signum] = async;
297#if _WIN32 261#if _WIN32
298 signal (async->signum, async_sigsend); 262 signal (async->signum, async_sigsend);
299#else 263#else
300 { 264 {
301 struct sigaction sa = { }; 265 struct sigaction sa;
302 sa.sa_handler = async_sigsend; 266 sa.sa_handler = async_sigsend;
303 sigfillset (&sa.sa_mask); 267 sigfillset (&sa.sa_mask);
268 sa.sa_flags = 0; /* if we interrupt a syscall, we might drain the pipe before it became ready */
304 sigaction (async->signum, &sa, 0); 269 sigaction (async->signum, &sa, 0);
305 } 270 }
306#endif 271#endif
307 } 272 }
308} 273}
312 PPCODE: 277 PPCODE:
313 EXTEND (SP, 2); 278 EXTEND (SP, 2);
314 PUSHs (sv_2mortal (newSViv (PTR2IV (async_signal)))); 279 PUSHs (sv_2mortal (newSViv (PTR2IV (async_signal))));
315 PUSHs (sv_2mortal (newSViv (PTR2IV (async)))); 280 PUSHs (sv_2mortal (newSViv (PTR2IV (async))));
316 281
282IV
283c_var (async_t *async)
284 CODE:
285 RETVAL = PTR2IV (async->valuep);
286 OUTPUT:
287 RETVAL
288
317void 289void
318signal (async_t *async, int value = 0) 290signal (async_t *async, int value = 1)
319 CODE: 291 CODE:
320 async_signal (async, value); 292 async_signal (async, value);
321 293
322void 294void
323block (async_t *async) 295block (async_t *async)
348 pipe_enable = 1 320 pipe_enable = 1
349 pipe_disable = 0 321 pipe_disable = 0
350 CODE: 322 CODE:
351 async->fd_enable = ix; 323 async->fd_enable = ix;
352 324
325int
326pipe_fileno (async_t *async)
327 CODE:
328 if (!async->ep.len)
329 {
330 int res;
331
332 /*block (async);*//*TODO*/
333 res = s_epipe_new (&async->ep);
334 async->fd_enable = 1;
335 /*unblock (async);*//*TODO*/
336
337 if (res < 0)
338 croak ("Async::Interrupt: unable to initialize event pipe");
339 }
340
341 RETVAL = async->ep.fd [0];
342 OUTPUT:
343 RETVAL
344
345int
346pipe_autodrain (async_t *async, int enable = -1)
347 CODE:
348 RETVAL = async->autodrain;
349 if (enable >= 0)
350 async->autodrain = enable;
351 OUTPUT:
352 RETVAL
353
354void
355post_fork (async_t *async)
356 CODE:
357 if (async->ep.len)
358 {
359 int res;
360
361 /*block (async);*//*TODO*/
362 res = s_epipe_renew (&async->ep);
363 /*unblock (async);*//*TODO*/
364
365 if (res < 0)
366 croak ("Async::Interrupt: unable to initialize event pipe after fork");
367 }
368
353void 369void
354DESTROY (SV *self) 370DESTROY (SV *self)
355 CODE: 371 CODE:
356{ 372{
357 int i; 373 int i;
359 async_t *async = SvASYNC_nrv (async_sv); 375 async_t *async = SvASYNC_nrv (async_sv);
360 376
361 for (i = AvFILLp (asyncs); i >= 0; --i) 377 for (i = AvFILLp (asyncs); i >= 0; --i)
362 if (AvARRAY (asyncs)[i] == async_sv) 378 if (AvARRAY (asyncs)[i] == async_sv)
363 { 379 {
364 if (i < AvFILLp (asyncs))
365 AvARRAY (asyncs)[i] = AvARRAY (asyncs)[AvFILLp (asyncs)]; 380 AvARRAY (asyncs)[i] = AvARRAY (asyncs)[AvFILLp (asyncs)];
366 381 av_pop (asyncs);
367 assert (av_pop (asyncs) == async_sv);
368 goto found; 382 goto found;
369 } 383 }
370 384
371 if (!PL_dirty) 385 if (!PL_dirty)
372 warn ("Async::Interrupt::DESTROY could not find async object in list of asyncs, please report"); 386 warn ("Async::Interrupt::DESTROY could not find async object in list of asyncs, please report");
377 { 391 {
378#if _WIN32 392#if _WIN32
379 signal (async->signum, SIG_DFL); 393 signal (async->signum, SIG_DFL);
380#else 394#else
381 { 395 {
382 struct sigaction sa = { }; 396 struct sigaction sa;
383 sa.sa_handler = SIG_DFL; 397 sa.sa_handler = SIG_DFL;
398 sigemptyset (&sa.sa_mask);
399 sa.sa_flags = 0;
384 sigaction (async->signum, &sa, 0); 400 sigaction (async->signum, &sa, 0);
385 } 401 }
386#endif 402#endif
387 } 403 }
388 404
405 if (!async->fh_r && async->ep.len)
406 s_epipe_destroy (&async->ep);
407
389 SvREFCNT_dec (async->fh_r); 408 SvREFCNT_dec (async->fh_r);
390 SvREFCNT_dec (async->fh_w); 409 SvREFCNT_dec (async->fh_w);
391 SvREFCNT_dec (async->cb); 410 SvREFCNT_dec (async->cb);
411 SvREFCNT_dec (async->value);
392 412
393 Safefree (async); 413 Safefree (async);
394} 414}
395 415
416MODULE = Async::Interrupt PACKAGE = Async::Interrupt::EventPipe PREFIX = s_epipe_
417
418void
419new (const char *klass)
420 PPCODE:
421{
422 s_epipe *epp;
423 SV *self;
424
425 Newz (0, epp, 1, s_epipe);
426 XPUSHs (sv_setref_iv (sv_newmortal (), klass, PTR2IV (epp)));
427
428 if (s_epipe_new (epp) < 0)
429 croak ("Async::Interrupt::EventPipe: unable to create new event pipe");
430}
431
432void
433filenos (s_epipe *epp)
434 PPCODE:
435 EXTEND (SP, 2);
436 PUSHs (sv_2mortal (newSViv (epp->fd [0])));
437 PUSHs (sv_2mortal (newSViv (epp->fd [1])));
438
439int
440fileno (s_epipe *epp)
441 ALIAS:
442 fileno = 0
443 fileno_r = 0
444 fileno_w = 1
445 CODE:
446 RETVAL = epp->fd [ix];
447 OUTPUT:
448 RETVAL
449
450int
451type (s_epipe *epp)
452 CODE:
453 RETVAL = epp->len;
454 OUTPUT:
455 RETVAL
456
457void
458s_epipe_signal (s_epipe *epp)
459
460void
461s_epipe_drain (s_epipe *epp)
462
463void
464DESTROY (s_epipe *epp)
465 CODE:
466 s_epipe_destroy (epp);
467

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines