ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/Async-Interrupt/Interrupt.xs
(Generate patch)

Comparing Async-Interrupt/Interrupt.xs (file contents):
Revision 1.7 by root, Tue Jul 14 14:18:10 2009 UTC vs.
Revision 1.10 by root, Fri Jul 17 03:32:29 2009 UTC

1#include "EXTERN.h" 1#include "EXTERN.h"
2#include "perl.h" 2#include "perl.h"
3#include "XSUB.h" 3#include "XSUB.h"
4
5#include "schmorp.h"
4 6
5typedef volatile sig_atomic_t atomic_t; 7typedef volatile sig_atomic_t atomic_t;
6 8
7static int *sig_pending, *psig_pend; /* make local copies because of missing THX */ 9static int *sig_pending, *psig_pend; /* make local copies because of missing THX */
8static Sighandler_t old_sighandler; 10static Sighandler_t old_sighandler;
21#if !PERL_VERSION_ATLEAST(5,10,0) 23#if !PERL_VERSION_ATLEAST(5,10,0)
22# undef HAS_SA_SIGINFO 24# undef HAS_SA_SIGINFO
23#endif 25#endif
24 26
25/*****************************************************************************/ 27/*****************************************************************************/
26/* support stuff, copied from EV.xs mostly */
27
28static int
29extract_fd (SV *fh, int wr)
30{
31 int fd = PerlIO_fileno (wr ? IoOFP (sv_2io (fh)) : IoIFP (sv_2io (fh)));
32
33 if (fd < 0)
34 croak ("illegal fh argument, either not an OS file or read/write mode mismatch");
35
36 return fd;
37}
38
39static SV *
40get_cb (SV *cb_sv)
41{
42 HV *st;
43 GV *gvp;
44 CV *cv;
45
46 if (!SvOK (cb_sv))
47 return 0;
48
49 cv = sv_2cv (cb_sv, &st, &gvp, 0);
50
51 if (!cv)
52 croak ("Async::Interrupt callback must be undef or a CODE reference");
53
54 return (SV *)cv;
55}
56
57#ifndef SIG_SIZE
58/* kudos to Slaven Rezic for the idea */
59static char sig_size [] = { SIG_NUM };
60# define SIG_SIZE (sizeof (sig_size) + 1)
61#endif
62
63static int
64sv_signum (SV *sig)
65{
66 int signum;
67
68 SvGETMAGIC (sig);
69
70 for (signum = 1; signum < SIG_SIZE; ++signum)
71 if (strEQ (SvPV_nolen (sig), PL_sig_name [signum]))
72 return signum;
73
74 signum = SvIV (sig);
75
76 if (signum > 0 && signum < SIG_SIZE)
77 return signum;
78
79 return -1;
80}
81
82/*****************************************************************************/
83 28
84typedef struct { 29typedef struct {
85 SV *cb; 30 SV *cb;
86 void (*c_cb)(pTHX_ void *c_arg, int value); 31 void (*c_cb)(pTHX_ void *c_arg, int value);
87 void *c_arg; 32 void *c_arg;
88 SV *fh_r, *fh_w; 33 SV *fh_r, *fh_w;
34 SV *value;
89 int signum; 35 int signum;
90 volatile int blocked; 36 volatile int blocked;
91 37
92 int fd_r; 38 s_epipe ep;
93 volatile int fd_w;
94 int fd_wlen; 39 int fd_wlen;
95 atomic_t fd_enable; 40 atomic_t fd_enable;
96 atomic_t value;
97 atomic_t pending; 41 atomic_t pending;
42 volatile IV *valuep;
98} async_t; 43} async_t;
99 44
100static AV *asyncs; 45static AV *asyncs;
101static async_t *sig_async [SIG_SIZE]; 46static async_t *sig_async [SIG_SIZE];
102 47
110 static char pipedata [8]; 55 static char pipedata [8];
111 56
112 async_t *async = (async_t *)signal_arg; 57 async_t *async = (async_t *)signal_arg;
113 int pending = async->pending; 58 int pending = async->pending;
114 59
115 async->value = value; 60 *async->valuep = value ? value : 1;
116 async->pending = 1; 61 async->pending = 1;
117 async_pending = 1; 62 async_pending = 1;
118 psig_pend [9] = 1; 63 psig_pend [9] = 1;
119 *sig_pending = 1; 64 *sig_pending = 1;
120 65
121 { 66 if (!pending && async->fd_enable && async->ep.len)
122 int fd_w = async->fd_w; 67 s_epipe_signal (&async->ep);
123 int fd_enable = async->fd_enable;
124
125 if (!pending && fd_w >= 0 && fd_enable)
126 if (write (fd_w, pipedata, async->fd_wlen) < 0 && errno == EINVAL)
127 /* on EINVAL we assume it's an eventfd */
128 write (fd_w, pipedata, (async->fd_wlen = 8));
129 }
130} 68}
131 69
132static void 70static void
133handle_async (async_t *async) 71handle_async (async_t *async)
134{ 72{
135 int old_errno = errno; 73 int old_errno = errno;
136 int value = async->value; 74 int value = *async->valuep;
137 75
76 *async->valuep = 0;
138 async->pending = 0; 77 async->pending = 0;
139 78
140 /* drain pipe */ 79 /* drain pipe */
141 if (async->fd_r >= 0 && async->fd_enable) 80 if (async->fd_enable && async->ep.len)
142 { 81 s_epipe_drain (&async->ep);
143 char dummy [9]; /* 9 is enough for eventfd and normal pipes */
144
145 while (read (async->fd_r, dummy, sizeof (dummy)) == sizeof (dummy))
146 ;
147 }
148 82
149 if (async->c_cb) 83 if (async->c_cb)
150 { 84 {
151 dTHX; 85 dTHX;
152 async->c_cb (aTHX_ async->c_arg, value); 86 async->c_cb (aTHX_ async->c_arg, value);
263 CvNODEBUG_on (get_cv ("Async::Interrupt::scope_block", 0)); /* otherwise calling scope can be the debugger */ 197 CvNODEBUG_on (get_cv ("Async::Interrupt::scope_block", 0)); /* otherwise calling scope can be the debugger */
264 198
265PROTOTYPES: DISABLE 199PROTOTYPES: DISABLE
266 200
267void 201void
268_alloc (SV *cb, void *c_cb, void *c_arg, SV *fh_r, SV *fh_w, SV *signl) 202_alloc (SV *cb, void *c_cb, void *c_arg, SV *fh_r, SV *fh_w, SV *signl, SV *pvalue)
269 PPCODE: 203 PPCODE:
270{ 204{
271 SV *cv = SvOK (cb) ? SvREFCNT_inc (get_cb (cb)) : 0; 205 SV *cv = SvOK (cb) ? SvREFCNT_inc (s_get_cv_croak (cb)) : 0;
272 int fd_r = SvOK (fh_r) ? extract_fd (fh_r, 0) : -1;
273 int fd_w = SvOK (fh_w) ? extract_fd (fh_w, 1) : -1;
274 async_t *async; 206 async_t *async;
275 207
276 Newz (0, async, 1, async_t); 208 Newz (0, async, 1, async_t);
277 209
278 XPUSHs (sv_2mortal (newSViv (PTR2IV (async)))); 210 XPUSHs (sv_2mortal (newSViv (PTR2IV (async))));
211 /* TODO: need to bless right now to ensure deallocation */
279 av_push (asyncs, TOPs); 212 av_push (asyncs, TOPs);
280 213
281 async->fh_r = fd_r >= 0 ? newSVsv (fh_r) : 0; async->fd_r = fd_r; 214 SvGETMAGIC (fh_r); SvGETMAGIC (fh_w);
282 async->fh_w = fd_w >= 0 ? newSVsv (fh_w) : 0; async->fd_w = fd_w; 215 if (SvOK (fh_r) || SvOK (fh_w))
216 {
217 int fd_r = s_fileno_croak (fh_r, 0);
218 int fd_w = s_fileno_croak (fh_w, 1);
219
220 async->fh_r = newSVsv (fh_r);
221 async->fh_w = newSVsv (fh_w);
222 async->ep.fd [0] = fd_r;
223 async->ep.fd [1] = fd_w;
283 async->fd_wlen = 1; 224 async->ep.len = 1;
284 async->fd_enable = 1; 225 async->fd_enable = 1;
226 }
227
228 async->value = SvROK (pvalue)
229 ? SvREFCNT_inc_NN (SvRV (pvalue))
230 : NEWSV (0, 0);
231
232 sv_setiv (async->value, 0);
233 SvIOK_only (async->value); /* just to be sure */
234 SvREADONLY_on (async->value);
235
236 async->valuep = &(SvIVX (async->value));
237
285 async->cb = cv; 238 async->cb = cv;
286 async->c_cb = c_cb; 239 async->c_cb = c_cb;
287 async->c_arg = c_arg; 240 async->c_arg = c_arg;
288 SvGETMAGIC (signl);
289 async->signum = SvOK (signl) ? sv_signum (signl) : 0; 241 async->signum = SvOK (signl) ? s_signum_croak (signl) : 0;
290 242
291 if (async->signum) 243 if (async->signum)
292 { 244 {
293 if (async->signum < 0) 245 if (async->signum < 0)
294 croak ("Async::Interrupt::new got passed illegal signal name or number: %s", SvPV_nolen (signl)); 246 croak ("Async::Interrupt::new got passed illegal signal name or number: %s", SvPV_nolen (signl));
312 PPCODE: 264 PPCODE:
313 EXTEND (SP, 2); 265 EXTEND (SP, 2);
314 PUSHs (sv_2mortal (newSViv (PTR2IV (async_signal)))); 266 PUSHs (sv_2mortal (newSViv (PTR2IV (async_signal))));
315 PUSHs (sv_2mortal (newSViv (PTR2IV (async)))); 267 PUSHs (sv_2mortal (newSViv (PTR2IV (async))));
316 268
269IV
270c_var (async_t *async)
271 CODE:
272 RETVAL = PTR2IV (async->valuep);
273 OUTPUT:
274 RETVAL
275
317void 276void
318signal (async_t *async, int value = 0) 277signal (async_t *async, int value = 1)
319 CODE: 278 CODE:
320 async_signal (async, value); 279 async_signal (async, value);
321 280
322void 281void
323block (async_t *async) 282block (async_t *async)
347 ALIAS: 306 ALIAS:
348 pipe_enable = 1 307 pipe_enable = 1
349 pipe_disable = 0 308 pipe_disable = 0
350 CODE: 309 CODE:
351 async->fd_enable = ix; 310 async->fd_enable = ix;
311
312int
313pipe_fileno (async_t *async)
314 CODE:
315 if (!async->ep.len)
316 {
317 int res;
318
319 /*block (async);*//*TODO*/
320 res = s_epipe_new (&async->ep);
321 async->fd_enable = 1;
322 /*unblock (async);*//*TODO*/
323
324 if (res < 0)
325 croak ("Async::Interrupt: unable to initialize event pipe");
326 }
327
328 RETVAL = async->ep.fd [0];
329 OUTPUT:
330 RETVAL
331
332
333void
334post_fork (async_t *async)
335 CODE:
336 if (async->ep.len)
337 {
338 int res;
339
340 /*block (async);*//*TODO*/
341 res = s_epipe_renew (&async->ep);
342 /*unblock (async);*//*TODO*/
343
344 if (res < 0)
345 croak ("Async::Interrupt: unable to initialize event pipe after fork");
346 }
352 347
353void 348void
354DESTROY (SV *self) 349DESTROY (SV *self)
355 CODE: 350 CODE:
356{ 351{
384 sigaction (async->signum, &sa, 0); 379 sigaction (async->signum, &sa, 0);
385 } 380 }
386#endif 381#endif
387 } 382 }
388 383
384 if (!async->fh_r && async->ep.len)
385 s_epipe_destroy (&async->ep);
386
389 SvREFCNT_dec (async->fh_r); 387 SvREFCNT_dec (async->fh_r);
390 SvREFCNT_dec (async->fh_w); 388 SvREFCNT_dec (async->fh_w);
391 SvREFCNT_dec (async->cb); 389 SvREFCNT_dec (async->cb);
390 SvREFCNT_dec (async->value);
392 391
393 Safefree (async); 392 Safefree (async);
394} 393}
395 394

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines