ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/Coro/EV/EV.xs
Revision: 1.42
Committed: Fri Oct 16 08:11:19 2015 UTC (8 years, 7 months ago) by root
Branch: MAIN
CVS Tags: rel-6_5, rel-6_512, rel-6_513, rel-6_511, rel-6_514, rel-6_51, rel-6_52, rel-6_53, rel-6_54, rel-6_55, rel-6_49
Changes since 1.41: +10 -5 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.1 #include "EXTERN.h"
2     #include "perl.h"
3     #include "XSUB.h"
4    
5 root 1.10 #include <stddef.h>
6 root 1.1 #include <assert.h>
7     #include <string.h>
8    
9     #include "EVAPI.h"
10     #include "../Coro/CoroAPI.h"
11    
12 root 1.2 static struct ev_prepare scheduler;
13     static struct ev_idle idler;
14 root 1.9 static int inhibit;
15 root 1.2
16     static void
17 root 1.10 idle_cb (EV_P_ ev_idle *w, int revents)
18 root 1.2 {
19 root 1.12 ev_idle_stop (EV_A, w);
20 root 1.2 }
21    
22     static void
23 root 1.10 prepare_cb (EV_P_ ev_prepare *w, int revents)
24 root 1.2 {
25 root 1.5 static int incede;
26    
27 root 1.9 if (inhibit)
28     return;
29    
30 root 1.5 ++incede;
31    
32     CORO_CEDE_NOTSELF;
33    
34 root 1.6 while (CORO_NREADY >= incede && CORO_CEDE)
35 root 1.2 ;
36    
37     /* if still ready, then we have lower-priority coroutines.
38 root 1.5 * poll anyways, but do not block.
39 root 1.2 */
40 root 1.16 if (CORO_NREADY >= incede)
41     {
42     if (!ev_is_active (&idler))
43     ev_idle_start (EV_A, &idler);
44     }
45     else
46     {
47     if (ev_is_active (&idler))
48     ev_idle_stop (EV_A, &idler);
49     }
50 root 1.5
51     --incede;
52 root 1.2 }
53    
54 root 1.15 static void
55     readyhook (void)
56     {
57 root 1.17 if (!ev_is_active (&idler))
58     ev_idle_start (EV_DEFAULT_UC, &idler);
59 root 1.15 }
60    
61 root 1.10 /*****************************************************************************/
62    
63 root 1.18 static void
64     once_cb (int revents, void *arg)
65     {
66     SV *data = (SV *)arg;
67    
68     CORO_READY (data);
69     sv_setiv (data, revents);
70 root 1.22 SvREFCNT_dec (data);
71 root 1.18 }
72    
73     static int
74     slf_check_once (pTHX_ struct CoroSLF *frame)
75     {
76     SV *data = (SV *)frame->data;
77    
78 root 1.22 /* return early when an exception is pending */
79     if (CORO_THROW)
80     return 0;
81    
82 root 1.18 if (SvROK (data))
83     return 1; /* repeat until we have been signalled */
84     else
85     {
86     dSP;
87    
88     XPUSHs (data);
89    
90     PUTBACK;
91     return 0;
92     }
93     }
94    
95     static void
96 root 1.21 slf_init_timed_io (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
97 root 1.18 {
98     SV *data;
99    
100     if (items < 2 || items > 3)
101     croak ("Coro::EV::timed_io_once requires exactly two or three parameters, not %d.\n", items);
102    
103 root 1.36 SvGETMAGIC (arg [0]);
104     SvGETMAGIC (arg [1]);
105    
106     if (items >= 3)
107     SvGETMAGIC (arg [2]);
108    
109 root 1.18 data = sv_2mortal (newRV_inc (CORO_CURRENT));
110     frame->data = (void *)data;
111     frame->prepare = GCoroAPI->prepare_schedule;
112     frame->check = slf_check_once;
113    
114     ev_once (
115     EV_DEFAULT_UC,
116 root 1.39 sv_fileno (arg [0]),
117 root 1.18 SvIV (arg [1]),
118     items >= 3 && SvOK (arg [2]) ? SvNV (arg [2]) : -1.,
119     once_cb,
120 root 1.22 SvREFCNT_inc (data)
121 root 1.18 );
122     }
123    
124     static void
125 root 1.21 slf_init_timer (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
126 root 1.18 {
127     SV *data;
128     NV after;
129    
130 root 1.24 if (items > 1)
131     croak ("Coro::EV::timer_once requires at most one parameter, not %d.\n", items);
132 root 1.18
133     data = sv_2mortal (newRV_inc (CORO_CURRENT));
134 root 1.19 frame->data = (void *)data;
135     frame->prepare = GCoroAPI->prepare_schedule;
136     frame->check = slf_check_once;
137 root 1.18
138 root 1.24 after = items ? SvNV (arg [0]) : 0;
139 root 1.18
140     ev_once (
141     EV_DEFAULT_UC,
142     -1,
143     0,
144     after >= 0. ? after : 0.,
145     once_cb,
146 root 1.22 SvREFCNT_inc (data)
147 root 1.18 );
148     }
149    
150     /*****************************************************************************/
151    
152 root 1.10 typedef struct
153     {
154     ev_io io;
155     ev_timer tw;
156 root 1.18 SV *data;
157 root 1.10 } coro_dir;
158    
159     typedef struct
160     {
161     coro_dir r, w;
162     } coro_handle;
163    
164     static int
165     handle_free (pTHX_ SV *sv, MAGIC *mg)
166     {
167     coro_handle *data = (coro_handle *)mg->mg_ptr;
168     mg->mg_ptr = 0;
169    
170 root 1.15 ev_io_stop (EV_DEFAULT_UC, &data->r.io); ev_io_stop (EV_DEFAULT_UC, &data->w.io);
171     ev_timer_stop (EV_DEFAULT_UC, &data->r.tw); ev_timer_stop (EV_DEFAULT_UC, &data->w.tw);
172 root 1.10
173     return 0;
174     }
175    
176     static MGVTBL handle_vtbl = { 0, 0, 0, 0, handle_free };
177    
178     static void
179 root 1.18 handle_cb (coro_dir *dir, int success)
180 root 1.10 {
181 root 1.15 ev_io_stop (EV_DEFAULT_UC, &dir->io);
182     ev_timer_stop (EV_DEFAULT_UC, &dir->tw);
183 root 1.10
184 root 1.18 CORO_READY (dir->data);
185     sv_setiv (dir->data, success);
186 root 1.10 }
187    
188     static void
189     handle_io_cb (EV_P_ ev_io *w, int revents)
190     {
191     handle_cb ((coro_dir *)(((char *)w) - offsetof (coro_dir, io)), 1);
192     }
193    
194     static void
195     handle_timer_cb (EV_P_ ev_timer *w, int revents)
196     {
197     handle_cb ((coro_dir *)(((char *)w) - offsetof (coro_dir, tw)), 0);
198     }
199    
200 root 1.18 static int
201     slf_check_rw (pTHX_ struct CoroSLF *frame)
202     {
203 root 1.42 coro_dir *dir = (coro_dir *)frame->data;
204 root 1.18
205 root 1.34 /* return early when an exception is pending */
206     if (CORO_THROW)
207 root 1.42 {
208     ev_io_stop (EV_DEFAULT_UC, &dir->io);
209     ev_timer_stop (EV_DEFAULT_UC, &dir->tw);
210    
211     return 0;
212     }
213 root 1.34
214 root 1.42 if (SvROK (dir->data))
215 root 1.18 return 1;
216     else
217     {
218     dSP;
219    
220 root 1.42 XPUSHs (dir->data);
221 root 1.18
222     PUTBACK;
223     return 0;
224     }
225     }
226    
227     static void
228 root 1.20 slf_init_rw (pTHX_ struct CoroSLF *frame, SV *arg, int wr)
229 root 1.18 {
230     AV *handle = (AV *)SvRV (arg);
231     SV *data_sv = AvARRAY (handle)[5];
232     coro_handle *data;
233     coro_dir *dir;
234     assert (AvFILLp (handle) >= 7);
235    
236     if (!SvOK (data_sv))
237     {
238     int fno = sv_fileno (AvARRAY (handle)[0]);
239 root 1.41 SvREFCNT_dec (data_sv);
240 root 1.18 data_sv = AvARRAY (handle)[5] = NEWSV (0, sizeof (coro_handle));
241     SvPOK_only (data_sv);
242     SvREADONLY_on (data_sv);
243     data = (coro_handle *)SvPVX (data_sv);
244     memset (data, 0, sizeof (coro_handle));
245    
246     ev_io_init (&data->r.io, handle_io_cb, fno, EV_READ);
247     ev_io_init (&data->w.io, handle_io_cb, fno, EV_WRITE);
248     ev_init (&data->r.tw, handle_timer_cb);
249     ev_init (&data->w.tw, handle_timer_cb);
250    
251     sv_magicext (data_sv, 0, PERL_MAGIC_ext, &handle_vtbl, (char *)data, 0);
252     }
253     else
254     data = (coro_handle *)SvPVX (data_sv);
255    
256     dir = wr ? &data->w : &data->r;
257    
258     if (ev_is_active (&dir->io) || ev_is_active (&dir->tw))
259 root 1.33 croak ("recursive invocation of readable_ev or writable_ev (concurrent Coro::Handle calls on same handle?), detected");
260 root 1.18
261     dir->data = sv_2mortal (newRV_inc (CORO_CURRENT));
262    
263     {
264     SV *to = AvARRAY (handle)[2];
265    
266     if (SvOK (to))
267     {
268     ev_timer_set (&dir->tw, 0., SvNV (to));
269     ev_timer_again (EV_DEFAULT_UC, &dir->tw);
270     }
271     }
272    
273     ev_io_start (EV_DEFAULT_UC, &dir->io);
274    
275 root 1.42 frame->data = (void *)dir;
276 root 1.18 frame->prepare = GCoroAPI->prepare_schedule;
277     frame->check = slf_check_rw;
278     }
279    
280     static void
281 root 1.21 slf_init_readable (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
282 root 1.18 {
283     slf_init_rw (aTHX_ frame, arg [0], 0);
284     }
285    
286     static void
287 root 1.21 slf_init_writable (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
288 root 1.18 {
289     slf_init_rw (aTHX_ frame, arg [0], 1);
290     }
291    
292 root 1.10 /*****************************************************************************/
293    
294 root 1.1 MODULE = Coro::EV PACKAGE = Coro::EV
295    
296     PROTOTYPES: ENABLE
297    
298     BOOT:
299     {
300 root 1.23 I_EV_API ("Coro::EV");
301     I_CORO_API ("Coro::EV");
302 root 1.2
303 root 1.17 EV_DEFAULT; /* make sure it is initialised */
304    
305 root 1.3 ev_prepare_init (&scheduler, prepare_cb);
306 root 1.10 ev_set_priority (&scheduler, EV_MINPRI);
307 root 1.17 ev_prepare_start (EV_DEFAULT_UC, &scheduler);
308     ev_unref (EV_DEFAULT_UC);
309 root 1.2
310 root 1.3 ev_idle_init (&idler, idle_cb);
311 root 1.10 ev_set_priority (&idler, EV_MINPRI);
312 root 1.15
313 root 1.29 if (!CORO_READYHOOK) /* do not override if Coro::AnyEvent already did */
314     {
315     CORO_READYHOOK = readyhook;
316     CORO_READYHOOK (); /* make sure we don't miss previous ready's */
317     }
318     }
319    
320     void
321     _set_readyhook ()
322     CODE:
323 root 1.15 CORO_READYHOOK = readyhook;
324 root 1.29 CORO_READYHOOK ();
325 root 1.1
326     void
327 root 1.9 _loop_oneshot ()
328     CODE:
329 root 1.17 {
330     /* inhibit the prepare watcher, as we know we are the only
331     * ready coroutine and we don't want it to start an idle watcher
332     * just because of the fallback idle coro being of lower priority.
333     */
334 root 1.9 ++inhibit;
335 root 1.17
336     /* same reasoning as above, make sure it is stopped */
337     if (ev_is_active (&idler))
338     ev_idle_stop (EV_DEFAULT_UC, &idler);
339 root 1.30 ev_run (EV_DEFAULT_UC, EVRUN_ONCE);
340 root 1.9 --inhibit;
341 root 1.17 }
342 root 1.9
343     void
344 root 1.18 timed_io_once (...)
345 root 1.24 PROTOTYPE: $$;$
346 root 1.1 CODE:
347 root 1.18 CORO_EXECUTE_SLF_XS (slf_init_timed_io);
348 root 1.1
349     void
350 root 1.18 timer_once (...)
351 root 1.24 PROTOTYPE: $
352 root 1.1 CODE:
353 root 1.18 CORO_EXECUTE_SLF_XS (slf_init_timer);
354 root 1.1
355 root 1.10 void
356 root 1.27 _poll (...)
357     PROTOTYPE:
358     CODE:
359     CORO_EXECUTE_SLF_XS (slf_init_timer);
360    
361 root 1.28 PROTOTYPES: DISABLE
362    
363 root 1.27 void
364 root 1.24 _readable_ev (...)
365 root 1.10 CODE:
366 root 1.23 items = 1; /* ignore the remaining args for speed inside Coro::Handle */
367 root 1.18 CORO_EXECUTE_SLF_XS (slf_init_readable);
368 root 1.1
369 root 1.18 void
370 root 1.24 _writable_ev (...)
371 root 1.18 CODE:
372 root 1.23 items = 1; /* ignore the remaining args for speed inside Coro::Handle */
373 root 1.18 CORO_EXECUTE_SLF_XS (slf_init_writable);
374 root 1.1