ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/Coro/EV/EV.xs
Revision: 1.23
Committed: Sat Jun 20 08:58:00 2009 UTC (14 years, 11 months ago) by root
Branch: MAIN
Changes since 1.22: +4 -4 lines
Log Message:
*** empty log message ***

File Contents

# Content
1 #include "EXTERN.h"
2 #include "perl.h"
3 #include "XSUB.h"
4
5 #include <stddef.h>
6 #include <assert.h>
7 #include <string.h>
8
9 #include "EVAPI.h"
10 #include "../Coro/CoroAPI.h"
11
12 static struct ev_prepare scheduler;
13 static struct ev_idle idler;
14 static int inhibit;
15
16 static void
17 idle_cb (EV_P_ ev_idle *w, int revents)
18 {
19 ev_idle_stop (EV_A, w);
20 }
21
22 static void
23 prepare_cb (EV_P_ ev_prepare *w, int revents)
24 {
25 static int incede;
26
27 if (inhibit)
28 return;
29
30 ++incede;
31
32 CORO_CEDE_NOTSELF;
33
34 while (CORO_NREADY >= incede && CORO_CEDE)
35 ;
36
37 /* if still ready, then we have lower-priority coroutines.
38 * poll anyways, but do not block.
39 */
40 if (CORO_NREADY >= incede)
41 {
42 if (!ev_is_active (&idler))
43 ev_idle_start (EV_A, &idler);
44 }
45 else
46 {
47 if (ev_is_active (&idler))
48 ev_idle_stop (EV_A, &idler);
49 }
50
51 --incede;
52 }
53
54 static void
55 readyhook (void)
56 {
57 if (!ev_is_active (&idler))
58 ev_idle_start (EV_DEFAULT_UC, &idler);
59 }
60
61 /*****************************************************************************/
62
63 static void
64 once_cb (int revents, void *arg)
65 {
66 SV *data = (SV *)arg;
67
68 CORO_READY (data);
69 sv_setiv (data, revents);
70 SvREFCNT_dec (data);
71 }
72
73 static int
74 slf_check_once (pTHX_ struct CoroSLF *frame)
75 {
76 SV *data = (SV *)frame->data;
77
78 /* return early when an exception is pending */
79 if (CORO_THROW)
80 return 0;
81
82 if (SvROK (data))
83 return 1; /* repeat until we have been signalled */
84 else
85 {
86 dSP;
87
88 XPUSHs (data);
89
90 PUTBACK;
91 return 0;
92 }
93 }
94
95 static void
96 slf_init_timed_io (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
97 {
98 SV *data;
99
100 if (items < 2 || items > 3)
101 croak ("Coro::EV::timed_io_once requires exactly two or three parameters, not %d.\n", items);
102
103 data = sv_2mortal (newRV_inc (CORO_CURRENT));
104 frame->data = (void *)data;
105 frame->prepare = GCoroAPI->prepare_schedule;
106 frame->check = slf_check_once;
107
108 ev_once (
109 EV_DEFAULT_UC,
110 sv_fileno (arg [0]),
111 SvIV (arg [1]),
112 items >= 3 && SvOK (arg [2]) ? SvNV (arg [2]) : -1.,
113 once_cb,
114 SvREFCNT_inc (data)
115 );
116 }
117
118 static void
119 slf_init_timer (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
120 {
121 SV *data;
122 NV after;
123
124 if (items != 1)
125 croak ("Coro::EV::timer_once requires exactly one parameter, not %d.\n", items);
126
127 data = sv_2mortal (newRV_inc (CORO_CURRENT));
128 frame->data = (void *)data;
129 frame->prepare = GCoroAPI->prepare_schedule;
130 frame->check = slf_check_once;
131
132 after = SvNV (arg [0]);
133
134 ev_once (
135 EV_DEFAULT_UC,
136 -1,
137 0,
138 after >= 0. ? after : 0.,
139 once_cb,
140 SvREFCNT_inc (data)
141 );
142 }
143
144 /*****************************************************************************/
145
146 typedef struct
147 {
148 ev_io io;
149 ev_timer tw;
150 SV *data;
151 } coro_dir;
152
153 typedef struct
154 {
155 coro_dir r, w;
156 } coro_handle;
157
158 static int
159 handle_free (pTHX_ SV *sv, MAGIC *mg)
160 {
161 coro_handle *data = (coro_handle *)mg->mg_ptr;
162 mg->mg_ptr = 0;
163
164 ev_io_stop (EV_DEFAULT_UC, &data->r.io); ev_io_stop (EV_DEFAULT_UC, &data->w.io);
165 ev_timer_stop (EV_DEFAULT_UC, &data->r.tw); ev_timer_stop (EV_DEFAULT_UC, &data->w.tw);
166
167 return 0;
168 }
169
170 static MGVTBL handle_vtbl = { 0, 0, 0, 0, handle_free };
171
172 static void
173 handle_cb (coro_dir *dir, int success)
174 {
175 ev_io_stop (EV_DEFAULT_UC, &dir->io);
176 ev_timer_stop (EV_DEFAULT_UC, &dir->tw);
177
178 CORO_READY (dir->data);
179 sv_setiv (dir->data, success);
180 }
181
182 static void
183 handle_io_cb (EV_P_ ev_io *w, int revents)
184 {
185 handle_cb ((coro_dir *)(((char *)w) - offsetof (coro_dir, io)), 1);
186 }
187
188 static void
189 handle_timer_cb (EV_P_ ev_timer *w, int revents)
190 {
191 handle_cb ((coro_dir *)(((char *)w) - offsetof (coro_dir, tw)), 0);
192 }
193
194 static int
195 slf_check_rw (pTHX_ struct CoroSLF *frame)
196 {
197 SV *data = (SV *)frame->data;
198
199 if (SvROK (data))
200 return 1;
201 else
202 {
203 dSP;
204
205 XPUSHs (data);
206
207 PUTBACK;
208 return 0;
209 }
210 }
211
212 static void
213 slf_init_rw (pTHX_ struct CoroSLF *frame, SV *arg, int wr)
214 {
215 AV *handle = (AV *)SvRV (arg);
216 SV *data_sv = AvARRAY (handle)[5];
217 coro_handle *data;
218 coro_dir *dir;
219 assert (AvFILLp (handle) >= 7);
220
221 if (!SvOK (data_sv))
222 {
223 int fno = sv_fileno (AvARRAY (handle)[0]);
224 data_sv = AvARRAY (handle)[5] = NEWSV (0, sizeof (coro_handle));
225 SvPOK_only (data_sv);
226 SvREADONLY_on (data_sv);
227 data = (coro_handle *)SvPVX (data_sv);
228 memset (data, 0, sizeof (coro_handle));
229
230 ev_io_init (&data->r.io, handle_io_cb, fno, EV_READ);
231 ev_io_init (&data->w.io, handle_io_cb, fno, EV_WRITE);
232 ev_init (&data->r.tw, handle_timer_cb);
233 ev_init (&data->w.tw, handle_timer_cb);
234
235 sv_magicext (data_sv, 0, PERL_MAGIC_ext, &handle_vtbl, (char *)data, 0);
236 }
237 else
238 data = (coro_handle *)SvPVX (data_sv);
239
240 dir = wr ? &data->w : &data->r;
241
242 if (ev_is_active (&dir->io) || ev_is_active (&dir->tw))
243 croak ("recursive invocation of readable_ev or writable_ev");
244
245 dir->data = sv_2mortal (newRV_inc (CORO_CURRENT));
246
247 {
248 SV *to = AvARRAY (handle)[2];
249
250 if (SvOK (to))
251 {
252 ev_timer_set (&dir->tw, 0., SvNV (to));
253 ev_timer_again (EV_DEFAULT_UC, &dir->tw);
254 }
255 }
256
257 ev_io_start (EV_DEFAULT_UC, &dir->io);
258
259 frame->data = (void *)dir->data;
260 frame->prepare = GCoroAPI->prepare_schedule;
261 frame->check = slf_check_rw;
262 }
263
264 static void
265 slf_init_readable (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
266 {
267 slf_init_rw (aTHX_ frame, arg [0], 0);
268 }
269
270 static void
271 slf_init_writable (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
272 {
273 slf_init_rw (aTHX_ frame, arg [0], 1);
274 }
275
276 /*****************************************************************************/
277
278 MODULE = Coro::EV PACKAGE = Coro::EV
279
280 PROTOTYPES: ENABLE
281
BOOT:
{
        /* import the EV and Coro C APIs, everything below relies on them */
        I_EV_API ("Coro::EV");
        I_CORO_API ("Coro::EV");

        EV_DEFAULT; /* make sure it is initialised */

        /* the idle watcher is only configured here and started on demand */
        ev_idle_init (&idler, idle_cb);
        ev_set_priority (&idler, EV_MINPRI);

        /* the prepare watcher is started right away; the ev_unref call
         * presumably keeps it from holding the loop alive on its own
         * (NOTE(review): confirm against the libev documentation).
         */
        ev_prepare_init (&scheduler, prepare_cb);
        ev_set_priority (&scheduler, EV_MINPRI);
        ev_prepare_start (EV_DEFAULT_UC, &scheduler);
        ev_unref (EV_DEFAULT_UC);

        CORO_READYHOOK = readyhook;
}
299
void
_loop_oneshot ()
	CODE:
{
        /* we know that we are the only ready coroutine here, so keep the
         * prepare watcher from running: it must not start the idle watcher
         * merely because the fallback idle coro has a lower priority.
         */
        ++inhibit;

        /* for the same reason the idle watcher must not be running either */
        if (ev_is_active (&idler))
          ev_idle_stop (EV_DEFAULT_UC, &idler);

        /* run a single iteration of the default loop */
        ev_loop (EV_DEFAULT_UC, EVLOOP_ONESHOT);

        --inhibit;
}
318
void
timed_io_once (...)
	CODE:
{
        /* argument checking and watcher setup happen in slf_init_timed_io */
        CORO_EXECUTE_SLF_XS (slf_init_timed_io);
}
323
void
timer_once (...)
	CODE:
{
        /* argument checking and timer setup happen in slf_init_timer */
        CORO_EXECUTE_SLF_XS (slf_init_timer);
}
328
void
readable_ev (...)
	CODE:
{
        /* only the first argument is used; the rest is ignored for speed inside Coro::Handle */
        items = 1;
        CORO_EXECUTE_SLF_XS (slf_init_readable);
}
334
void
writable_ev (...)
	CODE:
{
        /* only the first argument is used; the rest is ignored for speed inside Coro::Handle */
        items = 1;
        CORO_EXECUTE_SLF_XS (slf_init_writable);
}
340