ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/Async-Interrupt/Interrupt.xs
Revision: 1.3
Committed: Thu Jul 2 15:17:57 2009 UTC (14 years, 11 months ago) by root
Branch: MAIN
Changes since 1.2: +5 -3 lines
Log Message:
0.02

File Contents

# User Rev Content
1 root 1.1 #include "EXTERN.h"
2     #include "perl.h"
3     #include "XSUB.h"
4    
5     typedef volatile sig_atomic_t atomic_t;
6    
7     static int *sig_pending, *psig_pend; /* make local copies because of missing THX */
8     static Sighandler_t old_sighandler;
9     static atomic_t async_pending;
10    
11     static int
12     extract_fd (SV *fh, int wr)
13     {
14     int fd = PerlIO_fileno (wr ? IoOFP (sv_2io (fh)) : IoIFP (sv_2io (fh)));
15    
16     if (fd < 0)
17     croak ("illegal fh argument, either not an OS file or read/write mode mismatch");
18    
19     return fd;
20     }
21    
22     static SV *
23     get_cb (SV *cb_sv)
24     {
25     HV *st;
26     GV *gvp;
27     CV *cv;
28    
29     if (!SvOK (cb_sv))
30     return 0;
31    
32     cv = sv_2cv (cb_sv, &st, &gvp, 0);
33    
34     if (!cv)
35     croak ("Async::Interrupt callback must be undef or a CODE reference");
36    
37     return (SV *)cv;
38     }
39    
40     static AV *asyncs;
41    
42     struct async {
43     SV *cb;
44 root 1.2 void (*c_cb)(pTHX_ void *c_arg, int value);
45     void *c_arg;
46     SV *fh_r, *fh_w;
47     int blocked;
48 root 1.1
49 root 1.2 int fd_r, fd_w;
50 root 1.1 atomic_t value;
51 root 1.2 atomic_t pending;
52 root 1.1 };
53    
54     /* the main workhorse to signal */
55     static void
56     async_signal (void *signal_arg, int value)
57     {
58     struct async *async = (struct async *)signal_arg;
59 root 1.2 int pending = async->pending;
60 root 1.1
61 root 1.2 async->value = value;
62     async->pending = 1;
63     async_pending = 1;
64     psig_pend [9] = 1;
65     *sig_pending = 1;
66 root 1.1
67 root 1.2 if (!pending && async->fd_w >= 0)
68     write (async->fd_w, async, 1);
69     }
70    
71     static void
72     handle_async (struct async *async)
73     {
74     int old_errno = errno;
75     int value = async->value;
76    
77     async->pending = 0;
78    
79     /* drain pipe */
80     if (async->fd_r >= 0)
81 root 1.1 {
82 root 1.2 char dummy [4];
83    
84     while (read (async->fd_r, dummy, sizeof (dummy)) == sizeof (dummy))
85     ;
86     }
87    
88     if (async->c_cb)
89     {
90     dTHX;
91     async->c_cb (aTHX_ async->c_arg, value);
92     }
93    
94     if (async->cb)
95     {
96     dSP;
97    
98     SV *saveerr = SvOK (ERRSV) ? sv_mortalcopy (ERRSV) : 0;
99     SV *savedie = PL_diehook;
100    
101     PL_diehook = 0;
102    
103     PUSHSTACKi (PERLSI_SIGNAL);
104    
105     PUSHMARK (SP);
106     XPUSHs (sv_2mortal (newSViv (value)));
107     PUTBACK;
108     call_sv (async->cb, G_VOID | G_DISCARD | G_EVAL);
109    
110     if (SvTRUE (ERRSV))
111     {
112     SPAGAIN;
113    
114     PUSHMARK (SP);
115     PUTBACK;
116     call_sv (get_sv ("Async::Interrupt::DIED", 1), G_VOID | G_DISCARD | G_EVAL | G_KEEPERR);
117    
118     sv_setpvn (ERRSV, "", 0);
119     }
120    
121     if (saveerr)
122     sv_setsv (ERRSV, saveerr);
123 root 1.1
124 root 1.2 {
125     SV *oldhook = PL_diehook;
126     PL_diehook = savedie;
127     SvREFCNT_dec (oldhook);
128     }
129    
130     POPSTACK;
131 root 1.1 }
132    
133 root 1.2 errno = old_errno;
134 root 1.1 }
135    
136     static void
137 root 1.2 handle_asyncs (void)
138 root 1.1 {
139 root 1.2 int i;
140    
141 root 1.1 async_pending = 0;
142 root 1.2
143     for (i = AvFILLp (asyncs); i >= 0; --i)
144     {
145     struct async *async = INT2PTR (struct async *, SvIVX (AvARRAY (asyncs)[i]));
146    
147     if (async->pending && !async->blocked)
148     handle_async (async);
149     }
150 root 1.1 }
151    
152     #if defined(HAS_SIGACTION) && defined(SA_SIGINFO)
153     static Signal_t async_sighandler (int signum, siginfo_t *si, void *sarg)
154     {
155     if (signum == 9)
156 root 1.2 handle_asyncs ();
157 root 1.1 else
158     old_sighandler (signum, si, sarg);
159     }
160     #else
161 root 1.3 static Signal_t async_sighandler (int signum)
162 root 1.1 {
163     if (signum == 9)
164 root 1.3 handle_asyncs ();
165 root 1.1 else
166     old_sighandler (signum);
167     }
168     #endif
169    
170    
171     MODULE = Async::Interrupt PACKAGE = Async::Interrupt
172    
173     BOOT:
174     old_sighandler = PL_sighandlerp;
175     PL_sighandlerp = async_sighandler;
176     sig_pending = &PL_sig_pending;
177     psig_pend = PL_psig_pend;
178     asyncs = newAV ();
179    
180 root 1.3 PROTOTYPES: DISABLE
181    
182 root 1.1 SV *
183 root 1.2 _alloc (SV *cb, void *c_cb, void *c_arg, SV *fh_r, SV *fh_w)
184 root 1.1 CODE:
185     {
186 root 1.2 SV *cv = SvOK (cb) ? SvREFCNT_inc_NN (get_cb (cb)) : 0;
187     int fd_r = SvOK (fh_r) ? extract_fd (fh_r, 0) : -1;
188     int fd_w = SvOK (fh_w) ? extract_fd (fh_w, 1) : -1;
189     struct async *async;
190 root 1.1
191     Newz (0, async, 1, struct async);
192    
193 root 1.2 async->fh_r = fd_r >= 0 ? newSVsv (fh_r) : 0; async->fd_r = fd_r;
194     async->fh_w = fd_w >= 0 ? newSVsv (fh_w) : 0; async->fd_w = fd_w;
195     async->cb = cv;
196 root 1.1 async->c_cb = c_cb;
197 root 1.2 async->c_arg = c_arg;
198    
199     printf ("r,w %d,%d\n", fd_r, fd_w);//D
200 root 1.1
201     RETVAL = newSViv (PTR2IV (async));
202     av_push (asyncs, RETVAL);
203     }
204     OUTPUT:
205     RETVAL
206    
207     void
208 root 1.2 signal_func (SV *self)
209 root 1.1 PPCODE:
210     EXTEND (SP, 2);
211     PUSHs (sv_2mortal (newSViv (PTR2IV (async_signal))));
212 root 1.2 PUSHs (sv_2mortal (newSViv (SvIVX (SvRV (self)))));
213 root 1.1
214     void
215     signal (SV *self, int value = 0)
216     CODE:
217 root 1.2 async_signal (INT2PTR (void *, SvIVX (SvRV (self))), value);
218    
219     void
220     block (SV *self)
221     CODE:
222     {
223     struct async *async = INT2PTR (struct async *, SvIVX (SvRV (self)));
224     ++async->blocked;
225     }
226    
227     void
228     unblock (SV *self)
229     CODE:
230     {
231     struct async *async = INT2PTR (struct async *, SvIVX (SvRV (self)));
232     --async->blocked;
233    
234     if (async->pending && !async->blocked)
235     handle_async (async);
236     }
237 root 1.1
238     void
239     DESTROY (SV *self)
240     CODE:
241     {
242     int i;
243     SV *async_sv = SvRV (self);
244 root 1.2 struct async *async = INT2PTR (struct async *, SvIVX (async_sv));
245 root 1.1
246 root 1.2 for (i = AvFILLp (asyncs); i >= 0; --i)
247 root 1.1 if (AvARRAY (asyncs)[i] == async_sv)
248     {
249     if (i < AvFILLp (asyncs))
250 root 1.3 AvARRAY (asyncs)[i] = AvARRAY (asyncs)[AvFILLp (asyncs)];
251 root 1.1
252     assert (av_pop (asyncs) == async_sv);
253     goto found;
254     }
255    
256     if (!PL_dirty)
257 root 1.2 warn ("Async::Interrupt::DESTROY could not find async object in list of asyncs, please report");
258 root 1.1
259     found:
260 root 1.2 SvREFCNT_dec (async->fh_r);
261     SvREFCNT_dec (async->fh_w);
262 root 1.1 SvREFCNT_dec (async->cb);
263    
264     Safefree (async);
265     }
266