ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/Async-Interrupt/Interrupt.xs
(Generate patch)

Comparing Async-Interrupt/Interrupt.xs (file contents):
Revision 1.8 by root, Tue Jul 14 19:29:26 2009 UTC vs.
Revision 1.10 by root, Fri Jul 17 03:32:29 2009 UTC

29typedef struct { 29typedef struct {
30 SV *cb; 30 SV *cb;
31 void (*c_cb)(pTHX_ void *c_arg, int value); 31 void (*c_cb)(pTHX_ void *c_arg, int value);
32 void *c_arg; 32 void *c_arg;
33 SV *fh_r, *fh_w; 33 SV *fh_r, *fh_w;
34 SV *value;
34 int signum; 35 int signum;
35 volatile int blocked; 36 volatile int blocked;
36 37
37 int fd_r; 38 s_epipe ep;
38 volatile int fd_w;
39 int fd_wlen; 39 int fd_wlen;
40 atomic_t fd_enable; 40 atomic_t fd_enable;
41 atomic_t value;
42 atomic_t pending; 41 atomic_t pending;
42 volatile IV *valuep;
43} async_t; 43} async_t;
44 44
45static AV *asyncs; 45static AV *asyncs;
46static async_t *sig_async [SIG_SIZE]; 46static async_t *sig_async [SIG_SIZE];
47 47
55 static char pipedata [8]; 55 static char pipedata [8];
56 56
57 async_t *async = (async_t *)signal_arg; 57 async_t *async = (async_t *)signal_arg;
58 int pending = async->pending; 58 int pending = async->pending;
59 59
60 async->value = value; 60 *async->valuep = value ? value : 1;
61 async->pending = 1; 61 async->pending = 1;
62 async_pending = 1; 62 async_pending = 1;
63 psig_pend [9] = 1; 63 psig_pend [9] = 1;
64 *sig_pending = 1; 64 *sig_pending = 1;
65 65
66 { 66 if (!pending && async->fd_enable && async->ep.len)
67 int fd_w = async->fd_w; 67 s_epipe_signal (&async->ep);
68 int fd_enable = async->fd_enable;
69
70 if (!pending && fd_w >= 0 && fd_enable)
71 if (write (fd_w, pipedata, async->fd_wlen) < 0 && errno == EINVAL)
72 /* on EINVAL we assume it's an eventfd */
73 write (fd_w, pipedata, (async->fd_wlen = 8));
74 }
75} 68}
76 69
77static void 70static void
78handle_async (async_t *async) 71handle_async (async_t *async)
79{ 72{
80 int old_errno = errno; 73 int old_errno = errno;
81 int value = async->value; 74 int value = *async->valuep;
82 75
76 *async->valuep = 0;
83 async->pending = 0; 77 async->pending = 0;
84 78
85 /* drain pipe */ 79 /* drain pipe */
86 if (async->fd_r >= 0 && async->fd_enable) 80 if (async->fd_enable && async->ep.len)
87 { 81 s_epipe_drain (&async->ep);
88 char dummy [9]; /* 9 is enough for eventfd and normal pipes */
89
90 while (read (async->fd_r, dummy, sizeof (dummy)) == sizeof (dummy))
91 ;
92 }
93 82
94 if (async->c_cb) 83 if (async->c_cb)
95 { 84 {
96 dTHX; 85 dTHX;
97 async->c_cb (aTHX_ async->c_arg, value); 86 async->c_cb (aTHX_ async->c_arg, value);
208 CvNODEBUG_on (get_cv ("Async::Interrupt::scope_block", 0)); /* otherwise calling scope can be the debugger */ 197 CvNODEBUG_on (get_cv ("Async::Interrupt::scope_block", 0)); /* otherwise calling scope can be the debugger */
209 198
210PROTOTYPES: DISABLE 199PROTOTYPES: DISABLE
211 200
212void 201void
213_alloc (SV *cb, void *c_cb, void *c_arg, SV *fh_r, SV *fh_w, SV *signl) 202_alloc (SV *cb, void *c_cb, void *c_arg, SV *fh_r, SV *fh_w, SV *signl, SV *pvalue)
214 PPCODE: 203 PPCODE:
215{ 204{
216 SV *cv = SvOK (cb) ? SvREFCNT_inc (s_get_cv_croak (cb)) : 0; 205 SV *cv = SvOK (cb) ? SvREFCNT_inc (s_get_cv_croak (cb)) : 0;
217 int fd_r = SvOK (fh_r) ? s_fileno_croak (fh_r, 0) : -1;
218 int fd_w = SvOK (fh_w) ? s_fileno_croak (fh_w, 1) : -1;
219 async_t *async; 206 async_t *async;
220 207
221 Newz (0, async, 1, async_t); 208 Newz (0, async, 1, async_t);
222 209
223 XPUSHs (sv_2mortal (newSViv (PTR2IV (async)))); 210 XPUSHs (sv_2mortal (newSViv (PTR2IV (async))));
211 /* TODO: need to bless right now to ensure deallocation */
224 av_push (asyncs, TOPs); 212 av_push (asyncs, TOPs);
225 213
226 async->fh_r = fd_r >= 0 ? newSVsv (fh_r) : 0; async->fd_r = fd_r; 214 SvGETMAGIC (fh_r); SvGETMAGIC (fh_w);
227 async->fh_w = fd_w >= 0 ? newSVsv (fh_w) : 0; async->fd_w = fd_w; 215 if (SvOK (fh_r) || SvOK (fh_w))
216 {
217 int fd_r = s_fileno_croak (fh_r, 0);
218 int fd_w = s_fileno_croak (fh_w, 1);
219
220 async->fh_r = newSVsv (fh_r);
221 async->fh_w = newSVsv (fh_w);
222 async->ep.fd [0] = fd_r;
223 async->ep.fd [1] = fd_w;
228 async->fd_wlen = 1; 224 async->ep.len = 1;
229 async->fd_enable = 1; 225 async->fd_enable = 1;
226 }
227
228 async->value = SvROK (pvalue)
229 ? SvREFCNT_inc_NN (SvRV (pvalue))
230 : NEWSV (0, 0);
231
232 sv_setiv (async->value, 0);
233 SvIOK_only (async->value); /* just to be sure */
234 SvREADONLY_on (async->value);
235
236 async->valuep = &(SvIVX (async->value));
237
230 async->cb = cv; 238 async->cb = cv;
231 async->c_cb = c_cb; 239 async->c_cb = c_cb;
232 async->c_arg = c_arg; 240 async->c_arg = c_arg;
233 async->signum = SvOK (signl) ? s_signum_croak (signl) : 0; 241 async->signum = SvOK (signl) ? s_signum_croak (signl) : 0;
234 242
256 PPCODE: 264 PPCODE:
257 EXTEND (SP, 2); 265 EXTEND (SP, 2);
258 PUSHs (sv_2mortal (newSViv (PTR2IV (async_signal)))); 266 PUSHs (sv_2mortal (newSViv (PTR2IV (async_signal))));
259 PUSHs (sv_2mortal (newSViv (PTR2IV (async)))); 267 PUSHs (sv_2mortal (newSViv (PTR2IV (async))));
260 268
269IV
270c_var (async_t *async)
271 CODE:
272 RETVAL = PTR2IV (async->valuep);
273 OUTPUT:
274 RETVAL
275
261void 276void
262signal (async_t *async, int value = 0) 277signal (async_t *async, int value = 1)
263 CODE: 278 CODE:
264 async_signal (async, value); 279 async_signal (async, value);
265 280
266void 281void
267block (async_t *async) 282block (async_t *async)
291 ALIAS: 306 ALIAS:
292 pipe_enable = 1 307 pipe_enable = 1
293 pipe_disable = 0 308 pipe_disable = 0
294 CODE: 309 CODE:
295 async->fd_enable = ix; 310 async->fd_enable = ix;
311
312int
313pipe_fileno (async_t *async)
314 CODE:
315 if (!async->ep.len)
316 {
317 int res;
318
319 /*block (async);*//*TODO*/
320 res = s_epipe_new (&async->ep);
321 async->fd_enable = 1;
322 /*unblock (async);*//*TODO*/
323
324 if (res < 0)
325 croak ("Async::Interrupt: unable to initialize event pipe");
326 }
327
328 RETVAL = async->ep.fd [0];
329 OUTPUT:
330 RETVAL
331
332
333void
334post_fork (async_t *async)
335 CODE:
336 if (async->ep.len)
337 {
338 int res;
339
340 /*block (async);*//*TODO*/
341 res = s_epipe_renew (&async->ep);
342 /*unblock (async);*//*TODO*/
343
344 if (res < 0)
345 croak ("Async::Interrupt: unable to initialize event pipe after fork");
346 }
296 347
297void 348void
298DESTROY (SV *self) 349DESTROY (SV *self)
299 CODE: 350 CODE:
300{ 351{
328 sigaction (async->signum, &sa, 0); 379 sigaction (async->signum, &sa, 0);
329 } 380 }
330#endif 381#endif
331 } 382 }
332 383
384 if (!async->fh_r && async->ep.len)
385 s_epipe_destroy (&async->ep);
386
333 SvREFCNT_dec (async->fh_r); 387 SvREFCNT_dec (async->fh_r);
334 SvREFCNT_dec (async->fh_w); 388 SvREFCNT_dec (async->fh_w);
335 SvREFCNT_dec (async->cb); 389 SvREFCNT_dec (async->cb);
390 SvREFCNT_dec (async->value);
336 391
337 Safefree (async); 392 Safefree (async);
338} 393}
339 394

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines