ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/Async-Interrupt/Interrupt.xs
Revision: 1.2
Committed: Thu Jul 2 15:13:03 2009 UTC (14 years, 11 months ago) by root
Branch: MAIN
Changes since 1.1: +126 -33 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.1 #include "EXTERN.h"
2     #include "perl.h"
3     #include "XSUB.h"
4    
5     typedef volatile sig_atomic_t atomic_t;
6    
7     static int *sig_pending, *psig_pend; /* make local copies because of missing THX */
8     static Sighandler_t old_sighandler;
9     static atomic_t async_pending;
10    
11     static int
12     extract_fd (SV *fh, int wr)
13     {
14     int fd = PerlIO_fileno (wr ? IoOFP (sv_2io (fh)) : IoIFP (sv_2io (fh)));
15    
16     if (fd < 0)
17     croak ("illegal fh argument, either not an OS file or read/write mode mismatch");
18    
19     return fd;
20     }
21    
22     static SV *
23     get_cb (SV *cb_sv)
24     {
25     HV *st;
26     GV *gvp;
27     CV *cv;
28    
29     if (!SvOK (cb_sv))
30     return 0;
31    
32     cv = sv_2cv (cb_sv, &st, &gvp, 0);
33    
34     if (!cv)
35     croak ("Async::Interrupt callback must be undef or a CODE reference");
36    
37     return (SV *)cv;
38     }
39    
40     static AV *asyncs;
41    
42     struct async {
43     SV *cb;
44 root 1.2 void (*c_cb)(pTHX_ void *c_arg, int value);
45     void *c_arg;
46     SV *fh_r, *fh_w;
47     int blocked;
48 root 1.1
49 root 1.2 int fd_r, fd_w;
50 root 1.1 atomic_t value;
51 root 1.2 atomic_t pending;
52 root 1.1 };
53    
54     /* the main workhorse to signal */
55     static void
56     async_signal (void *signal_arg, int value)
57     {
58     struct async *async = (struct async *)signal_arg;
59 root 1.2 int pending = async->pending;
60 root 1.1
61 root 1.2 async->value = value;
62     async->pending = 1;
63     async_pending = 1;
64     psig_pend [9] = 1;
65     *sig_pending = 1;
66 root 1.1
67 root 1.2 if (!pending && async->fd_w >= 0)
68     write (async->fd_w, async, 1);
69     }
70    
71     static void
72     handle_async (struct async *async)
73     {
74     int old_errno = errno;
75     int value = async->value;
76    
77     async->pending = 0;
78    
79     /* drain pipe */
80     if (async->fd_r >= 0)
81 root 1.1 {
82 root 1.2 char dummy [4];
83    
84     while (read (async->fd_r, dummy, sizeof (dummy)) == sizeof (dummy))
85     ;
86     }
87    
88     if (async->c_cb)
89     {
90     dTHX;
91     async->c_cb (aTHX_ async->c_arg, value);
92     }
93    
94     if (async->cb)
95     {
96     dSP;
97    
98     SV *saveerr = SvOK (ERRSV) ? sv_mortalcopy (ERRSV) : 0;
99     SV *savedie = PL_diehook;
100    
101     PL_diehook = 0;
102    
103     PUSHSTACKi (PERLSI_SIGNAL);
104    
105     PUSHMARK (SP);
106     XPUSHs (sv_2mortal (newSViv (value)));
107     PUTBACK;
108     call_sv (async->cb, G_VOID | G_DISCARD | G_EVAL);
109    
110     if (SvTRUE (ERRSV))
111     {
112     SPAGAIN;
113    
114     PUSHMARK (SP);
115     PUTBACK;
116     call_sv (get_sv ("Async::Interrupt::DIED", 1), G_VOID | G_DISCARD | G_EVAL | G_KEEPERR);
117    
118     sv_setpvn (ERRSV, "", 0);
119     }
120    
121     if (saveerr)
122     sv_setsv (ERRSV, saveerr);
123 root 1.1
124 root 1.2 {
125     SV *oldhook = PL_diehook;
126     PL_diehook = savedie;
127     SvREFCNT_dec (oldhook);
128     }
129    
130     POPSTACK;
131 root 1.1 }
132    
133 root 1.2 errno = old_errno;
134 root 1.1 }
135    
136     static void
137 root 1.2 handle_asyncs (void)
138 root 1.1 {
139 root 1.2 int i;
140    
141 root 1.1 async_pending = 0;
142 root 1.2
143     for (i = AvFILLp (asyncs); i >= 0; --i)
144     {
145     struct async *async = INT2PTR (struct async *, SvIVX (AvARRAY (asyncs)[i]));
146    
147     if (async->pending && !async->blocked)
148     handle_async (async);
149     }
150 root 1.1 }
151    
152     #if defined(HAS_SIGACTION) && defined(SA_SIGINFO)
153     static Signal_t async_sighandler (int signum, siginfo_t *si, void *sarg)
154     {
155     if (signum == 9)
156 root 1.2 handle_asyncs ();
157 root 1.1 else
158     old_sighandler (signum, si, sarg);
159     }
160     #else
161     static Signal_t async_sighandler (int)
162     {
163     if (signum == 9)
164 root 1.2 a_asyncssync_handle ();
165 root 1.1 else
166     old_sighandler (signum);
167     }
168     #endif
169    
170    
171     MODULE = Async::Interrupt PACKAGE = Async::Interrupt
172    
BOOT:
	/* set up our own state first ... */
	asyncs      = newAV ();
	sig_pending = &PL_sig_pending;
	psig_pend   = PL_psig_pend;
	/* ... then remember perl's signal handler and put ours in its place */
	old_sighandler = PL_sighandlerp;
	PL_sighandlerp = async_sighandler;
179    
SV *
_alloc (SV *cb, void *c_cb, void *c_arg, SV *fh_r, SV *fh_w)
	CODE:
{
	/*
	 * Creates a new struct async and returns an SV holding its address;
	 * the same SV is also appended to the asyncs list.
	 *
	 * cb          - perl callback (CODE reference) or undef
	 * c_cb, c_arg - C callback and its argument, may be null
	 * fh_r, fh_w  - read/write file handles or undef
	 *
	 * Resolve the file handles before taking a reference on the
	 * callback: extract_fd may croak, which would otherwise leak
	 * that reference.
	 */
	int fd_r = SvOK (fh_r) ? extract_fd (fh_r, 0) : -1;
	int fd_w = SvOK (fh_w) ? extract_fd (fh_w, 1) : -1;
	SV *cv   = SvOK (cb) ? SvREFCNT_inc_NN (get_cb (cb)) : 0;
	struct async *async;

	Newz (0, async, 1, struct async);

	/* keep our own copies of the handles for as long as the fds are in use */
	async->fh_r  = fd_r >= 0 ? newSVsv (fh_r) : 0; async->fd_r = fd_r;
	async->fh_w  = fd_w >= 0 ? newSVsv (fh_w) : 0; async->fd_w = fd_w;
	async->cb    = cv;
	/* the typemap hands us a plain void *, convert it explicitly */
	async->c_cb  = (void (*)(pTHX_ void *c_arg, int value))c_cb;
	async->c_arg = c_arg;

	RETVAL = newSViv (PTR2IV (async));
	av_push (asyncs, RETVAL);
}
	OUTPUT:
	RETVAL
204    
void
signal_func (SV *self)
	PPCODE:
{
	/* returns two integers: the address of async_signal and the IV stored in the object */
	EXTEND (SP, 2);

	{
	  SV *func_sv = sv_2mortal (newSViv (PTR2IV (async_signal)));
	  PUSHs (func_sv);
	}

	{
	  SV *arg_sv = sv_2mortal (newSViv (SvIVX (SvRV (self))));
	  PUSHs (arg_sv);
	}
}
211 root 1.1
void
signal (SV *self, int value = 0)
	CODE:
{
	/* the object stores the address of its struct async as an IV */
	void *target = INT2PTR (void *, SvIVX (SvRV (self)));

	async_signal (target, value);
}
216    
void
block (SV *self)
	CODE:
{
	/* bump the block counter; handle_asyncs skips asyncs whose counter is non-zero */
	IV addr = SvIVX (SvRV (self));
	struct async *a = INT2PTR (struct async *, addr);

	a->blocked += 1;
}
224    
void
unblock (SV *self)
	CODE:
{
	/* lower the block counter and deliver right away if something is pending and the counter is now zero */
	IV addr = SvIVX (SvRV (self));
	struct async *a = INT2PTR (struct async *, addr);

	a->blocked -= 1;

	if (a->pending)
	  {
	    if (!a->blocked)
	      handle_async (a);
	  }
}
235 root 1.1
void
DESTROY (SV *self)
	CODE:
{
	/*
	 * Removes the object from the asyncs list, releases the handle
	 * copies and the callback, and frees the struct async.
	 */
	int i;
	SV *async_sv = SvRV (self);
	struct async *async = INT2PTR (struct async *, SvIVX (async_sv));

	for (i = AvFILLp (asyncs); i >= 0; --i)
	  if (AvARRAY (asyncs)[i] == async_sv)
	    {
	      /*
	       * unordered removal: move the last entry into our slot (a no-op
	       * when we are the last entry), then drop the last slot.
	       * the popped SV is deliberately not released: _alloc pushes the
	       * very SV it returns without taking an extra reference for the list.
	       * the pop must not live inside assert (), or it disappears
	       * when compiled with NDEBUG.
	       */
	      AvARRAY (asyncs)[i] = AvARRAY (asyncs)[AvFILLp (asyncs)];
	      av_pop (asyncs);
	      goto found;
	    }

	if (!PL_dirty)
	  warn ("Async::Interrupt::DESTROY could not find async object in list of asyncs, please report");

	found:
	/* each of these may be 0 (see _alloc) */
	SvREFCNT_dec (async->fh_r);
	SvREFCNT_dec (async->fh_w);
	SvREFCNT_dec (async->cb);

	Safefree (async);
}
264