ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/Async-Interrupt/Interrupt.xs
(Generate patch)

Comparing Async-Interrupt/Interrupt.xs (file contents):
Revision 1.8 by root, Tue Jul 14 19:29:26 2009 UTC vs.
Revision 1.20 by root, Fri Apr 11 04:24:47 2014 UTC

1#include "EXTERN.h" 1#include "EXTERN.h"
2#include "perl.h" 2#include "perl.h"
3#include "XSUB.h" 3#include "XSUB.h"
4 4
5#define ECB_NO_LIBM 1
6#define ECB_NO_THREADS 1
7#include "ecb.h"
5#include "schmorp.h" 8#include "schmorp.h"
6 9
7typedef volatile sig_atomic_t atomic_t; 10typedef volatile sig_atomic_t atomic_t;
8 11
9static int *sig_pending, *psig_pend; /* make local copies because of missing THX */ 12static int *sig_pending, *psig_pend; /* make local copies because of missing THX */
29typedef struct { 32typedef struct {
30 SV *cb; 33 SV *cb;
31 void (*c_cb)(pTHX_ void *c_arg, int value); 34 void (*c_cb)(pTHX_ void *c_arg, int value);
32 void *c_arg; 35 void *c_arg;
33 SV *fh_r, *fh_w; 36 SV *fh_r, *fh_w;
37 SV *value;
34 int signum; 38 int signum;
39 int autodrain;
40 ANY *scope_savestack;
35 volatile int blocked; 41 volatile int blocked;
36 42
37 int fd_r; 43 s_epipe ep;
38 volatile int fd_w;
39 int fd_wlen; 44 int fd_wlen;
40 atomic_t fd_enable; 45 atomic_t fd_enable;
41 atomic_t value;
42 atomic_t pending; 46 atomic_t pending;
47 volatile IV *valuep;
48 atomic_t hysteresis;
43} async_t; 49} async_t;
44 50
45static AV *asyncs; 51static AV *asyncs;
46static async_t *sig_async [SIG_SIZE]; 52static async_t *sig_async [SIG_SIZE];
47 53
48#define SvASYNC_nrv(sv) INT2PTR (async_t *, SvIVX (sv)) 54#define SvASYNC_nrv(sv) INT2PTR (async_t *, SvIVX (sv))
49#define SvASYNC(rv) SvASYNC_nrv (SvRV (rv)) 55#define SvASYNC(rv) SvASYNC_nrv (SvRV (rv))
50 56
57static void async_signal (void *signal_arg, int value);
58
59static void
60setsig (int signum, void (*handler)(int))
61{
62#if _WIN32
63 signal (signum, handler);
64#else
65 struct sigaction sa;
66 sa.sa_handler = handler;
67 sigfillset (&sa.sa_mask);
68 sa.sa_flags = 0; /* if we interrupt a syscall, we might drain the pipe before it became ready */
69 sigaction (signum, &sa, 0);
70#endif
71}
72
73static void
74async_sigsend (int signum)
75{
76 async_signal (sig_async [signum], 0);
77}
78
51/* the main workhorse to signal */ 79/* the main workhorse to signal */
52static void 80static void
53async_signal (void *signal_arg, int value) 81async_signal (void *signal_arg, int value)
54{ 82{
55 static char pipedata [8]; 83 static char pipedata [8];
56 84
57 async_t *async = (async_t *)signal_arg; 85 async_t *async = (async_t *)signal_arg;
58 int pending = async->pending; 86 int pending = async->pending;
59 87
60 async->value = value; 88 if (async->hysteresis)
89 setsig (async->signum, SIG_IGN);
90
91 *async->valuep = value ? value : 1;
92 ECB_MEMORY_FENCE_RELEASE;
61 async->pending = 1; 93 async->pending = 1;
94 ECB_MEMORY_FENCE_RELEASE;
62 async_pending = 1; 95 async_pending = 1;
96 ECB_MEMORY_FENCE_RELEASE;
97
98 if (!async->blocked)
99 {
63 psig_pend [9] = 1; 100 psig_pend [9] = 1;
101 ECB_MEMORY_FENCE_RELEASE;
64 *sig_pending = 1; 102 *sig_pending = 1;
65 103 ECB_MEMORY_FENCE_RELEASE;
66 {
67 int fd_w = async->fd_w;
68 int fd_enable = async->fd_enable;
69
70 if (!pending && fd_w >= 0 && fd_enable)
71 if (write (fd_w, pipedata, async->fd_wlen) < 0 && errno == EINVAL)
72 /* on EINVAL we assume it's an eventfd */
73 write (fd_w, pipedata, (async->fd_wlen = 8));
74 } 104 }
105
106 if (!pending && async->fd_enable && async->ep.len)
107 s_epipe_signal (&async->ep);
75} 108}
76 109
77static void 110static void
78handle_async (async_t *async) 111handle_async (async_t *async)
79{ 112{
80 int old_errno = errno; 113 int old_errno = errno;
81 int value = async->value; 114 int value = *async->valuep;
82 115
116 *async->valuep = 0;
83 async->pending = 0; 117 async->pending = 0;
84 118
119 /* restore signal */
120 if (async->hysteresis)
121 setsig (async->signum, async_sigsend);
122
85 /* drain pipe */ 123 /* drain pipe */
86 if (async->fd_r >= 0 && async->fd_enable) 124 if (async->fd_enable && async->ep.len && async->autodrain)
87 { 125 s_epipe_drain (&async->ep);
88 char dummy [9]; /* 9 is enough for eventfd and normal pipes */
89
90 while (read (async->fd_r, dummy, sizeof (dummy)) == sizeof (dummy))
91 ;
92 }
93 126
94 if (async->c_cb) 127 if (async->c_cb)
95 { 128 {
96 dTHX; 129 dTHX;
97 async->c_cb (aTHX_ async->c_arg, value); 130 async->c_cb (aTHX_ async->c_arg, value);
142static void 175static void
143handle_asyncs (void) 176handle_asyncs (void)
144{ 177{
145 int i; 178 int i;
146 179
180 ECB_MEMORY_FENCE_ACQUIRE;
181
147 async_pending = 0; 182 async_pending = 0;
148 183
149 for (i = AvFILLp (asyncs); i >= 0; --i) 184 for (i = AvFILLp (asyncs); i >= 0; --i)
150 { 185 {
186 SV *async_sv = AvARRAY (asyncs)[i];
151 async_t *async = SvASYNC_nrv (AvARRAY (asyncs)[i]); 187 async_t *async = SvASYNC_nrv (async_sv);
152 188
153 if (async->pending && !async->blocked) 189 if (async->pending && !async->blocked)
190 {
191 /* temporarily keep a refcount */
192 SvREFCNT_inc (async_sv);
154 handle_async (async); 193 handle_async (async);
194 SvREFCNT_dec (async_sv);
195
196 /* the handler could have deleted any number of asyncs */
197 if (i > AvFILLp (asyncs))
198 i = AvFILLp (asyncs);
199 }
155 } 200 }
156} 201}
157 202
158#if HAS_SA_SIGINFO 203#if HAS_SA_SIGINFO
159static Signal_t async_sighandler (int signum, siginfo_t *si, void *sarg) 204static Signal_t async_sighandler (int signum, siginfo_t *si, void *sarg)
171 else 216 else
172 old_sighandler (signum); 217 old_sighandler (signum);
173} 218}
174#endif 219#endif
175 220
176static void
177async_sigsend (int signum)
178{
179 async_signal (sig_async [signum], 0);
180}
181
182#define block(async) ++(async)->blocked 221#define block(async) ++(async)->blocked
183 222
184static void 223static void
185unblock (async_t *async) 224unblock (async_t *async)
186{ 225{
191 230
192static void 231static void
193scope_block_cb (pTHX_ void *async_sv) 232scope_block_cb (pTHX_ void *async_sv)
194{ 233{
195 async_t *async = SvASYNC_nrv ((SV *)async_sv); 234 async_t *async = SvASYNC_nrv ((SV *)async_sv);
235
236 async->scope_savestack = 0;
196 unblock (async); 237 unblock (async);
197 SvREFCNT_dec (async_sv); 238 SvREFCNT_dec (async_sv);
239}
240
241static void
242scope_block (SV *async_sv)
243{
244 async_t *async = SvASYNC_nrv (async_sv);
245
246 /* as a heuristic, we skip the scope block if we already are blocked */
247 /* and the existing scope block used the same savestack */
248
249 if (!async->scope_savestack || async->scope_savestack != PL_savestack)
250 {
251 async->scope_savestack = PL_savestack;
252 block (async);
253
254 LEAVE; /* unfortunately, perl sandwiches XS calls into ENTER/LEAVE */
255 SAVEDESTRUCTOR_X (scope_block_cb, (void *)SvREFCNT_inc (async_sv));
256 ENTER; /* unfortunately, perl sandwiches XS calls into ENTER/LEAVE */
257 }
198} 258}
199 259
200MODULE = Async::Interrupt PACKAGE = Async::Interrupt 260MODULE = Async::Interrupt PACKAGE = Async::Interrupt
201 261
202BOOT: 262BOOT:
208 CvNODEBUG_on (get_cv ("Async::Interrupt::scope_block", 0)); /* otherwise calling scope can be the debugger */ 268 CvNODEBUG_on (get_cv ("Async::Interrupt::scope_block", 0)); /* otherwise calling scope can be the debugger */
209 269
210PROTOTYPES: DISABLE 270PROTOTYPES: DISABLE
211 271
212void 272void
213_alloc (SV *cb, void *c_cb, void *c_arg, SV *fh_r, SV *fh_w, SV *signl) 273_alloc (SV *cb, void *c_cb, void *c_arg, SV *fh_r, SV *fh_w, SV *signl, SV *pvalue)
214 PPCODE: 274 PPCODE:
215{ 275{
216 SV *cv = SvOK (cb) ? SvREFCNT_inc (s_get_cv_croak (cb)) : 0; 276 SV *cv = SvOK (cb) ? SvREFCNT_inc (s_get_cv_croak (cb)) : 0;
217 int fd_r = SvOK (fh_r) ? s_fileno_croak (fh_r, 0) : -1;
218 int fd_w = SvOK (fh_w) ? s_fileno_croak (fh_w, 1) : -1;
219 async_t *async; 277 async_t *async;
220 278
221 Newz (0, async, 1, async_t); 279 Newz (0, async, 1, async_t);
222 280
223 XPUSHs (sv_2mortal (newSViv (PTR2IV (async)))); 281 XPUSHs (sv_2mortal (newSViv (PTR2IV (async))));
282 /* TODO: need to bless right now to ensure deallocation */
224 av_push (asyncs, TOPs); 283 av_push (asyncs, TOPs);
225 284
226 async->fh_r = fd_r >= 0 ? newSVsv (fh_r) : 0; async->fd_r = fd_r; 285 SvGETMAGIC (fh_r); SvGETMAGIC (fh_w);
227 async->fh_w = fd_w >= 0 ? newSVsv (fh_w) : 0; async->fd_w = fd_w; 286 if (SvOK (fh_r) || SvOK (fh_w))
287 {
288 int fd_r = s_fileno_croak (fh_r, 0);
289 int fd_w = s_fileno_croak (fh_w, 1);
290
291 async->fh_r = newSVsv (fh_r);
292 async->fh_w = newSVsv (fh_w);
293 async->ep.fd [0] = fd_r;
294 async->ep.fd [1] = fd_w;
228 async->fd_wlen = 1; 295 async->ep.len = 1;
229 async->fd_enable = 1; 296 async->fd_enable = 1;
297 }
298
299 async->value = SvROK (pvalue)
300 ? SvREFCNT_inc_NN (SvRV (pvalue))
301 : NEWSV (0, 0);
302
303 sv_setiv (async->value, 0);
304 SvIOK_only (async->value); /* just to be sure */
305 SvREADONLY_on (async->value);
306
307 async->valuep = &(SvIVX (async->value));
308
309 async->autodrain = 1;
230 async->cb = cv; 310 async->cb = cv;
231 async->c_cb = c_cb; 311 async->c_cb = c_cb;
232 async->c_arg = c_arg; 312 async->c_arg = c_arg;
233 async->signum = SvOK (signl) ? s_signum_croak (signl) : 0; 313 async->signum = SvOK (signl) ? s_signum_croak (signl) : 0;
234 314
236 { 316 {
237 if (async->signum < 0) 317 if (async->signum < 0)
238 croak ("Async::Interrupt::new got passed illegal signal name or number: %s", SvPV_nolen (signl)); 318 croak ("Async::Interrupt::new got passed illegal signal name or number: %s", SvPV_nolen (signl));
239 319
240 sig_async [async->signum] = async; 320 sig_async [async->signum] = async;
241#if _WIN32
242 signal (async->signum, async_sigsend); 321 setsig (async->signum, async_sigsend);
243#else
244 {
245 struct sigaction sa = { };
246 sa.sa_handler = async_sigsend;
247 sigfillset (&sa.sa_mask);
248 sigaction (async->signum, &sa, 0);
249 }
250#endif
251 } 322 }
252} 323}
324
325void
326signal_hysteresis (async_t *async, int enable)
327 CODE:
328 async->hysteresis = enable;
253 329
254void 330void
255signal_func (async_t *async) 331signal_func (async_t *async)
256 PPCODE: 332 PPCODE:
257 EXTEND (SP, 2); 333 EXTEND (SP, 2);
258 PUSHs (sv_2mortal (newSViv (PTR2IV (async_signal)))); 334 PUSHs (sv_2mortal (newSViv (PTR2IV (async_signal))));
259 PUSHs (sv_2mortal (newSViv (PTR2IV (async)))); 335 PUSHs (sv_2mortal (newSViv (PTR2IV (async))));
260 336
261void 337void
338scope_block_func (SV *self)
339 PPCODE:
340 EXTEND (SP, 2);
341 PUSHs (sv_2mortal (newSViv (PTR2IV (scope_block))));
342 PUSHs (sv_2mortal (newSViv (PTR2IV (SvRV (self)))));
343
344IV
345c_var (async_t *async)
346 CODE:
347 RETVAL = PTR2IV (async->valuep);
348 OUTPUT:
349 RETVAL
350
351void
352handle (async_t *async)
353 CODE:
354 handle_async (async);
355
356void
262signal (async_t *async, int value = 0) 357signal (async_t *async, int value = 1)
263 CODE: 358 CODE:
264 async_signal (async, value); 359 async_signal (async, value);
265 360
266void 361void
267block (async_t *async) 362block (async_t *async)
274 unblock (async); 369 unblock (async);
275 370
276void 371void
277scope_block (SV *self) 372scope_block (SV *self)
278 CODE: 373 CODE:
279{ 374 scope_block (SvRV (self));
280 SV *async_sv = SvRV (self);
281 async_t *async = SvASYNC_nrv (async_sv);
282 block (async);
283
284 LEAVE; /* unfortunately, perl sandwiches XS calls into ENTER/LEAVE */
285 SAVEDESTRUCTOR_X (scope_block_cb, (void *)SvREFCNT_inc (async_sv));
286 ENTER; /* unfortunately, perl sandwiches XS calls into ENTER/LEAVE */
287}
288 375
289void 376void
290pipe_enable (async_t *async) 377pipe_enable (async_t *async)
291 ALIAS: 378 ALIAS:
292 pipe_enable = 1 379 pipe_enable = 1
293 pipe_disable = 0 380 pipe_disable = 0
294 CODE: 381 CODE:
295 async->fd_enable = ix; 382 async->fd_enable = ix;
296 383
384int
385pipe_fileno (async_t *async)
386 CODE:
387 if (!async->ep.len)
388 {
389 int res;
390
391 /*block (async);*//*TODO*/
392 res = s_epipe_new (&async->ep);
393 async->fd_enable = 1;
394 /*unblock (async);*//*TODO*/
395
396 if (res < 0)
397 croak ("Async::Interrupt: unable to initialize event pipe");
398 }
399
400 RETVAL = async->ep.fd [0];
401 OUTPUT:
402 RETVAL
403
404int
405pipe_autodrain (async_t *async, int enable = -1)
406 CODE:
407 RETVAL = async->autodrain;
408 if (enable >= 0)
409 async->autodrain = enable;
410 OUTPUT:
411 RETVAL
412
413void
414pipe_drain (async_t *async)
415 CODE:
416 if (async->ep.len)
417 s_epipe_drain (&async->ep);
418
419void
420post_fork (async_t *async)
421 CODE:
422 if (async->ep.len)
423 {
424 int res;
425
426 /*block (async);*//*TODO*/
427 res = s_epipe_renew (&async->ep);
428 /*unblock (async);*//*TODO*/
429
430 if (res < 0)
431 croak ("Async::Interrupt: unable to initialize event pipe after fork");
432 }
433
297void 434void
298DESTROY (SV *self) 435DESTROY (SV *self)
299 CODE: 436 CODE:
300{ 437{
301 int i; 438 int i;
302 SV *async_sv = SvRV (self); 439 SV *async_sv = SvRV (self);
303 async_t *async = SvASYNC_nrv (async_sv); 440 async_t *async = SvASYNC_nrv (async_sv);
304 441
305 for (i = AvFILLp (asyncs); i >= 0; --i) 442 for (i = AvFILLp (asyncs); i >= 0; --i)
306 if (AvARRAY (asyncs)[i] == async_sv) 443 if (AvARRAY (asyncs)[i] == async_sv)
307 { 444 {
308 if (i < AvFILLp (asyncs))
309 AvARRAY (asyncs)[i] = AvARRAY (asyncs)[AvFILLp (asyncs)]; 445 AvARRAY (asyncs)[i] = AvARRAY (asyncs)[AvFILLp (asyncs)];
310 446 av_pop (asyncs);
311 assert (av_pop (asyncs) == async_sv);
312 goto found; 447 goto found;
313 } 448 }
314 449
315 if (!PL_dirty) 450 if (!PL_dirty)
316 warn ("Async::Interrupt::DESTROY could not find async object in list of asyncs, please report"); 451 warn ("Async::Interrupt::DESTROY could not find async object in list of asyncs, please report");
317 452
318 found: 453 found:
319 454
320 if (async->signum) 455 if (async->signum)
321 {
322#if _WIN32
323 signal (async->signum, SIG_DFL); 456 setsig (async->signum, SIG_DFL);
324#else 457
325 { 458 if (!async->fh_r && async->ep.len)
326 struct sigaction sa = { }; 459 s_epipe_destroy (&async->ep);
327 sa.sa_handler = SIG_DFL;
328 sigaction (async->signum, &sa, 0);
329 }
330#endif
331 }
332 460
333 SvREFCNT_dec (async->fh_r); 461 SvREFCNT_dec (async->fh_r);
334 SvREFCNT_dec (async->fh_w); 462 SvREFCNT_dec (async->fh_w);
335 SvREFCNT_dec (async->cb); 463 SvREFCNT_dec (async->cb);
464 SvREFCNT_dec (async->value);
336 465
337 Safefree (async); 466 Safefree (async);
338} 467}
339 468
469SV *
470sig2num (SV *signame_or_number)
471 ALIAS:
472 sig2num = 0
473 sig2name = 1
474 PROTOTYPE: $
475 CODE:
476{
477 int signum = s_signum (signame_or_number);
478
479 if (signum < 0)
480 RETVAL = &PL_sv_undef;
481 else if (ix)
482 RETVAL = newSVpv (PL_sig_name [signum], 0);
483 else
484 RETVAL = newSViv (signum);
485}
486 OUTPUT:
487 RETVAL
488
489MODULE = Async::Interrupt PACKAGE = Async::Interrupt::EventPipe PREFIX = s_epipe_
490
491void
492new (const char *klass)
493 PPCODE:
494{
495 s_epipe *epp;
496
497 Newz (0, epp, 1, s_epipe);
498 XPUSHs (sv_setref_iv (sv_newmortal (), klass, PTR2IV (epp)));
499
500 if (s_epipe_new (epp) < 0)
501 croak ("Async::Interrupt::EventPipe: unable to create new event pipe");
502}
503
504void
505filenos (s_epipe *epp)
506 PPCODE:
507 EXTEND (SP, 2);
508 PUSHs (sv_2mortal (newSViv (epp->fd [0])));
509 PUSHs (sv_2mortal (newSViv (epp->fd [1])));
510
511int
512fileno (s_epipe *epp)
513 ALIAS:
514 fileno = 0
515 fileno_r = 0
516 fileno_w = 1
517 CODE:
518 RETVAL = epp->fd [ix];
519 OUTPUT:
520 RETVAL
521
522int
523type (s_epipe *epp)
524 CODE:
525 RETVAL = epp->len;
526 OUTPUT:
527 RETVAL
528
529void
530s_epipe_signal (s_epipe *epp)
531
532void
533s_epipe_drain (s_epipe *epp)
534
535void
536signal_func (s_epipe *epp)
537 ALIAS:
538 drain_func = 1
539 PPCODE:
540 EXTEND (SP, 2);
541 PUSHs (sv_2mortal (newSViv (PTR2IV (ix ? s_epipe_drain : s_epipe_signal))));
542 PUSHs (sv_2mortal (newSViv (PTR2IV (epp))));
543
544void
545s_epipe_wait (s_epipe *epp)
546
547void
548DESTROY (s_epipe *epp)
549 CODE:
550 s_epipe_destroy (epp);
551

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines