ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/Coro/EV/EV.xs
Revision: 1.43
Committed: Tue Feb 18 22:31:43 2020 UTC (4 years, 3 months ago) by root
Branch: MAIN
CVS Tags: rel-6_56, rel-6_57, HEAD
Changes since 1.42: +96 -38 lines
Log Message:
*** empty log message ***

File Contents

# Content
1 #include "EXTERN.h"
2 #include "perl.h"
3 #include "XSUB.h"
4
5 #include <stddef.h>
6 #include <assert.h>
7 #include <string.h>
8
9 #include "EVAPI.h"
10 #include "../Coro/CoroAPI.h"
11
12 static struct ev_prepare scheduler;
13 static struct ev_idle idler;
14 static int inhibit;
15
16 static void
17 idle_cb (EV_P_ ev_idle *w, int revents)
18 {
19 ev_idle_stop (EV_A, w);
20 }
21
22 static void
23 prepare_cb (EV_P_ ev_prepare *w, int revents)
24 {
25 static int incede;
26
27 if (inhibit)
28 return;
29
30 ++incede;
31
32 CORO_CEDE_NOTSELF;
33
34 while (CORO_NREADY >= incede && CORO_CEDE)
35 ;
36
37 /* if still ready, then we have lower-priority coroutines.
38 * poll anyways, but do not block.
39 */
40 if (CORO_NREADY >= incede)
41 {
42 if (!ev_is_active (&idler))
43 ev_idle_start (EV_A, &idler);
44 }
45 else
46 {
47 if (ev_is_active (&idler))
48 ev_idle_stop (EV_A, &idler);
49 }
50
51 --incede;
52 }
53
54 static void
55 readyhook (void)
56 {
57 if (!ev_is_active (&idler))
58 ev_idle_start (EV_DEFAULT_UC, &idler);
59 }
60
61 /*****************************************************************************/
62
63 typedef struct
64 {
65 int revents;
66 SV *coro;
67 ev_io io;
68 ev_timer to;
69 } coro_once;
70
71 static void
72 once_stop (coro_once *o)
73 {
74 ev_io_stop (EV_DEFAULT_UC, &o->io);
75 ev_timer_stop (EV_DEFAULT_UC, &o->to);
76 }
77
78 static void
79 once_cb (coro_once *o, int revents)
80 {
81 CORO_READY (o->coro);
82 o->revents |= revents;
83
84 once_stop (o);
85 }
86
87 static void
88 once_cb_io (EV_P_ ev_io *w, int revents)
89 {
90 coro_once *o = (coro_once *)(((char *)w) - offsetof (coro_once, io));
91 once_cb (o, revents);
92 }
93
94 static void
95 once_cb_to (EV_P_ ev_timer *w, int revents)
96 {
97 coro_once *o = (coro_once *)(((char *)w) - offsetof (coro_once, to));
98 once_cb (o, revents);
99 }
100
101 static void
102 once_savedestructor (void *arg)
103 {
104 coro_once *o = (coro_once *)arg;
105
106 once_stop (o);
107
108 SvREFCNT_dec_NN (o->coro);
109 Safefree (o);
110 }
111
112 static coro_once *
113 once_new (void)
114 {
115 coro_once *o;
116
117 New (0, o, 1, coro_once);
118
119 o->revents = 0;
120 o->coro = SvREFCNT_inc_NN (CORO_CURRENT);
121
122 ev_init (&o->io, once_cb_io);
123 ev_init (&o->to, once_cb_to);
124
125 SAVEDESTRUCTOR (once_savedestructor, (void *)o);
126
127 return o;
128 }
129
130 static int
131 slf_check_once (pTHX_ struct CoroSLF *frame)
132 {
133 coro_once *o = (coro_once *)frame->data;
134
135 /* finish early when an exception is pending */
136 if (CORO_THROW)
137 once_stop (o);
138 else if (!o->revents)
139 return 1; /* repeat until we have been signalled */
140 else
141 {
142 /* already stopped */
143 dSP;
144 XPUSHs (sv_2mortal (newSViv (o->revents)));
145 PUTBACK;
146 }
147
148 return 0;
149 }
150
151 static void
152 slf_init_timed_io (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
153 {
154 coro_once *o;
155 int fd;
156
157 if (items < 2 || items > 3)
158 croak ("Coro::EV::timed_io_once requires exactly two or three parameters, not %d.\n", items);
159
160 SvGETMAGIC (arg [0]);
161 SvGETMAGIC (arg [1]);
162
163 if (items >= 3)
164 SvGETMAGIC (arg [2]);
165
166 fd = sv_fileno (arg [0]);
167
168 if (fd < 0)
169 croak ("Coro::EV::timed_io_once required a file handle with valid file descriptor.\n");
170
171 o = once_new ();
172
173 frame->data = (void *)o;
174 frame->prepare = GCoroAPI->prepare_schedule;
175 frame->check = slf_check_once;
176
177 ev_io_set (&o->io, fd, SvIV (arg [1]));
178 ev_io_start (EV_DEFAULT_UC, &o->io);
179
180 if (items >= 3 && SvOK (arg [2]))
181 {
182 ev_timer_set (&o->to, SvNV (arg [2]), 0.);
183 ev_timer_start (EV_DEFAULT_UC, &o->to);
184 }
185 }
186
187 static void
188 slf_init_timer (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
189 {
190 coro_once *o;
191 NV after;
192
193 if (items > 1)
194 croak ("Coro::EV::timer_once requires at most one parameter, not %d.\n", items);
195
196 after = items ? SvNV (arg [0]) : 0;
197
198 o = once_new ();
199
200 frame->data = (void *)o;
201 frame->prepare = GCoroAPI->prepare_schedule;
202 frame->check = slf_check_once;
203
204 ev_timer_set (&o->to, after >= 0. ? after : 0., 0.);
205 ev_timer_start (EV_DEFAULT_UC, &o->to);
206 }
207
208 /*****************************************************************************/
209
210 typedef struct
211 {
212 ev_io io;
213 ev_timer tw;
214 SV *data;
215 } coro_dir;
216
217 typedef struct
218 {
219 coro_dir r, w;
220 } coro_handle;
221
222 static int
223 handle_free (pTHX_ SV *sv, MAGIC *mg)
224 {
225 coro_handle *data = (coro_handle *)mg->mg_ptr;
226 mg->mg_ptr = 0;
227
228 ev_io_stop (EV_DEFAULT_UC, &data->r.io); ev_io_stop (EV_DEFAULT_UC, &data->w.io);
229 ev_timer_stop (EV_DEFAULT_UC, &data->r.tw); ev_timer_stop (EV_DEFAULT_UC, &data->w.tw);
230
231 return 0;
232 }
233
234 static MGVTBL handle_vtbl = { 0, 0, 0, 0, handle_free };
235
236 static void
237 handle_cb (coro_dir *dir, int success)
238 {
239 ev_io_stop (EV_DEFAULT_UC, &dir->io);
240 ev_timer_stop (EV_DEFAULT_UC, &dir->tw);
241
242 CORO_READY (dir->data);
243 sv_setiv (dir->data, success);
244 }
245
246 static void
247 handle_io_cb (EV_P_ ev_io *w, int revents)
248 {
249 handle_cb ((coro_dir *)(((char *)w) - offsetof (coro_dir, io)), 1);
250 }
251
252 static void
253 handle_timer_cb (EV_P_ ev_timer *w, int revents)
254 {
255 handle_cb ((coro_dir *)(((char *)w) - offsetof (coro_dir, tw)), 0);
256 }
257
258 static int
259 slf_check_rw (pTHX_ struct CoroSLF *frame)
260 {
261 coro_dir *dir = (coro_dir *)frame->data;
262
263 /* return early when an exception is pending */
264 if (CORO_THROW)
265 {
266 ev_io_stop (EV_DEFAULT_UC, &dir->io);
267 ev_timer_stop (EV_DEFAULT_UC, &dir->tw);
268
269 return 0;
270 }
271
272 if (SvROK (dir->data))
273 return 1;
274 else
275 {
276 dSP;
277
278 XPUSHs (dir->data);
279
280 PUTBACK;
281 return 0;
282 }
283 }
284
285 static void
286 slf_init_rw (pTHX_ struct CoroSLF *frame, SV *arg, int wr)
287 {
288 AV *handle = (AV *)SvRV (arg);
289 SV *data_sv = AvARRAY (handle)[5];
290 coro_handle *data;
291 coro_dir *dir;
292 assert (AvFILLp (handle) >= 7);
293
294 if (!SvOK (data_sv))
295 {
296 int fno = sv_fileno (AvARRAY (handle)[0]);
297 SvREFCNT_dec (data_sv);
298 data_sv = AvARRAY (handle)[5] = NEWSV (0, sizeof (coro_handle));
299 SvPOK_only (data_sv);
300 SvREADONLY_on (data_sv);
301 data = (coro_handle *)SvPVX (data_sv);
302 memset (data, 0, sizeof (coro_handle));
303
304 ev_io_init (&data->r.io, handle_io_cb, fno, EV_READ);
305 ev_io_init (&data->w.io, handle_io_cb, fno, EV_WRITE);
306 ev_init (&data->r.tw, handle_timer_cb);
307 ev_init (&data->w.tw, handle_timer_cb);
308
309 sv_magicext (data_sv, 0, PERL_MAGIC_ext, &handle_vtbl, (char *)data, 0);
310 }
311 else
312 data = (coro_handle *)SvPVX (data_sv);
313
314 dir = wr ? &data->w : &data->r;
315
316 if (ev_is_active (&dir->io) || ev_is_active (&dir->tw))
317 croak ("recursive invocation of readable_ev or writable_ev (concurrent Coro::Handle calls on same handle?), detected");
318
319 dir->data = sv_2mortal (newRV_inc (CORO_CURRENT));
320
321 {
322 SV *to = AvARRAY (handle)[2];
323
324 if (SvOK (to))
325 {
326 ev_timer_set (&dir->tw, 0., SvNV (to));
327 ev_timer_again (EV_DEFAULT_UC, &dir->tw);
328 }
329 }
330
331 ev_io_start (EV_DEFAULT_UC, &dir->io);
332
333 frame->data = (void *)dir;
334 frame->prepare = GCoroAPI->prepare_schedule;
335 frame->check = slf_check_rw;
336 }
337
338 static void
339 slf_init_readable (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
340 {
341 slf_init_rw (aTHX_ frame, arg [0], 0);
342 }
343
344 static void
345 slf_init_writable (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
346 {
347 slf_init_rw (aTHX_ frame, arg [0], 1);
348 }
349
350 /*****************************************************************************/
351
352 MODULE = Coro::EV PACKAGE = Coro::EV
353
354 PROTOTYPES: ENABLE
355
356 BOOT:
357 {
358 I_EV_API ("Coro::EV");
359 I_CORO_API ("Coro::EV");
360
361 EV_DEFAULT; /* make sure it is initialised */
362
363 ev_prepare_init (&scheduler, prepare_cb);
364 ev_set_priority (&scheduler, EV_MINPRI);
365 ev_prepare_start (EV_DEFAULT_UC, &scheduler);
366 ev_unref (EV_DEFAULT_UC);
367
368 ev_idle_init (&idler, idle_cb);
369 ev_set_priority (&idler, EV_MINPRI);
370
371 if (!CORO_READYHOOK) /* do not override if Coro::AnyEvent already did */
372 {
373 CORO_READYHOOK = readyhook;
374 CORO_READYHOOK (); /* make sure we don't miss previous ready's */
375 }
376 }
377
378 void
379 _set_readyhook ()
380 CODE:
381 CORO_READYHOOK = readyhook;
382 CORO_READYHOOK ();
383
384 void
385 _loop_oneshot ()
386 CODE:
387 {
388 /* inhibit the prepare watcher, as we know we are the only
389 * ready coroutine and we don't want it to start an idle watcher
390 * just because of the fallback idle coro being of lower priority.
391 */
392 ++inhibit;
393
394 /* same reasoning as above, make sure it is stopped */
395 if (ev_is_active (&idler))
396 ev_idle_stop (EV_DEFAULT_UC, &idler);
397 ev_run (EV_DEFAULT_UC, EVRUN_ONCE);
398 --inhibit;
399 }
400
401 void
402 timed_io_once (...)
403 PROTOTYPE: $$;$
404 CODE:
405 CORO_EXECUTE_SLF_XS (slf_init_timed_io);
406
407 void
408 timer_once (...)
409 PROTOTYPE: $
410 CODE:
411 CORO_EXECUTE_SLF_XS (slf_init_timer);
412
413 void
414 _poll (...)
415 PROTOTYPE:
416 CODE:
417 CORO_EXECUTE_SLF_XS (slf_init_timer);
418
419 PROTOTYPES: DISABLE
420
421 void
422 _readable_ev (...)
423 CODE:
424 items = 1; /* ignore the remaining args for speed inside Coro::Handle */
425 CORO_EXECUTE_SLF_XS (slf_init_readable);
426
427 void
428 _writable_ev (...)
429 CODE:
430 items = 1; /* ignore the remaining args for speed inside Coro::Handle */
431 CORO_EXECUTE_SLF_XS (slf_init_writable);
432