ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/Coro/EV/EV.xs
Revision: 1.19
Committed: Sat Nov 15 06:26:52 2008 UTC (15 years, 6 months ago) by root
Branch: MAIN
Changes since 1.18: +5 -1 lines
Log Message:
*** empty log message ***

File Contents

# Content
1 #include "EXTERN.h"
2 #include "perl.h"
3 #include "XSUB.h"
4
5 #include <stddef.h>
6 #include <assert.h>
7 #include <string.h>
8
9 #include "EVAPI.h"
10 #include "../Coro/CoroAPI.h"
11
12 static struct ev_prepare scheduler;
13 static struct ev_idle idler;
14 static int inhibit;
15
16 static void
17 idle_cb (EV_P_ ev_idle *w, int revents)
18 {
19 ev_idle_stop (EV_A, w);
20 }
21
22 static void
23 prepare_cb (EV_P_ ev_prepare *w, int revents)
24 {
25 static int incede;
26
27 if (inhibit)
28 return;
29
30 ++incede;
31
32 CORO_CEDE_NOTSELF;
33
34 while (CORO_NREADY >= incede && CORO_CEDE)
35 ;
36
37 /* if still ready, then we have lower-priority coroutines.
38 * poll anyways, but do not block.
39 */
40 if (CORO_NREADY >= incede)
41 {
42 if (!ev_is_active (&idler))
43 ev_idle_start (EV_A, &idler);
44 }
45 else
46 {
47 if (ev_is_active (&idler))
48 ev_idle_stop (EV_A, &idler);
49 }
50
51 --incede;
52 }
53
54 static void
55 readyhook (void)
56 {
57 if (!ev_is_active (&idler))
58 ev_idle_start (EV_DEFAULT_UC, &idler);
59 }
60
61 /*****************************************************************************/
62
63 static void
64 once_cb (int revents, void *arg)
65 {
66 SV *data = (SV *)arg;
67
68 CORO_READY (data);
69 sv_setiv (data, revents);
70 }
71
72 static int
73 slf_check_once (pTHX_ struct CoroSLF *frame)
74 {
75 SV *data = (SV *)frame->data;
76
77 if (SvROK (data))
78 return 1; /* repeat until we have been signalled */
79 else
80 {
81 dSP;
82
83 XPUSHs (data);
84
85 PUTBACK;
86 return 0;
87 }
88 }
89
90 static void
91 slf_init_timed_io (aTHX_ struct CoroSLF *frame, SV **arg, int items)
92 {
93 SV *data;
94
95 if (items < 2 || items > 3)
96 croak ("Coro::EV::timed_io_once requires exactly two or three parameters, not %d.\n", items);
97
98 data = sv_2mortal (newRV_inc (CORO_CURRENT));
99 frame->data = (void *)data;
100 frame->prepare = GCoroAPI->prepare_schedule;
101 frame->check = slf_check_once;
102
103 ev_once (
104 EV_DEFAULT_UC,
105 sv_fileno (arg [0]),
106 SvIV (arg [1]),
107 items >= 3 && SvOK (arg [2]) ? SvNV (arg [2]) : -1.,
108 once_cb,
109 data
110 );
111 }
112
113 static void
114 slf_init_timer (aTHX_ struct CoroSLF *frame, SV **arg, int items)
115 {
116 SV *data;
117 NV after;
118
119 if (items != 1)
120 croak ("Coro::EV::timer_once requires exactly one parameter, not %d.\n", items);
121
122 data = sv_2mortal (newRV_inc (CORO_CURRENT));
123 frame->data = (void *)data;
124 frame->prepare = GCoroAPI->prepare_schedule;
125 frame->check = slf_check_once;
126
127 after = SvNV (arg [0]);
128
129 ev_once (
130 EV_DEFAULT_UC,
131 -1,
132 0,
133 after >= 0. ? after : 0.,
134 once_cb,
135 data
136 );
137 }
138
139 /*****************************************************************************/
140
141 typedef struct
142 {
143 ev_io io;
144 ev_timer tw;
145 SV *data;
146 } coro_dir;
147
148 typedef struct
149 {
150 coro_dir r, w;
151 } coro_handle;
152
153 static int
154 handle_free (pTHX_ SV *sv, MAGIC *mg)
155 {
156 coro_handle *data = (coro_handle *)mg->mg_ptr;
157 mg->mg_ptr = 0;
158
159 ev_io_stop (EV_DEFAULT_UC, &data->r.io); ev_io_stop (EV_DEFAULT_UC, &data->w.io);
160 ev_timer_stop (EV_DEFAULT_UC, &data->r.tw); ev_timer_stop (EV_DEFAULT_UC, &data->w.tw);
161
162 return 0;
163 }
164
165 static MGVTBL handle_vtbl = { 0, 0, 0, 0, handle_free };
166
167 static void
168 handle_cb (coro_dir *dir, int success)
169 {
170 ev_io_stop (EV_DEFAULT_UC, &dir->io);
171 ev_timer_stop (EV_DEFAULT_UC, &dir->tw);
172
173 CORO_READY (dir->data);
174 sv_setiv (dir->data, success);
175 }
176
177 static void
178 handle_io_cb (EV_P_ ev_io *w, int revents)
179 {
180 handle_cb ((coro_dir *)(((char *)w) - offsetof (coro_dir, io)), 1);
181 }
182
183 static void
184 handle_timer_cb (EV_P_ ev_timer *w, int revents)
185 {
186 handle_cb ((coro_dir *)(((char *)w) - offsetof (coro_dir, tw)), 0);
187 }
188
189 static int
190 slf_check_rw (pTHX_ struct CoroSLF *frame)
191 {
192 SV *data = (SV *)frame->data;
193
194 if (SvROK (data))
195 return 1;
196 else
197 {
198 dSP;
199
200 XPUSHs (data);
201
202 PUTBACK;
203 return 0;
204 }
205 }
206
207 static void
208 slf_init_rw (aTHX_ struct CoroSLF *frame, SV *arg, int wr)
209 {
210 AV *handle = (AV *)SvRV (arg);
211 SV *data_sv = AvARRAY (handle)[5];
212 coro_handle *data;
213 coro_dir *dir;
214 assert (AvFILLp (handle) >= 7);
215
216 if (!SvOK (data_sv))
217 {
218 int fno = sv_fileno (AvARRAY (handle)[0]);
219 data_sv = AvARRAY (handle)[5] = NEWSV (0, sizeof (coro_handle));
220 SvPOK_only (data_sv);
221 SvREADONLY_on (data_sv);
222 data = (coro_handle *)SvPVX (data_sv);
223 memset (data, 0, sizeof (coro_handle));
224
225 ev_io_init (&data->r.io, handle_io_cb, fno, EV_READ);
226 ev_io_init (&data->w.io, handle_io_cb, fno, EV_WRITE);
227 ev_init (&data->r.tw, handle_timer_cb);
228 ev_init (&data->w.tw, handle_timer_cb);
229
230 sv_magicext (data_sv, 0, PERL_MAGIC_ext, &handle_vtbl, (char *)data, 0);
231 }
232 else
233 data = (coro_handle *)SvPVX (data_sv);
234
235 dir = wr ? &data->w : &data->r;
236
237 if (ev_is_active (&dir->io) || ev_is_active (&dir->tw))
238 croak ("recursive invocation of readable_ev or writable_ev");
239
240 dir->data = sv_2mortal (newRV_inc (CORO_CURRENT));
241
242 {
243 SV *to = AvARRAY (handle)[2];
244
245 if (SvOK (to))
246 {
247 ev_timer_set (&dir->tw, 0., SvNV (to));
248 ev_timer_again (EV_DEFAULT_UC, &dir->tw);
249 }
250 }
251
252 ev_io_start (EV_DEFAULT_UC, &dir->io);
253
254 frame->data = (void *)dir->data;
255 frame->prepare = GCoroAPI->prepare_schedule;
256 frame->check = slf_check_rw;
257 }
258
259 static void
260 slf_init_readable (aTHX_ struct CoroSLF *frame, SV **arg, int items)
261 {
262 slf_init_rw (aTHX_ frame, arg [0], 0);
263 }
264
265 static void
266 slf_init_writable (aTHX_ struct CoroSLF *frame, SV **arg, int items)
267 {
268 slf_init_rw (aTHX_ frame, arg [0], 1);
269 }
270
271 /*****************************************************************************/
272
273 MODULE = Coro::EV PACKAGE = Coro::EV
274
275 PROTOTYPES: ENABLE
276
277 BOOT:
278 {
279 I_EV_API ("Coro::EV");
280 I_CORO_API ("Coro::Event");
281
282 EV_DEFAULT; /* make sure it is initialised */
283
284 ev_prepare_init (&scheduler, prepare_cb);
285 ev_set_priority (&scheduler, EV_MINPRI);
286 ev_prepare_start (EV_DEFAULT_UC, &scheduler);
287 ev_unref (EV_DEFAULT_UC);
288
289 ev_idle_init (&idler, idle_cb);
290 ev_set_priority (&idler, EV_MINPRI);
291
292 CORO_READYHOOK = readyhook;
293 }
294
295 void
296 _loop_oneshot ()
297 CODE:
298 {
299 /* inhibit the prepare watcher, as we know we are the only
300 * ready coroutine and we don't want it to start an idle watcher
301 * just because of the fallback idle coro being of lower priority.
302 */
303 ++inhibit;
304
305 /* same reasoning as above, make sure it is stopped */
306 if (ev_is_active (&idler))
307 ev_idle_stop (EV_DEFAULT_UC, &idler);
308
309 ev_loop (EV_DEFAULT_UC, EVLOOP_ONESHOT);
310
311 --inhibit;
312 }
313
314 void
315 timed_io_once (...)
316 CODE:
317 CORO_EXECUTE_SLF_XS (slf_init_timed_io);
318
319 void
320 timer_once (...)
321 CODE:
322 CORO_EXECUTE_SLF_XS (slf_init_timer);
323
324 void
325 readable_ev (...)
326 CODE:
327 items = 1; /* ignore the remainign args for speed inside Coro::Handle */
328 CORO_EXECUTE_SLF_XS (slf_init_readable);
329
330 void
331 writable_ev (...)
332 CODE:
333 items = 1; /* ignore the remainign args for speed inside Coro::Handle */
334 CORO_EXECUTE_SLF_XS (slf_init_writable);
335