ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/Async-Interrupt/Interrupt.xs
(Generate patch)

Comparing Async-Interrupt/Interrupt.xs (file contents):
Revision 1.5 by root, Fri Jul 3 21:11:22 2009 UTC vs.
Revision 1.17 by root, Tue Nov 24 15:51:35 2009 UTC

1#include "EXTERN.h" 1#include "EXTERN.h"
2#include "perl.h" 2#include "perl.h"
3#include "XSUB.h" 3#include "XSUB.h"
4
5#include "schmorp.h"
4 6
5typedef volatile sig_atomic_t atomic_t; 7typedef volatile sig_atomic_t atomic_t;
6 8
7static int *sig_pending, *psig_pend; /* make local copies because of missing THX */ 9static int *sig_pending, *psig_pend; /* make local copies because of missing THX */
8static Sighandler_t old_sighandler; 10static Sighandler_t old_sighandler;
20 22
21#if !PERL_VERSION_ATLEAST(5,10,0) 23#if !PERL_VERSION_ATLEAST(5,10,0)
22# undef HAS_SA_SIGINFO 24# undef HAS_SA_SIGINFO
23#endif 25#endif
24 26
25static int 27/*****************************************************************************/
26extract_fd (SV *fh, int wr)
27{
28 int fd = PerlIO_fileno (wr ? IoOFP (sv_2io (fh)) : IoIFP (sv_2io (fh)));
29 28
30 if (fd < 0) 29typedef struct {
31 croak ("illegal fh argument, either not an OS file or read/write mode mismatch");
32
33 return fd;
34}
35
36static SV *
37get_cb (SV *cb_sv)
38{
39 HV *st;
40 GV *gvp;
41 CV *cv;
42
43 if (!SvOK (cb_sv))
44 return 0;
45
46 cv = sv_2cv (cb_sv, &st, &gvp, 0);
47
48 if (!cv)
49 croak ("Async::Interrupt callback must be undef or a CODE reference");
50
51 return (SV *)cv;
52}
53
54static AV *asyncs;
55
56struct async {
57 SV *cb; 30 SV *cb;
58 void (*c_cb)(pTHX_ void *c_arg, int value); 31 void (*c_cb)(pTHX_ void *c_arg, int value);
59 void *c_arg; 32 void *c_arg;
60 SV *fh_r, *fh_w; 33 SV *fh_r, *fh_w;
34 SV *value;
35 int signum;
36 int autodrain;
37 ANY *scope_savestack;
61 int blocked; 38 volatile int blocked;
62 39
63 int fd_r, fd_w; 40 s_epipe ep;
41 int fd_wlen;
64 atomic_t value; 42 atomic_t fd_enable;
65 atomic_t pending; 43 atomic_t pending;
66}; 44 volatile IV *valuep;
45 atomic_t hysteresis;
46} async_t;
47
48static AV *asyncs;
49static async_t *sig_async [SIG_SIZE];
50
51#define SvASYNC_nrv(sv) INT2PTR (async_t *, SvIVX (sv))
52#define SvASYNC(rv) SvASYNC_nrv (SvRV (rv))
53
54static void async_signal (void *signal_arg, int value);
55
56static void
57setsig (int signum, void (*handler)(int))
58{
59#if _WIN32
60 signal (signum, handler);
61#else
62 struct sigaction sa;
63 sa.sa_handler = handler;
64 sigfillset (&sa.sa_mask);
65 sa.sa_flags = 0; /* if we interrupt a syscall, we might drain the pipe before it became ready */
66 sigaction (signum, &sa, 0);
67#endif
68}
69
70static void
71async_sigsend (int signum)
72{
73 async_signal (sig_async [signum], 0);
74}
67 75
68/* the main workhorse to signal */ 76/* the main workhorse to signal */
69static void 77static void
70async_signal (void *signal_arg, int value) 78async_signal (void *signal_arg, int value)
71{ 79{
80 static char pipedata [8];
81
72 struct async *async = (struct async *)signal_arg; 82 async_t *async = (async_t *)signal_arg;
73 int pending = async->pending; 83 int pending = async->pending;
74 84
75 async->value = value; 85 if (async->hysteresis)
86 setsig (async->signum, SIG_IGN);
87
88 *async->valuep = value ? value : 1;
76 async->pending = 1; 89 async->pending = 1;
77 async_pending = 1; 90 async_pending = 1;
78 psig_pend [9] = 1; 91 psig_pend [9] = 1;
79 *sig_pending = 1; 92 *sig_pending = 1;
80 93
81 if (!pending && async->fd_w >= 0) 94 if (!pending && async->fd_enable && async->ep.len)
82 write (async->fd_w, async, 1); 95 s_epipe_signal (&async->ep);
83} 96}
84 97
85static void 98static void
86handle_async (struct async *async) 99handle_async (async_t *async)
87{ 100{
88 int old_errno = errno; 101 int old_errno = errno;
89 int value = async->value; 102 int value = *async->valuep;
90 103
104 *async->valuep = 0;
91 async->pending = 0; 105 async->pending = 0;
92 106
107 /* restore signal */
108 if (async->hysteresis)
109 setsig (async->signum, async_sigsend);
110
93 /* drain pipe */ 111 /* drain pipe */
94 if (async->fd_r >= 0) 112 if (async->fd_enable && async->ep.len && async->autodrain)
95 { 113 s_epipe_drain (&async->ep);
96 char dummy [4];
97
98 while (read (async->fd_r, dummy, sizeof (dummy)) == sizeof (dummy))
99 ;
100 }
101 114
102 if (async->c_cb) 115 if (async->c_cb)
103 { 116 {
104 dTHX; 117 dTHX;
105 async->c_cb (aTHX_ async->c_arg, value); 118 async->c_cb (aTHX_ async->c_arg, value);
154 167
155 async_pending = 0; 168 async_pending = 0;
156 169
157 for (i = AvFILLp (asyncs); i >= 0; --i) 170 for (i = AvFILLp (asyncs); i >= 0; --i)
158 { 171 {
159 struct async *async = INT2PTR (struct async *, SvIVX (AvARRAY (asyncs)[i])); 172 SV *async_sv = AvARRAY (asyncs)[i];
173 async_t *async = SvASYNC_nrv (async_sv);
160 174
161 if (async->pending && !async->blocked) 175 if (async->pending && !async->blocked)
176 {
177 /* temporarily keep a refcount */
178 SvREFCNT_inc (async_sv);
162 handle_async (async); 179 handle_async (async);
180 SvREFCNT_dec (async_sv);
181
182 /* the handler could have deleted any number of asyncs */
183 if (i > AvFILLp (asyncs))
184 i = AvFILLp (asyncs);
185 }
163 } 186 }
164} 187}
165 188
166#if HAS_SA_SIGINFO 189#if HAS_SA_SIGINFO
167static Signal_t async_sighandler (int signum, siginfo_t *si, void *sarg) 190static Signal_t async_sighandler (int signum, siginfo_t *si, void *sarg)
179 else 202 else
180 old_sighandler (signum); 203 old_sighandler (signum);
181} 204}
182#endif 205#endif
183 206
184static void 207#define block(async) ++(async)->blocked
185scope_block_cb (pTHX_ void *async_sv)
186{
187 struct async *async = INT2PTR (struct async *, SvIVX ((SV *)async_sv));
188 208
209static void
210unblock (async_t *async)
211{
189 --async->blocked; 212 --async->blocked;
190 if (async->pending && !async->blocked) 213 if (async->pending && !async->blocked)
191 handle_async (async); 214 handle_async (async);
215}
192 216
217static void
218scope_block_cb (pTHX_ void *async_sv)
219{
220 async_t *async = SvASYNC_nrv ((SV *)async_sv);
221
222 async->scope_savestack = 0;
223 unblock (async);
193 SvREFCNT_dec (async_sv); 224 SvREFCNT_dec (async_sv);
225}
226
227static void
228scope_block (SV *async_sv)
229{
230 async_t *async = SvASYNC_nrv (async_sv);
231
232 /* as a heuristic, we skip the scope block if we already are blocked */
233 /* and the existing scope block used the same savestack */
234
235 if (!async->scope_savestack || async->scope_savestack != PL_savestack)
236 {
237 async->scope_savestack = PL_savestack;
238 block (async);
239
240 LEAVE; /* unfortunately, perl sandwiches XS calls into ENTER/LEAVE */
241 SAVEDESTRUCTOR_X (scope_block_cb, (void *)SvREFCNT_inc (async_sv));
242 ENTER; /* unfortunately, perl sandwiches XS calls into ENTER/LEAVE */
243 }
194} 244}
195 245
196MODULE = Async::Interrupt PACKAGE = Async::Interrupt 246MODULE = Async::Interrupt PACKAGE = Async::Interrupt
197 247
198BOOT: 248BOOT:
203 asyncs = newAV (); 253 asyncs = newAV ();
204 CvNODEBUG_on (get_cv ("Async::Interrupt::scope_block", 0)); /* otherwise calling scope can be the debugger */ 254 CvNODEBUG_on (get_cv ("Async::Interrupt::scope_block", 0)); /* otherwise calling scope can be the debugger */
205 255
206PROTOTYPES: DISABLE 256PROTOTYPES: DISABLE
207 257
208SV * 258void
209_alloc (SV *cb, void *c_cb, void *c_arg, SV *fh_r, SV *fh_w) 259_alloc (SV *cb, void *c_cb, void *c_arg, SV *fh_r, SV *fh_w, SV *signl, SV *pvalue)
210 CODE: 260 PPCODE:
211{ 261{
212 SV *cv = SvOK (cb) ? SvREFCNT_inc (get_cb (cb)) : 0; 262 SV *cv = SvOK (cb) ? SvREFCNT_inc (s_get_cv_croak (cb)) : 0;
213 int fd_r = SvOK (fh_r) ? extract_fd (fh_r, 0) : -1;
214 int fd_w = SvOK (fh_w) ? extract_fd (fh_w, 1) : -1;
215 struct async *async; 263 async_t *async;
216 264
217 Newz (0, async, 1, struct async); 265 Newz (0, async, 1, async_t);
218 266
219 async->fh_r = fd_r >= 0 ? newSVsv (fh_r) : 0; async->fd_r = fd_r; 267 XPUSHs (sv_2mortal (newSViv (PTR2IV (async))));
220 async->fh_w = fd_w >= 0 ? newSVsv (fh_w) : 0; async->fd_w = fd_w; 268 /* TODO: need to bless right now to ensure deallocation */
269 av_push (asyncs, TOPs);
270
271 SvGETMAGIC (fh_r); SvGETMAGIC (fh_w);
272 if (SvOK (fh_r) || SvOK (fh_w))
273 {
274 int fd_r = s_fileno_croak (fh_r, 0);
275 int fd_w = s_fileno_croak (fh_w, 1);
276
277 async->fh_r = newSVsv (fh_r);
278 async->fh_w = newSVsv (fh_w);
279 async->ep.fd [0] = fd_r;
280 async->ep.fd [1] = fd_w;
281 async->ep.len = 1;
282 async->fd_enable = 1;
283 }
284
285 async->value = SvROK (pvalue)
286 ? SvREFCNT_inc_NN (SvRV (pvalue))
287 : NEWSV (0, 0);
288
289 sv_setiv (async->value, 0);
290 SvIOK_only (async->value); /* just to be sure */
291 SvREADONLY_on (async->value);
292
293 async->valuep = &(SvIVX (async->value));
294
295 async->autodrain = 1;
221 async->cb = cv; 296 async->cb = cv;
222 async->c_cb = c_cb; 297 async->c_cb = c_cb;
223 async->c_arg = c_arg; 298 async->c_arg = c_arg;
299 async->signum = SvOK (signl) ? s_signum_croak (signl) : 0;
224 300
225 printf ("r,w %d,%d\n", fd_r, fd_w);//D 301 if (async->signum)
302 {
303 if (async->signum < 0)
304 croak ("Async::Interrupt::new got passed illegal signal name or number: %s", SvPV_nolen (signl));
226 305
227 RETVAL = newSViv (PTR2IV (async)); 306 sig_async [async->signum] = async;
228 av_push (asyncs, RETVAL); 307 setsig (async->signum, async_sigsend);
308 }
229} 309}
230 OUTPUT:
231 RETVAL
232 310
233void 311void
234signal_func (SV *self) 312signal_hysteresis (async_t *async, int enable)
313 CODE:
314 async->hysteresis = enable;
315
316void
317signal_func (async_t *async)
235 PPCODE: 318 PPCODE:
236 EXTEND (SP, 2); 319 EXTEND (SP, 2);
237 PUSHs (sv_2mortal (newSViv (PTR2IV (async_signal)))); 320 PUSHs (sv_2mortal (newSViv (PTR2IV (async_signal))));
321 PUSHs (sv_2mortal (newSViv (PTR2IV (async))));
322
323void
324scope_block_func (SV *self)
325 PPCODE:
326 EXTEND (SP, 2);
327 PUSHs (sv_2mortal (newSViv (PTR2IV (scope_block))));
238 PUSHs (sv_2mortal (newSViv (SvIVX (SvRV (self))))); 328 PUSHs (sv_2mortal (newSViv (PTR2IV (SvRV (self)))));
239 329
240void 330IV
241signal (SV *self, int value = 0) 331c_var (async_t *async)
242 CODE: 332 CODE:
243 async_signal (INT2PTR (void *, SvIVX (SvRV (self))), value); 333 RETVAL = PTR2IV (async->valuep);
334 OUTPUT:
335 RETVAL
244 336
245void 337void
246block (SV *self) 338signal (async_t *async, int value = 1)
247 CODE: 339 CODE:
248{ 340 async_signal (async, value);
249 struct async *async = INT2PTR (struct async *, SvIVX (SvRV (self)));
250 ++async->blocked;
251}
252 341
253void 342void
254unblock (SV *self) 343block (async_t *async)
255 CODE: 344 CODE:
256{ 345 block (async);
257 struct async *async = INT2PTR (struct async *, SvIVX (SvRV (self))); 346
258 --async->blocked; 347void
259 if (async->pending && !async->blocked) 348unblock (async_t *async)
260 handle_async (async); 349 CODE:
261} 350 unblock (async);
262 351
263void 352void
264scope_block (SV *self) 353scope_block (SV *self)
265 CODE: 354 CODE:
266{ 355 scope_block (SvRV (self));
267 SV *async_sv = SvRV (self);
268 struct async *async = INT2PTR (struct async *, SvIVX (async_sv));
269 ++async->blocked;
270 356
271 LEAVE; /* unfortunately, perl sandwiches XS calls into ENTER/LEAVE */ 357void
272 SAVEDESTRUCTOR_X (scope_block_cb, (void *)SvREFCNT_inc (async_sv)); 358pipe_enable (async_t *async)
273 ENTER; /* unfortunately, perl sandwiches XS calls into ENTER/LEAVE */ 359 ALIAS:
274} 360 pipe_enable = 1
361 pipe_disable = 0
362 CODE:
363 async->fd_enable = ix;
364
365int
366pipe_fileno (async_t *async)
367 CODE:
368 if (!async->ep.len)
369 {
370 int res;
371
372 /*block (async);*//*TODO*/
373 res = s_epipe_new (&async->ep);
374 async->fd_enable = 1;
375 /*unblock (async);*//*TODO*/
376
377 if (res < 0)
378 croak ("Async::Interrupt: unable to initialize event pipe");
379 }
380
381 RETVAL = async->ep.fd [0];
382 OUTPUT:
383 RETVAL
384
385int
386pipe_autodrain (async_t *async, int enable = -1)
387 CODE:
388 RETVAL = async->autodrain;
389 if (enable >= 0)
390 async->autodrain = enable;
391 OUTPUT:
392 RETVAL
393
394void
395post_fork (async_t *async)
396 CODE:
397 if (async->ep.len)
398 {
399 int res;
400
401 /*block (async);*//*TODO*/
402 res = s_epipe_renew (&async->ep);
403 /*unblock (async);*//*TODO*/
404
405 if (res < 0)
406 croak ("Async::Interrupt: unable to initialize event pipe after fork");
407 }
275 408
276void 409void
277DESTROY (SV *self) 410DESTROY (SV *self)
278 CODE: 411 CODE:
279{ 412{
280 int i; 413 int i;
281 SV *async_sv = SvRV (self); 414 SV *async_sv = SvRV (self);
282 struct async *async = INT2PTR (struct async *, SvIVX (async_sv)); 415 async_t *async = SvASYNC_nrv (async_sv);
283 416
284 for (i = AvFILLp (asyncs); i >= 0; --i) 417 for (i = AvFILLp (asyncs); i >= 0; --i)
285 if (AvARRAY (asyncs)[i] == async_sv) 418 if (AvARRAY (asyncs)[i] == async_sv)
286 { 419 {
287 if (i < AvFILLp (asyncs))
288 AvARRAY (asyncs)[i] = AvARRAY (asyncs)[AvFILLp (asyncs)]; 420 AvARRAY (asyncs)[i] = AvARRAY (asyncs)[AvFILLp (asyncs)];
289 421 av_pop (asyncs);
290 assert (av_pop (asyncs) == async_sv);
291 goto found; 422 goto found;
292 } 423 }
293 424
294 if (!PL_dirty) 425 if (!PL_dirty)
295 warn ("Async::Interrupt::DESTROY could not find async object in list of asyncs, please report"); 426 warn ("Async::Interrupt::DESTROY could not find async object in list of asyncs, please report");
296 427
297 found: 428 found:
429
430 if (async->signum)
431 setsig (async->signum, SIG_DFL);
432
433 if (!async->fh_r && async->ep.len)
434 s_epipe_destroy (&async->ep);
435
298 SvREFCNT_dec (async->fh_r); 436 SvREFCNT_dec (async->fh_r);
299 SvREFCNT_dec (async->fh_w); 437 SvREFCNT_dec (async->fh_w);
300 SvREFCNT_dec (async->cb); 438 SvREFCNT_dec (async->cb);
439 SvREFCNT_dec (async->value);
301 440
302 Safefree (async); 441 Safefree (async);
303} 442}
304 443
444SV *
445sig2num (SV *signame_or_number)
446 ALIAS:
447 sig2num = 0
448 sig2name = 1
449 PROTOTYPE: $
450 CODE:
451{
452 int signum = s_signum (signame_or_number);
453
454 if (signum < 0)
455 RETVAL = &PL_sv_undef;
456 else if (ix)
457 RETVAL = newSVpv (PL_sig_name [signum], 0);
458 else
459 RETVAL = newSViv (signum);
460}
461 OUTPUT:
462 RETVAL
463
464MODULE = Async::Interrupt PACKAGE = Async::Interrupt::EventPipe PREFIX = s_epipe_
465
466void
467new (const char *klass)
468 PPCODE:
469{
470 s_epipe *epp;
471
472 Newz (0, epp, 1, s_epipe);
473 XPUSHs (sv_setref_iv (sv_newmortal (), klass, PTR2IV (epp)));
474
475 if (s_epipe_new (epp) < 0)
476 croak ("Async::Interrupt::EventPipe: unable to create new event pipe");
477}
478
479void
480filenos (s_epipe *epp)
481 PPCODE:
482 EXTEND (SP, 2);
483 PUSHs (sv_2mortal (newSViv (epp->fd [0])));
484 PUSHs (sv_2mortal (newSViv (epp->fd [1])));
485
486int
487fileno (s_epipe *epp)
488 ALIAS:
489 fileno = 0
490 fileno_r = 0
491 fileno_w = 1
492 CODE:
493 RETVAL = epp->fd [ix];
494 OUTPUT:
495 RETVAL
496
497int
498type (s_epipe *epp)
499 CODE:
500 RETVAL = epp->len;
501 OUTPUT:
502 RETVAL
503
504void
505s_epipe_signal (s_epipe *epp)
506
507void
508s_epipe_drain (s_epipe *epp)
509
510void
511drain_func (s_epipe *epp)
512 PPCODE:
513 EXTEND (SP, 2);
514 PUSHs (sv_2mortal (newSViv (PTR2IV (s_epipe_drain))));
515 PUSHs (sv_2mortal (newSViv (PTR2IV (epp))));
516
517void
518s_epipe_wait (s_epipe *epp)
519
520void
521DESTROY (s_epipe *epp)
522 CODE:
523 s_epipe_destroy (epp);
524

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines