ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/Async-Interrupt/Interrupt.xs
(Generate patch)

Comparing Async-Interrupt/Interrupt.xs (file contents):
Revision 1.5 by root, Fri Jul 3 21:11:22 2009 UTC vs.
Revision 1.19 by root, Tue Apr 24 22:01:59 2012 UTC

1#include "EXTERN.h" 1#include "EXTERN.h"
2#include "perl.h" 2#include "perl.h"
3#include "XSUB.h" 3#include "XSUB.h"
4
5#include "ecb.h"
6#include "schmorp.h"
4 7
5typedef volatile sig_atomic_t atomic_t; 8typedef volatile sig_atomic_t atomic_t;
6 9
7static int *sig_pending, *psig_pend; /* make local copies because of missing THX */ 10static int *sig_pending, *psig_pend; /* make local copies because of missing THX */
8static Sighandler_t old_sighandler; 11static Sighandler_t old_sighandler;
20 23
21#if !PERL_VERSION_ATLEAST(5,10,0) 24#if !PERL_VERSION_ATLEAST(5,10,0)
22# undef HAS_SA_SIGINFO 25# undef HAS_SA_SIGINFO
23#endif 26#endif
24 27
25static int 28/*****************************************************************************/
26extract_fd (SV *fh, int wr)
27{
28 int fd = PerlIO_fileno (wr ? IoOFP (sv_2io (fh)) : IoIFP (sv_2io (fh)));
29 29
30 if (fd < 0) 30typedef struct {
31 croak ("illegal fh argument, either not an OS file or read/write mode mismatch");
32
33 return fd;
34}
35
36static SV *
37get_cb (SV *cb_sv)
38{
39 HV *st;
40 GV *gvp;
41 CV *cv;
42
43 if (!SvOK (cb_sv))
44 return 0;
45
46 cv = sv_2cv (cb_sv, &st, &gvp, 0);
47
48 if (!cv)
49 croak ("Async::Interrupt callback must be undef or a CODE reference");
50
51 return (SV *)cv;
52}
53
54static AV *asyncs;
55
56struct async {
57 SV *cb; 31 SV *cb;
58 void (*c_cb)(pTHX_ void *c_arg, int value); 32 void (*c_cb)(pTHX_ void *c_arg, int value);
59 void *c_arg; 33 void *c_arg;
60 SV *fh_r, *fh_w; 34 SV *fh_r, *fh_w;
35 SV *value;
36 int signum;
37 int autodrain;
38 ANY *scope_savestack;
61 int blocked; 39 volatile int blocked;
62 40
63 int fd_r, fd_w; 41 s_epipe ep;
42 int fd_wlen;
64 atomic_t value; 43 atomic_t fd_enable;
65 atomic_t pending; 44 atomic_t pending;
66}; 45 volatile IV *valuep;
46 atomic_t hysteresis;
47} async_t;
48
49static AV *asyncs;
50static async_t *sig_async [SIG_SIZE];
51
52#define SvASYNC_nrv(sv) INT2PTR (async_t *, SvIVX (sv))
53#define SvASYNC(rv) SvASYNC_nrv (SvRV (rv))
54
55static void async_signal (void *signal_arg, int value);
56
57static void
58setsig (int signum, void (*handler)(int))
59{
60#if _WIN32
61 signal (signum, handler);
62#else
63 struct sigaction sa;
64 sa.sa_handler = handler;
65 sigfillset (&sa.sa_mask);
66 sa.sa_flags = 0; /* if we interrupt a syscall, we might drain the pipe before it became ready */
67 sigaction (signum, &sa, 0);
68#endif
69}
70
71static void
72async_sigsend (int signum)
73{
74 async_signal (sig_async [signum], 0);
75}
67 76
68/* the main workhorse to signal */ 77/* the main workhorse to signal */
69static void 78static void
70async_signal (void *signal_arg, int value) 79async_signal (void *signal_arg, int value)
71{ 80{
81 static char pipedata [8];
82
72 struct async *async = (struct async *)signal_arg; 83 async_t *async = (async_t *)signal_arg;
73 int pending = async->pending; 84 int pending = async->pending;
74 85
75 async->value = value; 86 if (async->hysteresis)
87 setsig (async->signum, SIG_IGN);
88
89 *async->valuep = value ? value : 1;
90 ECB_MEMORY_FENCE_RELEASE;
76 async->pending = 1; 91 async->pending = 1;
92 ECB_MEMORY_FENCE_RELEASE;
77 async_pending = 1; 93 async_pending = 1;
94 ECB_MEMORY_FENCE_RELEASE;
95
96 if (!async->blocked)
97 {
78 psig_pend [9] = 1; 98 psig_pend [9] = 1;
99 ECB_MEMORY_FENCE_RELEASE;
79 *sig_pending = 1; 100 *sig_pending = 1;
101 ECB_MEMORY_FENCE_RELEASE;
102 }
80 103
81 if (!pending && async->fd_w >= 0) 104 if (!pending && async->fd_enable && async->ep.len)
82 write (async->fd_w, async, 1); 105 s_epipe_signal (&async->ep);
83} 106}
84 107
85static void 108static void
86handle_async (struct async *async) 109handle_async (async_t *async)
87{ 110{
88 int old_errno = errno; 111 int old_errno = errno;
89 int value = async->value; 112 int value = *async->valuep;
90 113
114 *async->valuep = 0;
91 async->pending = 0; 115 async->pending = 0;
92 116
117 /* restore signal */
118 if (async->hysteresis)
119 setsig (async->signum, async_sigsend);
120
93 /* drain pipe */ 121 /* drain pipe */
94 if (async->fd_r >= 0) 122 if (async->fd_enable && async->ep.len && async->autodrain)
95 { 123 s_epipe_drain (&async->ep);
96 char dummy [4];
97
98 while (read (async->fd_r, dummy, sizeof (dummy)) == sizeof (dummy))
99 ;
100 }
101 124
102 if (async->c_cb) 125 if (async->c_cb)
103 { 126 {
104 dTHX; 127 dTHX;
105 async->c_cb (aTHX_ async->c_arg, value); 128 async->c_cb (aTHX_ async->c_arg, value);
150static void 173static void
151handle_asyncs (void) 174handle_asyncs (void)
152{ 175{
153 int i; 176 int i;
154 177
178 ECB_MEMORY_FENCE_ACQUIRE;
179
155 async_pending = 0; 180 async_pending = 0;
156 181
157 for (i = AvFILLp (asyncs); i >= 0; --i) 182 for (i = AvFILLp (asyncs); i >= 0; --i)
158 { 183 {
159 struct async *async = INT2PTR (struct async *, SvIVX (AvARRAY (asyncs)[i])); 184 SV *async_sv = AvARRAY (asyncs)[i];
185 async_t *async = SvASYNC_nrv (async_sv);
160 186
161 if (async->pending && !async->blocked) 187 if (async->pending && !async->blocked)
188 {
189 /* temporarily keep a refcount */
190 SvREFCNT_inc (async_sv);
162 handle_async (async); 191 handle_async (async);
192 SvREFCNT_dec (async_sv);
193
194 /* the handler could have deleted any number of asyncs */
195 if (i > AvFILLp (asyncs))
196 i = AvFILLp (asyncs);
197 }
163 } 198 }
164} 199}
165 200
166#if HAS_SA_SIGINFO 201#if HAS_SA_SIGINFO
167static Signal_t async_sighandler (int signum, siginfo_t *si, void *sarg) 202static Signal_t async_sighandler (int signum, siginfo_t *si, void *sarg)
179 else 214 else
180 old_sighandler (signum); 215 old_sighandler (signum);
181} 216}
182#endif 217#endif
183 218
184static void 219#define block(async) ++(async)->blocked
185scope_block_cb (pTHX_ void *async_sv)
186{
187 struct async *async = INT2PTR (struct async *, SvIVX ((SV *)async_sv));
188 220
221static void
222unblock (async_t *async)
223{
189 --async->blocked; 224 --async->blocked;
190 if (async->pending && !async->blocked) 225 if (async->pending && !async->blocked)
191 handle_async (async); 226 handle_async (async);
227}
192 228
229static void
230scope_block_cb (pTHX_ void *async_sv)
231{
232 async_t *async = SvASYNC_nrv ((SV *)async_sv);
233
234 async->scope_savestack = 0;
235 unblock (async);
193 SvREFCNT_dec (async_sv); 236 SvREFCNT_dec (async_sv);
237}
238
239static void
240scope_block (SV *async_sv)
241{
242 async_t *async = SvASYNC_nrv (async_sv);
243
244 /* as a heuristic, we skip the scope block if we already are blocked */
245 /* and the existing scope block used the same savestack */
246
247 if (!async->scope_savestack || async->scope_savestack != PL_savestack)
248 {
249 async->scope_savestack = PL_savestack;
250 block (async);
251
252 LEAVE; /* unfortunately, perl sandwiches XS calls into ENTER/LEAVE */
253 SAVEDESTRUCTOR_X (scope_block_cb, (void *)SvREFCNT_inc (async_sv));
254 ENTER; /* unfortunately, perl sandwiches XS calls into ENTER/LEAVE */
255 }
194} 256}
195 257
196MODULE = Async::Interrupt PACKAGE = Async::Interrupt 258MODULE = Async::Interrupt PACKAGE = Async::Interrupt
197 259
198BOOT: 260BOOT:
203 asyncs = newAV (); 265 asyncs = newAV ();
204 CvNODEBUG_on (get_cv ("Async::Interrupt::scope_block", 0)); /* otherwise calling scope can be the debugger */ 266 CvNODEBUG_on (get_cv ("Async::Interrupt::scope_block", 0)); /* otherwise calling scope can be the debugger */
205 267
206PROTOTYPES: DISABLE 268PROTOTYPES: DISABLE
207 269
208SV * 270void
209_alloc (SV *cb, void *c_cb, void *c_arg, SV *fh_r, SV *fh_w) 271_alloc (SV *cb, void *c_cb, void *c_arg, SV *fh_r, SV *fh_w, SV *signl, SV *pvalue)
210 CODE: 272 PPCODE:
211{ 273{
212 SV *cv = SvOK (cb) ? SvREFCNT_inc (get_cb (cb)) : 0; 274 SV *cv = SvOK (cb) ? SvREFCNT_inc (s_get_cv_croak (cb)) : 0;
213 int fd_r = SvOK (fh_r) ? extract_fd (fh_r, 0) : -1; 275 async_t *async;
214 int fd_w = SvOK (fh_w) ? extract_fd (fh_w, 1) : -1;
215 struct async *async;
216 276
217 Newz (0, async, 1, struct async); 277 Newz (0, async, 1, async_t);
218 278
219 async->fh_r = fd_r >= 0 ? newSVsv (fh_r) : 0; async->fd_r = fd_r; 279 XPUSHs (sv_2mortal (newSViv (PTR2IV (async))));
220 async->fh_w = fd_w >= 0 ? newSVsv (fh_w) : 0; async->fd_w = fd_w; 280 /* TODO: need to bless right now to ensure deallocation */
281 av_push (asyncs, TOPs);
282
283 SvGETMAGIC (fh_r); SvGETMAGIC (fh_w);
284 if (SvOK (fh_r) || SvOK (fh_w))
285 {
286 int fd_r = s_fileno_croak (fh_r, 0);
287 int fd_w = s_fileno_croak (fh_w, 1);
288
289 async->fh_r = newSVsv (fh_r);
290 async->fh_w = newSVsv (fh_w);
291 async->ep.fd [0] = fd_r;
292 async->ep.fd [1] = fd_w;
293 async->ep.len = 1;
294 async->fd_enable = 1;
295 }
296
297 async->value = SvROK (pvalue)
298 ? SvREFCNT_inc_NN (SvRV (pvalue))
299 : NEWSV (0, 0);
300
301 sv_setiv (async->value, 0);
302 SvIOK_only (async->value); /* just to be sure */
303 SvREADONLY_on (async->value);
304
305 async->valuep = &(SvIVX (async->value));
306
307 async->autodrain = 1;
221 async->cb = cv; 308 async->cb = cv;
222 async->c_cb = c_cb; 309 async->c_cb = c_cb;
223 async->c_arg = c_arg; 310 async->c_arg = c_arg;
311 async->signum = SvOK (signl) ? s_signum_croak (signl) : 0;
224 312
225 printf ("r,w %d,%d\n", fd_r, fd_w);//D 313 if (async->signum)
314 {
315 if (async->signum < 0)
316 croak ("Async::Interrupt::new got passed illegal signal name or number: %s", SvPV_nolen (signl));
226 317
227 RETVAL = newSViv (PTR2IV (async)); 318 sig_async [async->signum] = async;
228 av_push (asyncs, RETVAL); 319 setsig (async->signum, async_sigsend);
320 }
229} 321}
230 OUTPUT:
231 RETVAL
232 322
233void 323void
234signal_func (SV *self) 324signal_hysteresis (async_t *async, int enable)
325 CODE:
326 async->hysteresis = enable;
327
328void
329signal_func (async_t *async)
235 PPCODE: 330 PPCODE:
236 EXTEND (SP, 2); 331 EXTEND (SP, 2);
237 PUSHs (sv_2mortal (newSViv (PTR2IV (async_signal)))); 332 PUSHs (sv_2mortal (newSViv (PTR2IV (async_signal))));
333 PUSHs (sv_2mortal (newSViv (PTR2IV (async))));
334
335void
336scope_block_func (SV *self)
337 PPCODE:
338 EXTEND (SP, 2);
339 PUSHs (sv_2mortal (newSViv (PTR2IV (scope_block))));
238 PUSHs (sv_2mortal (newSViv (SvIVX (SvRV (self))))); 340 PUSHs (sv_2mortal (newSViv (PTR2IV (SvRV (self)))));
239 341
240void 342IV
241signal (SV *self, int value = 0) 343c_var (async_t *async)
242 CODE: 344 CODE:
243 async_signal (INT2PTR (void *, SvIVX (SvRV (self))), value); 345 RETVAL = PTR2IV (async->valuep);
346 OUTPUT:
347 RETVAL
244 348
245void 349void
246block (SV *self) 350handle (async_t *async)
247 CODE: 351 CODE:
248{
249 struct async *async = INT2PTR (struct async *, SvIVX (SvRV (self)));
250 ++async->blocked;
251}
252
253void
254unblock (SV *self)
255 CODE:
256{
257 struct async *async = INT2PTR (struct async *, SvIVX (SvRV (self)));
258 --async->blocked;
259 if (async->pending && !async->blocked)
260 handle_async (async); 352 handle_async (async);
261} 353
354void
355signal (async_t *async, int value = 1)
356 CODE:
357 async_signal (async, value);
358
359void
360block (async_t *async)
361 CODE:
362 block (async);
363
364void
365unblock (async_t *async)
366 CODE:
367 unblock (async);
262 368
263void 369void
264scope_block (SV *self) 370scope_block (SV *self)
265 CODE: 371 CODE:
372 scope_block (SvRV (self));
373
374void
375pipe_enable (async_t *async)
376 ALIAS:
377 pipe_enable = 1
378 pipe_disable = 0
379 CODE:
380 async->fd_enable = ix;
381
382int
383pipe_fileno (async_t *async)
384 CODE:
385 if (!async->ep.len)
386 {
387 int res;
388
389 /*block (async);*//*TODO*/
390 res = s_epipe_new (&async->ep);
391 async->fd_enable = 1;
392 /*unblock (async);*//*TODO*/
393
394 if (res < 0)
395 croak ("Async::Interrupt: unable to initialize event pipe");
396 }
397
398 RETVAL = async->ep.fd [0];
399 OUTPUT:
400 RETVAL
401
402int
403pipe_autodrain (async_t *async, int enable = -1)
404 CODE:
405 RETVAL = async->autodrain;
406 if (enable >= 0)
407 async->autodrain = enable;
408 OUTPUT:
409 RETVAL
410
411void
412pipe_drain (async_t *async)
413 CODE:
414 if (async->ep.len)
415 s_epipe_drain (&async->ep);
416
417void
418post_fork (async_t *async)
419 CODE:
420 if (async->ep.len)
421 {
422 int res;
423
424 /*block (async);*//*TODO*/
425 res = s_epipe_renew (&async->ep);
426 /*unblock (async);*//*TODO*/
427
428 if (res < 0)
429 croak ("Async::Interrupt: unable to initialize event pipe after fork");
430 }
431
432void
433DESTROY (SV *self)
434 CODE:
266{ 435{
436 int i;
267 SV *async_sv = SvRV (self); 437 SV *async_sv = SvRV (self);
268 struct async *async = INT2PTR (struct async *, SvIVX (async_sv)); 438 async_t *async = SvASYNC_nrv (async_sv);
269 ++async->blocked;
270
271 LEAVE; /* unfortunately, perl sandwiches XS calls into ENTER/LEAVE */
272 SAVEDESTRUCTOR_X (scope_block_cb, (void *)SvREFCNT_inc (async_sv));
273 ENTER; /* unfortunately, perl sandwiches XS calls into ENTER/LEAVE */
274}
275
276void
277DESTROY (SV *self)
278 CODE:
279{
280 int i;
281 SV *async_sv = SvRV (self);
282 struct async *async = INT2PTR (struct async *, SvIVX (async_sv));
283 439
284 for (i = AvFILLp (asyncs); i >= 0; --i) 440 for (i = AvFILLp (asyncs); i >= 0; --i)
285 if (AvARRAY (asyncs)[i] == async_sv) 441 if (AvARRAY (asyncs)[i] == async_sv)
286 { 442 {
287 if (i < AvFILLp (asyncs))
288 AvARRAY (asyncs)[i] = AvARRAY (asyncs)[AvFILLp (asyncs)]; 443 AvARRAY (asyncs)[i] = AvARRAY (asyncs)[AvFILLp (asyncs)];
289 444 av_pop (asyncs);
290 assert (av_pop (asyncs) == async_sv);
291 goto found; 445 goto found;
292 } 446 }
293 447
294 if (!PL_dirty) 448 if (!PL_dirty)
295 warn ("Async::Interrupt::DESTROY could not find async object in list of asyncs, please report"); 449 warn ("Async::Interrupt::DESTROY could not find async object in list of asyncs, please report");
296 450
297 found: 451 found:
452
453 if (async->signum)
454 setsig (async->signum, SIG_DFL);
455
456 if (!async->fh_r && async->ep.len)
457 s_epipe_destroy (&async->ep);
458
298 SvREFCNT_dec (async->fh_r); 459 SvREFCNT_dec (async->fh_r);
299 SvREFCNT_dec (async->fh_w); 460 SvREFCNT_dec (async->fh_w);
300 SvREFCNT_dec (async->cb); 461 SvREFCNT_dec (async->cb);
462 SvREFCNT_dec (async->value);
301 463
302 Safefree (async); 464 Safefree (async);
303} 465}
304 466
467SV *
468sig2num (SV *signame_or_number)
469 ALIAS:
470 sig2num = 0
471 sig2name = 1
472 PROTOTYPE: $
473 CODE:
474{
475 int signum = s_signum (signame_or_number);
476
477 if (signum < 0)
478 RETVAL = &PL_sv_undef;
479 else if (ix)
480 RETVAL = newSVpv (PL_sig_name [signum], 0);
481 else
482 RETVAL = newSViv (signum);
483}
484 OUTPUT:
485 RETVAL
486
487MODULE = Async::Interrupt PACKAGE = Async::Interrupt::EventPipe PREFIX = s_epipe_
488
489void
490new (const char *klass)
491 PPCODE:
492{
493 s_epipe *epp;
494
495 Newz (0, epp, 1, s_epipe);
496 XPUSHs (sv_setref_iv (sv_newmortal (), klass, PTR2IV (epp)));
497
498 if (s_epipe_new (epp) < 0)
499 croak ("Async::Interrupt::EventPipe: unable to create new event pipe");
500}
501
502void
503filenos (s_epipe *epp)
504 PPCODE:
505 EXTEND (SP, 2);
506 PUSHs (sv_2mortal (newSViv (epp->fd [0])));
507 PUSHs (sv_2mortal (newSViv (epp->fd [1])));
508
509int
510fileno (s_epipe *epp)
511 ALIAS:
512 fileno = 0
513 fileno_r = 0
514 fileno_w = 1
515 CODE:
516 RETVAL = epp->fd [ix];
517 OUTPUT:
518 RETVAL
519
520int
521type (s_epipe *epp)
522 CODE:
523 RETVAL = epp->len;
524 OUTPUT:
525 RETVAL
526
527void
528s_epipe_signal (s_epipe *epp)
529
530void
531s_epipe_drain (s_epipe *epp)
532
533void
534signal_func (s_epipe *epp)
535 ALIAS:
536 drain_func = 1
537 PPCODE:
538 EXTEND (SP, 2);
539 PUSHs (sv_2mortal (newSViv (PTR2IV (ix ? s_epipe_drain : s_epipe_signal))));
540 PUSHs (sv_2mortal (newSViv (PTR2IV (epp))));
541
542void
543s_epipe_wait (s_epipe *epp)
544
545void
546DESTROY (s_epipe *epp)
547 CODE:
548 s_epipe_destroy (epp);
549

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines