ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/Coro/EV/EV.xs
Revision: 1.39
Committed: Wed Oct 31 20:15:40 2012 UTC (11 years, 7 months ago) by root
Branch: MAIN
Changes since 1.38: +1 -4 lines
Log Message:
*** empty log message ***

File Contents

# Content
1 #include "EXTERN.h"
2 #include "perl.h"
3 #include "XSUB.h"
4
5 #include <stddef.h>
6 #include <assert.h>
7 #include <string.h>
8
9 #include "EVAPI.h"
10 #include "../Coro/CoroAPI.h"
11
12 static struct ev_prepare scheduler;
13 static struct ev_idle idler;
14 static int inhibit;
15
16 static void
17 idle_cb (EV_P_ ev_idle *w, int revents)
18 {
19 ev_idle_stop (EV_A, w);
20 }
21
22 static void
23 prepare_cb (EV_P_ ev_prepare *w, int revents)
24 {
25 static int incede;
26
27 if (inhibit)
28 return;
29
30 ++incede;
31
32 CORO_CEDE_NOTSELF;
33
34 while (CORO_NREADY >= incede && CORO_CEDE)
35 ;
36
37 /* if still ready, then we have lower-priority coroutines.
38 * poll anyways, but do not block.
39 */
40 if (CORO_NREADY >= incede)
41 {
42 if (!ev_is_active (&idler))
43 ev_idle_start (EV_A, &idler);
44 }
45 else
46 {
47 if (ev_is_active (&idler))
48 ev_idle_stop (EV_A, &idler);
49 }
50
51 --incede;
52 }
53
54 static void
55 readyhook (void)
56 {
57 if (!ev_is_active (&idler))
58 ev_idle_start (EV_DEFAULT_UC, &idler);
59 }
60
61 /*****************************************************************************/
62
63 static void
64 once_cb (int revents, void *arg)
65 {
66 SV *data = (SV *)arg;
67
68 CORO_READY (data);
69 sv_setiv (data, revents);
70 SvREFCNT_dec (data);
71 }
72
73 static int
74 slf_check_once (pTHX_ struct CoroSLF *frame)
75 {
76 SV *data = (SV *)frame->data;
77
78 /* return early when an exception is pending */
79 if (CORO_THROW)
80 return 0;
81
82 if (SvROK (data))
83 return 1; /* repeat until we have been signalled */
84 else
85 {
86 dSP;
87
88 XPUSHs (data);
89
90 PUTBACK;
91 return 0;
92 }
93 }
94
95 static void
96 slf_init_timed_io (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
97 {
98 SV *data;
99
100 if (items < 2 || items > 3)
101 croak ("Coro::EV::timed_io_once requires exactly two or three parameters, not %d.\n", items);
102
103 SvGETMAGIC (arg [0]);
104 SvGETMAGIC (arg [1]);
105
106 if (items >= 3)
107 SvGETMAGIC (arg [2]);
108
109 data = sv_2mortal (newRV_inc (CORO_CURRENT));
110 frame->data = (void *)data;
111 frame->prepare = GCoroAPI->prepare_schedule;
112 frame->check = slf_check_once;
113
114 return;
115 ev_once (
116 EV_DEFAULT_UC,
117 sv_fileno (arg [0]),
118 SvIV (arg [1]),
119 items >= 3 && SvOK (arg [2]) ? SvNV (arg [2]) : -1.,
120 once_cb,
121 SvREFCNT_inc (data)
122 );
123 }
124
125 static void
126 slf_init_timer (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
127 {
128 SV *data;
129 NV after;
130
131 if (items > 1)
132 croak ("Coro::EV::timer_once requires at most one parameter, not %d.\n", items);
133
134 data = sv_2mortal (newRV_inc (CORO_CURRENT));
135 frame->data = (void *)data;
136 frame->prepare = GCoroAPI->prepare_schedule;
137 frame->check = slf_check_once;
138
139 after = items ? SvNV (arg [0]) : 0;
140
141 ev_once (
142 EV_DEFAULT_UC,
143 -1,
144 0,
145 after >= 0. ? after : 0.,
146 once_cb,
147 SvREFCNT_inc (data)
148 );
149 }
150
151 /*****************************************************************************/
152
153 typedef struct
154 {
155 ev_io io;
156 ev_timer tw;
157 SV *data;
158 } coro_dir;
159
160 typedef struct
161 {
162 coro_dir r, w;
163 } coro_handle;
164
165 static int
166 handle_free (pTHX_ SV *sv, MAGIC *mg)
167 {
168 coro_handle *data = (coro_handle *)mg->mg_ptr;
169 mg->mg_ptr = 0;
170
171 ev_io_stop (EV_DEFAULT_UC, &data->r.io); ev_io_stop (EV_DEFAULT_UC, &data->w.io);
172 ev_timer_stop (EV_DEFAULT_UC, &data->r.tw); ev_timer_stop (EV_DEFAULT_UC, &data->w.tw);
173
174 return 0;
175 }
176
177 static MGVTBL handle_vtbl = { 0, 0, 0, 0, handle_free };
178
179 static void
180 handle_cb (coro_dir *dir, int success)
181 {
182 ev_io_stop (EV_DEFAULT_UC, &dir->io);
183 ev_timer_stop (EV_DEFAULT_UC, &dir->tw);
184
185 CORO_READY (dir->data);
186 sv_setiv (dir->data, success);
187 }
188
189 static void
190 handle_io_cb (EV_P_ ev_io *w, int revents)
191 {
192 handle_cb ((coro_dir *)(((char *)w) - offsetof (coro_dir, io)), 1);
193 }
194
195 static void
196 handle_timer_cb (EV_P_ ev_timer *w, int revents)
197 {
198 handle_cb ((coro_dir *)(((char *)w) - offsetof (coro_dir, tw)), 0);
199 }
200
201 static int
202 slf_check_rw (pTHX_ struct CoroSLF *frame)
203 {
204 SV *data = (SV *)frame->data;
205
206 /* return early when an exception is pending */
207 if (CORO_THROW)
208 return 0;
209
210 if (SvROK (data))
211 return 1;
212 else
213 {
214 dSP;
215
216 XPUSHs (data);
217
218 PUTBACK;
219 return 0;
220 }
221 }
222
223 static void
224 slf_init_rw (pTHX_ struct CoroSLF *frame, SV *arg, int wr)
225 {
226 AV *handle = (AV *)SvRV (arg);
227 SV *data_sv = AvARRAY (handle)[5];
228 coro_handle *data;
229 coro_dir *dir;
230 assert (AvFILLp (handle) >= 7);
231
232 if (!SvOK (data_sv))
233 {
234 int fno = sv_fileno (AvARRAY (handle)[0]);
235 data_sv = AvARRAY (handle)[5] = NEWSV (0, sizeof (coro_handle));
236 SvPOK_only (data_sv);
237 SvREADONLY_on (data_sv);
238 data = (coro_handle *)SvPVX (data_sv);
239 memset (data, 0, sizeof (coro_handle));
240
241 ev_io_init (&data->r.io, handle_io_cb, fno, EV_READ);
242 ev_io_init (&data->w.io, handle_io_cb, fno, EV_WRITE);
243 ev_init (&data->r.tw, handle_timer_cb);
244 ev_init (&data->w.tw, handle_timer_cb);
245
246 sv_magicext (data_sv, 0, PERL_MAGIC_ext, &handle_vtbl, (char *)data, 0);
247 }
248 else
249 data = (coro_handle *)SvPVX (data_sv);
250
251 dir = wr ? &data->w : &data->r;
252
253 if (ev_is_active (&dir->io) || ev_is_active (&dir->tw))
254 croak ("recursive invocation of readable_ev or writable_ev (concurrent Coro::Handle calls on same handle?), detected");
255
256 dir->data = sv_2mortal (newRV_inc (CORO_CURRENT));
257
258 {
259 SV *to = AvARRAY (handle)[2];
260
261 if (SvOK (to))
262 {
263 ev_timer_set (&dir->tw, 0., SvNV (to));
264 ev_timer_again (EV_DEFAULT_UC, &dir->tw);
265 }
266 }
267
268 ev_io_start (EV_DEFAULT_UC, &dir->io);
269
270 frame->data = (void *)dir->data;
271 frame->prepare = GCoroAPI->prepare_schedule;
272 frame->check = slf_check_rw;
273 }
274
275 static void
276 slf_init_readable (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
277 {
278 slf_init_rw (aTHX_ frame, arg [0], 0);
279 }
280
281 static void
282 slf_init_writable (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
283 {
284 slf_init_rw (aTHX_ frame, arg [0], 1);
285 }
286
287 /*****************************************************************************/
288
289 MODULE = Coro::EV PACKAGE = Coro::EV
290
291 PROTOTYPES: ENABLE
292
293 BOOT:
294 {
295 I_EV_API ("Coro::EV");
296 I_CORO_API ("Coro::EV");
297
298 EV_DEFAULT; /* make sure it is initialised */
299
300 ev_prepare_init (&scheduler, prepare_cb);
301 ev_set_priority (&scheduler, EV_MINPRI);
302 ev_prepare_start (EV_DEFAULT_UC, &scheduler);
303 ev_unref (EV_DEFAULT_UC);
304
305 ev_idle_init (&idler, idle_cb);
306 ev_set_priority (&idler, EV_MINPRI);
307
308 if (!CORO_READYHOOK) /* do not override if Coro::AnyEvent already did */
309 {
310 CORO_READYHOOK = readyhook;
311 CORO_READYHOOK (); /* make sure we don't miss previous ready's */
312 }
313 }
314
315 void
316 _set_readyhook ()
317 CODE:
318 CORO_READYHOOK = readyhook;
319 CORO_READYHOOK ();
320
321 void
322 _loop_oneshot ()
323 CODE:
324 {
325 /* inhibit the prepare watcher, as we know we are the only
326 * ready coroutine and we don't want it to start an idle watcher
327 * just because of the fallback idle coro being of lower priority.
328 */
329 ++inhibit;
330
331 /* same reasoning as above, make sure it is stopped */
332 if (ev_is_active (&idler))
333 ev_idle_stop (EV_DEFAULT_UC, &idler);
334 ev_run (EV_DEFAULT_UC, EVRUN_ONCE);
335 --inhibit;
336 }
337
338 void
339 timed_io_once (...)
340 PROTOTYPE: $$;$
341 CODE:
342 CORO_EXECUTE_SLF_XS (slf_init_timed_io);
343
344 void
345 timer_once (...)
346 PROTOTYPE: $
347 CODE:
348 CORO_EXECUTE_SLF_XS (slf_init_timer);
349
350 void
351 _poll (...)
352 PROTOTYPE:
353 CODE:
354 CORO_EXECUTE_SLF_XS (slf_init_timer);
355
356 PROTOTYPES: DISABLE
357
358 void
359 _readable_ev (...)
360 CODE:
361 items = 1; /* ignore the remaining args for speed inside Coro::Handle */
362 CORO_EXECUTE_SLF_XS (slf_init_readable);
363
364 void
365 _writable_ev (...)
366 CODE:
367 items = 1; /* ignore the remaining args for speed inside Coro::Handle */
368 CORO_EXECUTE_SLF_XS (slf_init_writable);
369