ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/Async-Interrupt/Interrupt.xs
(Generate patch)

Comparing Async-Interrupt/Interrupt.xs (file contents):
Revision 1.8 by root, Tue Jul 14 19:29:26 2009 UTC vs.
Revision 1.12 by root, Sat Jul 18 05:14:19 2009 UTC

29typedef struct { 29typedef struct {
30 SV *cb; 30 SV *cb;
31 void (*c_cb)(pTHX_ void *c_arg, int value); 31 void (*c_cb)(pTHX_ void *c_arg, int value);
32 void *c_arg; 32 void *c_arg;
33 SV *fh_r, *fh_w; 33 SV *fh_r, *fh_w;
34 SV *value;
34 int signum; 35 int signum;
36 int autodrain;
35 volatile int blocked; 37 volatile int blocked;
36 38
37 int fd_r; 39 s_epipe ep;
38 volatile int fd_w;
39 int fd_wlen; 40 int fd_wlen;
40 atomic_t fd_enable; 41 atomic_t fd_enable;
41 atomic_t value;
42 atomic_t pending; 42 atomic_t pending;
43 volatile IV *valuep;
43} async_t; 44} async_t;
44 45
45static AV *asyncs; 46static AV *asyncs;
46static async_t *sig_async [SIG_SIZE]; 47static async_t *sig_async [SIG_SIZE];
47 48
55 static char pipedata [8]; 56 static char pipedata [8];
56 57
57 async_t *async = (async_t *)signal_arg; 58 async_t *async = (async_t *)signal_arg;
58 int pending = async->pending; 59 int pending = async->pending;
59 60
60 async->value = value; 61 *async->valuep = value ? value : 1;
61 async->pending = 1; 62 async->pending = 1;
62 async_pending = 1; 63 async_pending = 1;
63 psig_pend [9] = 1; 64 psig_pend [9] = 1;
64 *sig_pending = 1; 65 *sig_pending = 1;
65 66
66 { 67 if (!pending && async->fd_enable && async->ep.len)
67 int fd_w = async->fd_w; 68 s_epipe_signal (&async->ep);
68 int fd_enable = async->fd_enable;
69
70 if (!pending && fd_w >= 0 && fd_enable)
71 if (write (fd_w, pipedata, async->fd_wlen) < 0 && errno == EINVAL)
72 /* on EINVAL we assume it's an eventfd */
73 write (fd_w, pipedata, (async->fd_wlen = 8));
74 }
75} 69}
76 70
77static void 71static void
78handle_async (async_t *async) 72handle_async (async_t *async)
79{ 73{
80 int old_errno = errno; 74 int old_errno = errno;
81 int value = async->value; 75 int value = *async->valuep;
82 76
77 *async->valuep = 0;
83 async->pending = 0; 78 async->pending = 0;
84 79
85 /* drain pipe */ 80 /* drain pipe */
86 if (async->fd_r >= 0 && async->fd_enable) 81 if (async->fd_enable && async->ep.len && async->autodrain)
87 { 82 s_epipe_drain (&async->ep);
88 char dummy [9]; /* 9 is enough for eventfd and normal pipes */
89
90 while (read (async->fd_r, dummy, sizeof (dummy)) == sizeof (dummy))
91 ;
92 }
93 83
94 if (async->c_cb) 84 if (async->c_cb)
95 { 85 {
96 dTHX; 86 dTHX;
97 async->c_cb (aTHX_ async->c_arg, value); 87 async->c_cb (aTHX_ async->c_arg, value);
146 136
147 async_pending = 0; 137 async_pending = 0;
148 138
149 for (i = AvFILLp (asyncs); i >= 0; --i) 139 for (i = AvFILLp (asyncs); i >= 0; --i)
150 { 140 {
141 SV *async_sv = AvARRAY (asyncs)[i];
151 async_t *async = SvASYNC_nrv (AvARRAY (asyncs)[i]); 142 async_t *async = SvASYNC_nrv (async_sv);
152 143
153 if (async->pending && !async->blocked) 144 if (async->pending && !async->blocked)
145 {
146 /* temporarily keep a refcount */
147 SvREFCNT_inc (async_sv);
154 handle_async (async); 148 handle_async (async);
149 SvREFCNT_dec (async_sv);
150
151 /* the handler could have deleted any number of asyncs */
152 if (i > AvFILLp (asyncs))
153 i = AvFILLp (asyncs);
154 }
155 } 155 }
156} 156}
157 157
158#if HAS_SA_SIGINFO 158#if HAS_SA_SIGINFO
159static Signal_t async_sighandler (int signum, siginfo_t *si, void *sarg) 159static Signal_t async_sighandler (int signum, siginfo_t *si, void *sarg)
208 CvNODEBUG_on (get_cv ("Async::Interrupt::scope_block", 0)); /* otherwise calling scope can be the debugger */ 208 CvNODEBUG_on (get_cv ("Async::Interrupt::scope_block", 0)); /* otherwise calling scope can be the debugger */
209 209
210PROTOTYPES: DISABLE 210PROTOTYPES: DISABLE
211 211
212void 212void
213_alloc (SV *cb, void *c_cb, void *c_arg, SV *fh_r, SV *fh_w, SV *signl) 213_alloc (SV *cb, void *c_cb, void *c_arg, SV *fh_r, SV *fh_w, SV *signl, SV *pvalue)
214 PPCODE: 214 PPCODE:
215{ 215{
216 SV *cv = SvOK (cb) ? SvREFCNT_inc (s_get_cv_croak (cb)) : 0; 216 SV *cv = SvOK (cb) ? SvREFCNT_inc (s_get_cv_croak (cb)) : 0;
217 int fd_r = SvOK (fh_r) ? s_fileno_croak (fh_r, 0) : -1;
218 int fd_w = SvOK (fh_w) ? s_fileno_croak (fh_w, 1) : -1;
219 async_t *async; 217 async_t *async;
220 218
221 Newz (0, async, 1, async_t); 219 Newz (0, async, 1, async_t);
222 220
223 XPUSHs (sv_2mortal (newSViv (PTR2IV (async)))); 221 XPUSHs (sv_2mortal (newSViv (PTR2IV (async))));
222 /* TODO: need to bless right now to ensure deallocation */
224 av_push (asyncs, TOPs); 223 av_push (asyncs, TOPs);
225 224
226 async->fh_r = fd_r >= 0 ? newSVsv (fh_r) : 0; async->fd_r = fd_r; 225 SvGETMAGIC (fh_r); SvGETMAGIC (fh_w);
227 async->fh_w = fd_w >= 0 ? newSVsv (fh_w) : 0; async->fd_w = fd_w; 226 if (SvOK (fh_r) || SvOK (fh_w))
227 {
228 int fd_r = s_fileno_croak (fh_r, 0);
229 int fd_w = s_fileno_croak (fh_w, 1);
230
231 async->fh_r = newSVsv (fh_r);
232 async->fh_w = newSVsv (fh_w);
233 async->ep.fd [0] = fd_r;
234 async->ep.fd [1] = fd_w;
228 async->fd_wlen = 1; 235 async->ep.len = 1;
229 async->fd_enable = 1; 236 async->fd_enable = 1;
237 }
238
239 async->value = SvROK (pvalue)
240 ? SvREFCNT_inc_NN (SvRV (pvalue))
241 : NEWSV (0, 0);
242
243 sv_setiv (async->value, 0);
244 SvIOK_only (async->value); /* just to be sure */
245 SvREADONLY_on (async->value);
246
247 async->valuep = &(SvIVX (async->value));
248
249 async->autodrain = 1;
230 async->cb = cv; 250 async->cb = cv;
231 async->c_cb = c_cb; 251 async->c_cb = c_cb;
232 async->c_arg = c_arg; 252 async->c_arg = c_arg;
233 async->signum = SvOK (signl) ? s_signum_croak (signl) : 0; 253 async->signum = SvOK (signl) ? s_signum_croak (signl) : 0;
234 254
240 sig_async [async->signum] = async; 260 sig_async [async->signum] = async;
241#if _WIN32 261#if _WIN32
242 signal (async->signum, async_sigsend); 262 signal (async->signum, async_sigsend);
243#else 263#else
244 { 264 {
245 struct sigaction sa = { }; 265 struct sigaction sa;
246 sa.sa_handler = async_sigsend; 266 sa.sa_handler = async_sigsend;
247 sigfillset (&sa.sa_mask); 267 sigfillset (&sa.sa_mask);
268 sa.sa_flags = 0; /* if we interrupt a syscall, we might drain the pipe before it became ready */
248 sigaction (async->signum, &sa, 0); 269 sigaction (async->signum, &sa, 0);
249 } 270 }
250#endif 271#endif
251 } 272 }
252} 273}
256 PPCODE: 277 PPCODE:
257 EXTEND (SP, 2); 278 EXTEND (SP, 2);
258 PUSHs (sv_2mortal (newSViv (PTR2IV (async_signal)))); 279 PUSHs (sv_2mortal (newSViv (PTR2IV (async_signal))));
259 PUSHs (sv_2mortal (newSViv (PTR2IV (async)))); 280 PUSHs (sv_2mortal (newSViv (PTR2IV (async))));
260 281
282IV
283c_var (async_t *async)
284 CODE:
285 RETVAL = PTR2IV (async->valuep);
286 OUTPUT:
287 RETVAL
288
261void 289void
262signal (async_t *async, int value = 0) 290signal (async_t *async, int value = 1)
263 CODE: 291 CODE:
264 async_signal (async, value); 292 async_signal (async, value);
265 293
266void 294void
267block (async_t *async) 295block (async_t *async)
292 pipe_enable = 1 320 pipe_enable = 1
293 pipe_disable = 0 321 pipe_disable = 0
294 CODE: 322 CODE:
295 async->fd_enable = ix; 323 async->fd_enable = ix;
296 324
325int
326pipe_fileno (async_t *async)
327 CODE:
328 if (!async->ep.len)
329 {
330 int res;
331
332 /*block (async);*//*TODO*/
333 res = s_epipe_new (&async->ep);
334 async->fd_enable = 1;
335 /*unblock (async);*//*TODO*/
336
337 if (res < 0)
338 croak ("Async::Interrupt: unable to initialize event pipe");
339 }
340
341 RETVAL = async->ep.fd [0];
342 OUTPUT:
343 RETVAL
344
345int
346pipe_autodrain (async_t *async, int enable = -1)
347 CODE:
348 RETVAL = async->autodrain;
349 if (enable >= 0)
350 async->autodrain = enable;
351 OUTPUT:
352 RETVAL
353
354void
355post_fork (async_t *async)
356 CODE:
357 if (async->ep.len)
358 {
359 int res;
360
361 /*block (async);*//*TODO*/
362 res = s_epipe_renew (&async->ep);
363 /*unblock (async);*//*TODO*/
364
365 if (res < 0)
366 croak ("Async::Interrupt: unable to initialize event pipe after fork");
367 }
368
297void 369void
298DESTROY (SV *self) 370DESTROY (SV *self)
299 CODE: 371 CODE:
300{ 372{
301 int i; 373 int i;
303 async_t *async = SvASYNC_nrv (async_sv); 375 async_t *async = SvASYNC_nrv (async_sv);
304 376
305 for (i = AvFILLp (asyncs); i >= 0; --i) 377 for (i = AvFILLp (asyncs); i >= 0; --i)
306 if (AvARRAY (asyncs)[i] == async_sv) 378 if (AvARRAY (asyncs)[i] == async_sv)
307 { 379 {
308 if (i < AvFILLp (asyncs))
309 AvARRAY (asyncs)[i] = AvARRAY (asyncs)[AvFILLp (asyncs)]; 380 AvARRAY (asyncs)[i] = AvARRAY (asyncs)[AvFILLp (asyncs)];
310 381 av_pop (asyncs);
311 assert (av_pop (asyncs) == async_sv);
312 goto found; 382 goto found;
313 } 383 }
314 384
315 if (!PL_dirty) 385 if (!PL_dirty)
316 warn ("Async::Interrupt::DESTROY could not find async object in list of asyncs, please report"); 386 warn ("Async::Interrupt::DESTROY could not find async object in list of asyncs, please report");
321 { 391 {
322#if _WIN32 392#if _WIN32
323 signal (async->signum, SIG_DFL); 393 signal (async->signum, SIG_DFL);
324#else 394#else
325 { 395 {
326 struct sigaction sa = { }; 396 struct sigaction sa;
327 sa.sa_handler = SIG_DFL; 397 sa.sa_handler = SIG_DFL;
398 sigemptyset (&sa.sa_mask);
399 sa.sa_flags = 0;
328 sigaction (async->signum, &sa, 0); 400 sigaction (async->signum, &sa, 0);
329 } 401 }
330#endif 402#endif
331 } 403 }
332 404
405 if (!async->fh_r && async->ep.len)
406 s_epipe_destroy (&async->ep);
407
333 SvREFCNT_dec (async->fh_r); 408 SvREFCNT_dec (async->fh_r);
334 SvREFCNT_dec (async->fh_w); 409 SvREFCNT_dec (async->fh_w);
335 SvREFCNT_dec (async->cb); 410 SvREFCNT_dec (async->cb);
411 SvREFCNT_dec (async->value);
336 412
337 Safefree (async); 413 Safefree (async);
338} 414}
339 415
416MODULE = Async::Interrupt PACKAGE = Async::Interrupt::EventPipe PREFIX = s_epipe_
417
418void
419new (const char *klass)
420 PPCODE:
421{
422 s_epipe *epp;
423 SV *self;
424
425 Newz (0, epp, 1, s_epipe);
426 XPUSHs (sv_setref_iv (sv_newmortal (), klass, PTR2IV (epp)));
427
428 if (s_epipe_new (epp) < 0)
429 croak ("Async::Interrupt::EventPipe: unable to create new event pipe");
430}
431
432void
433filenos (s_epipe *epp)
434 PPCODE:
435 EXTEND (SP, 2);
436 PUSHs (sv_2mortal (newSViv (epp->fd [0])));
437 PUSHs (sv_2mortal (newSViv (epp->fd [1])));
438
439int
440fileno (s_epipe *epp)
441 ALIAS:
442 fileno = 0
443 fileno_r = 0
444 fileno_w = 1
445 CODE:
446 RETVAL = epp->fd [ix];
447 OUTPUT:
448 RETVAL
449
450int
451type (s_epipe *epp)
452 CODE:
453 RETVAL = epp->len;
454 OUTPUT:
455 RETVAL
456
457void
458s_epipe_signal (s_epipe *epp)
459
460void
461s_epipe_drain (s_epipe *epp)
462
463void
464DESTROY (s_epipe *epp)
465 CODE:
466 s_epipe_destroy (epp);
467

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines