ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/Coro/EV/EV.xs
Revision: 1.27
Committed: Wed Jul 22 03:02:07 2009 UTC (14 years, 10 months ago) by root
Branch: MAIN
CVS Tags: rel-5_161, rel-5_162, rel-5_17
Changes since 1.26: +6 -0 lines
Log Message:
5.161

File Contents

# Content
1 #include "EXTERN.h"
2 #include "perl.h"
3 #include "XSUB.h"
4
5 #include <stddef.h>
6 #include <assert.h>
7 #include <string.h>
8
9 #include "EVAPI.h"
10 #include "../Coro/CoroAPI.h"
11
12 static struct ev_prepare scheduler;
13 static struct ev_idle idler;
14 static int inhibit;
15
16 static void
17 idle_cb (EV_P_ ev_idle *w, int revents)
18 {
19 ev_idle_stop (EV_A, w);
20 }
21
22 static void
23 prepare_cb (EV_P_ ev_prepare *w, int revents)
24 {
25 static int incede;
26
27 if (inhibit)
28 return;
29
30 ++incede;
31
32 CORO_CEDE_NOTSELF;
33
34 while (CORO_NREADY >= incede && CORO_CEDE)
35 ;
36
37 /* if still ready, then we have lower-priority coroutines.
38 * poll anyways, but do not block.
39 */
40 if (CORO_NREADY >= incede)
41 {
42 if (!ev_is_active (&idler))
43 ev_idle_start (EV_A, &idler);
44 }
45 else
46 {
47 if (ev_is_active (&idler))
48 ev_idle_stop (EV_A, &idler);
49 }
50
51 --incede;
52 }
53
54 static void
55 readyhook (void)
56 {
57 if (!ev_is_active (&idler))
58 ev_idle_start (EV_DEFAULT_UC, &idler);
59 }
60
61 /*****************************************************************************/
62
63 static void
64 once_cb (int revents, void *arg)
65 {
66 SV *data = (SV *)arg;
67
68 CORO_READY (data);
69 sv_setiv (data, revents);
70 SvREFCNT_dec (data);
71 }
72
73 static int
74 slf_check_once (pTHX_ struct CoroSLF *frame)
75 {
76 SV *data = (SV *)frame->data;
77
78 /* return early when an exception is pending */
79 if (CORO_THROW)
80 return 0;
81
82 if (SvROK (data))
83 return 1; /* repeat until we have been signalled */
84 else
85 {
86 dSP;
87
88 XPUSHs (data);
89
90 PUTBACK;
91 return 0;
92 }
93 }
94
95 static void
96 slf_init_timed_io (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
97 {
98 SV *data;
99
100 if (items < 2 || items > 3)
101 croak ("Coro::EV::timed_io_once requires exactly two or three parameters, not %d.\n", items);
102
103 data = sv_2mortal (newRV_inc (CORO_CURRENT));
104 frame->data = (void *)data;
105 frame->prepare = GCoroAPI->prepare_schedule;
106 frame->check = slf_check_once;
107
108 if (items >= 3)
109 SvGETMAGIC (arg [2]);
110
111 ev_once (
112 EV_DEFAULT_UC,
113 sv_fileno (arg [0]),
114 SvIV (arg [1]),
115 items >= 3 && SvOK (arg [2]) ? SvNV (arg [2]) : -1.,
116 once_cb,
117 SvREFCNT_inc (data)
118 );
119 }
120
121 static void
122 slf_init_timer (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
123 {
124 SV *data;
125 NV after;
126
127 if (items > 1)
128 croak ("Coro::EV::timer_once requires at most one parameter, not %d.\n", items);
129
130 data = sv_2mortal (newRV_inc (CORO_CURRENT));
131 frame->data = (void *)data;
132 frame->prepare = GCoroAPI->prepare_schedule;
133 frame->check = slf_check_once;
134
135 after = items ? SvNV (arg [0]) : 0;
136
137 ev_once (
138 EV_DEFAULT_UC,
139 -1,
140 0,
141 after >= 0. ? after : 0.,
142 once_cb,
143 SvREFCNT_inc (data)
144 );
145 }
146
147 /*****************************************************************************/
148
149 typedef struct
150 {
151 ev_io io;
152 ev_timer tw;
153 SV *data;
154 } coro_dir;
155
156 typedef struct
157 {
158 coro_dir r, w;
159 } coro_handle;
160
161 static int
162 handle_free (pTHX_ SV *sv, MAGIC *mg)
163 {
164 coro_handle *data = (coro_handle *)mg->mg_ptr;
165 mg->mg_ptr = 0;
166
167 ev_io_stop (EV_DEFAULT_UC, &data->r.io); ev_io_stop (EV_DEFAULT_UC, &data->w.io);
168 ev_timer_stop (EV_DEFAULT_UC, &data->r.tw); ev_timer_stop (EV_DEFAULT_UC, &data->w.tw);
169
170 return 0;
171 }
172
173 static MGVTBL handle_vtbl = { 0, 0, 0, 0, handle_free };
174
175 static void
176 handle_cb (coro_dir *dir, int success)
177 {
178 ev_io_stop (EV_DEFAULT_UC, &dir->io);
179 ev_timer_stop (EV_DEFAULT_UC, &dir->tw);
180
181 CORO_READY (dir->data);
182 sv_setiv (dir->data, success);
183 }
184
185 static void
186 handle_io_cb (EV_P_ ev_io *w, int revents)
187 {
188 handle_cb ((coro_dir *)(((char *)w) - offsetof (coro_dir, io)), 1);
189 }
190
191 static void
192 handle_timer_cb (EV_P_ ev_timer *w, int revents)
193 {
194 handle_cb ((coro_dir *)(((char *)w) - offsetof (coro_dir, tw)), 0);
195 }
196
197 static int
198 slf_check_rw (pTHX_ struct CoroSLF *frame)
199 {
200 SV *data = (SV *)frame->data;
201
202 if (SvROK (data))
203 return 1;
204 else
205 {
206 dSP;
207
208 XPUSHs (data);
209
210 PUTBACK;
211 return 0;
212 }
213 }
214
215 static void
216 slf_init_rw (pTHX_ struct CoroSLF *frame, SV *arg, int wr)
217 {
218 AV *handle = (AV *)SvRV (arg);
219 SV *data_sv = AvARRAY (handle)[5];
220 coro_handle *data;
221 coro_dir *dir;
222 assert (AvFILLp (handle) >= 7);
223
224 if (!SvOK (data_sv))
225 {
226 int fno = sv_fileno (AvARRAY (handle)[0]);
227 data_sv = AvARRAY (handle)[5] = NEWSV (0, sizeof (coro_handle));
228 SvPOK_only (data_sv);
229 SvREADONLY_on (data_sv);
230 data = (coro_handle *)SvPVX (data_sv);
231 memset (data, 0, sizeof (coro_handle));
232
233 ev_io_init (&data->r.io, handle_io_cb, fno, EV_READ);
234 ev_io_init (&data->w.io, handle_io_cb, fno, EV_WRITE);
235 ev_init (&data->r.tw, handle_timer_cb);
236 ev_init (&data->w.tw, handle_timer_cb);
237
238 sv_magicext (data_sv, 0, PERL_MAGIC_ext, &handle_vtbl, (char *)data, 0);
239 }
240 else
241 data = (coro_handle *)SvPVX (data_sv);
242
243 dir = wr ? &data->w : &data->r;
244
245 if (ev_is_active (&dir->io) || ev_is_active (&dir->tw))
246 croak ("recursive invocation of readable_ev or writable_ev");
247
248 dir->data = sv_2mortal (newRV_inc (CORO_CURRENT));
249
250 {
251 SV *to = AvARRAY (handle)[2];
252
253 if (SvOK (to))
254 {
255 ev_timer_set (&dir->tw, 0., SvNV (to));
256 ev_timer_again (EV_DEFAULT_UC, &dir->tw);
257 }
258 }
259
260 ev_io_start (EV_DEFAULT_UC, &dir->io);
261
262 frame->data = (void *)dir->data;
263 frame->prepare = GCoroAPI->prepare_schedule;
264 frame->check = slf_check_rw;
265 }
266
267 static void
268 slf_init_readable (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
269 {
270 slf_init_rw (aTHX_ frame, arg [0], 0);
271 }
272
273 static void
274 slf_init_writable (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
275 {
276 slf_init_rw (aTHX_ frame, arg [0], 1);
277 }
278
279 /*****************************************************************************/
280
281 MODULE = Coro::EV PACKAGE = Coro::EV
282
283 PROTOTYPES: ENABLE
284
285 BOOT:
286 {
287 I_EV_API ("Coro::EV");
288 I_CORO_API ("Coro::EV");
289
290 EV_DEFAULT; /* make sure it is initialised */
291
292 ev_prepare_init (&scheduler, prepare_cb);
293 ev_set_priority (&scheduler, EV_MINPRI);
294 ev_prepare_start (EV_DEFAULT_UC, &scheduler);
295 ev_unref (EV_DEFAULT_UC);
296
297 ev_idle_init (&idler, idle_cb);
298 ev_set_priority (&idler, EV_MINPRI);
299
300 CORO_READYHOOK = readyhook;
301 CORO_READYHOOK (); /* make sure we don't miss previous ready's */
302 }
303
304 void
305 _loop_oneshot ()
306 CODE:
307 {
308 /* inhibit the prepare watcher, as we know we are the only
309 * ready coroutine and we don't want it to start an idle watcher
310 * just because of the fallback idle coro being of lower priority.
311 */
312 ++inhibit;
313
314 /* same reasoning as above, make sure it is stopped */
315 if (ev_is_active (&idler))
316 ev_idle_stop (EV_DEFAULT_UC, &idler);
317
318 ev_loop (EV_DEFAULT_UC, EVLOOP_ONESHOT);
319
320 --inhibit;
321 }
322
323 void
324 timed_io_once (...)
325 PROTOTYPE: $$;$
326 CODE:
327 CORO_EXECUTE_SLF_XS (slf_init_timed_io);
328
329 void
330 timer_once (...)
331 PROTOTYPE: $
332 CODE:
333 CORO_EXECUTE_SLF_XS (slf_init_timer);
334
335 void
336 _poll (...)
337 PROTOTYPE:
338 CODE:
339 CORO_EXECUTE_SLF_XS (slf_init_timer);
340
341 void
342 _readable_ev (...)
343 CODE:
344 items = 1; /* ignore the remaining args for speed inside Coro::Handle */
345 CORO_EXECUTE_SLF_XS (slf_init_readable);
346
347 void
348 _writable_ev (...)
349 CODE:
350 items = 1; /* ignore the remaining args for speed inside Coro::Handle */
351 CORO_EXECUTE_SLF_XS (slf_init_writable);
352