/*
 * XS glue between the EV event loop and the Coro scheduler (Coro::EV).
 *
 * The C part below defines the watcher callbacks and the SLF init/check
 * functions; the XS part at the bottom exposes them to Perl.
 */

#include "EXTERN.h"
#include "perl.h"
#include "XSUB.h"

#include <stddef.h> /* offsetof */
#include <assert.h> /* assert */
#include <string.h> /* memset */

#include "EVAPI.h"
#include "../Coro/CoroAPI.h"

/* prepare watcher that cedes to ready coroutines, see prepare_cb */
static struct ev_prepare scheduler;
/* idle watcher, started whenever the loop should poll without blocking */
static struct ev_idle idler;
/* while non-zero, prepare_cb returns immediately (see _loop_oneshot) */
static int inhibit;

/* idle callback: does no work, it merely stops the idle watcher again */
static void
idle_cb (EV_P_ ev_idle *w, int revents)
{
  ev_idle_stop (EV_A, w);
}

/*
 * prepare callback: cede to other ready coroutines, then start or stop
 * the idle watcher depending on whether coroutines are still ready.
 */
static void
prepare_cb (EV_P_ ev_prepare *w, int revents)
{
  /* incremented on entry and decremented on exit, so it counts the */
  /* activations of this callback that are currently in progress */
  static int incede;

  if (inhibit)
    return;

  ++incede;

  CORO_CEDE_NOTSELF;

  /* keep ceding as long as enough coroutines are ready and ceding succeeds */
  while (CORO_NREADY >= incede && CORO_CEDE)
    ;

  /* if still ready, then we have lower-priority coroutines.
   * poll anyways, but do not block.
   */
  if (CORO_NREADY >= incede)
    {
      if (!ev_is_active (&idler))
        ev_idle_start (EV_A, &idler);
    }
  else
    {
      if (ev_is_active (&idler))
        ev_idle_stop (EV_A, &idler);
    }

  --incede;
}

/*
 * Installed as CORO_READYHOOK: makes sure the idle watcher is running on
 * the default loop.
 * NOTE(review): presumably invoked by Coro whenever a coroutine becomes
 * ready - confirm against CoroAPI.h.
 */
static void
readyhook (void)
{
  if (!ev_is_active (&idler))
    ev_idle_start (EV_DEFAULT_UC, &idler);
}

/*****************************************************************************/

/*
 * ev_once callback shared by timed_io_once and timer_once.
 *
 * arg is the SV handed to ev_once by the init functions, which took an
 * extra reference on it. The SV is passed to CORO_READY, overwritten
 * with the received event mask, and the extra reference is dropped.
 */
static void
once_cb (int revents, void *arg)
{
  SV *data = (SV *)arg;

  CORO_READY (data);
  sv_setiv (data, revents);
  SvREFCNT_dec (data);
}

/*
 * SLF check function for the ev_once based waits.
 *
 * Returns 1 while frame->data is still a reference (once_cb has not yet
 * replaced it by an integer). Otherwise pushes the SV onto the Perl stack
 * and returns 0. Also returns 0, without pushing anything, when
 * CORO_THROW is set.
 */
static int
slf_check_once (pTHX_ struct CoroSLF *frame)
{
  SV *data = (SV *)frame->data;

  /* return early when an exception is pending */
  if (CORO_THROW)
    return 0;

  if (SvROK (data))
    return 1; /* repeat until we have been signalled */
  else
    {
      dSP;

      XPUSHs (data);

      PUTBACK;
      return 0;
    }
}

/*
 * SLF init function for Coro::EV::timed_io_once.
 *
 * arg [0] is converted to a file descriptor with sv_fileno, arg [1] is the
 * event mask and the optional arg [2] is the timeout. A missing or
 * undefined timeout is passed to ev_once as -1.
 * Croaks unless two or three arguments were given.
 */
static void
slf_init_timed_io (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
{
  SV *data;

  if (items < 2 || items > 3)
    croak ("Coro::EV::timed_io_once requires exactly two or three parameters, not %d.\n", items);

  /* mortal reference to the current coroutine, once_cb later turns it */
  /* into the integer result */
  data = sv_2mortal (newRV_inc (CORO_CURRENT));
  frame->data    = (void *)data;
  frame->prepare = GCoroAPI->prepare_schedule;
  frame->check   = slf_check_once;

  /* fetch magic first, so that the SvOK test below sees the real value */
  if (items >= 3)
    SvGETMAGIC (arg [2]);

  ev_once (
    EV_DEFAULT_UC,
    sv_fileno (arg [0]),
    SvIV (arg [1]),
    items >= 3 && SvOK (arg [2]) ? SvNV (arg [2]) : -1.,
    once_cb,
    SvREFCNT_inc (data) /* this reference is released by once_cb */
  );
}

/*
 * SLF init function for Coro::EV::timer_once and Coro::EV::_poll.
 *
 * The optional arg [0] is the delay; it defaults to 0 and negative values
 * are replaced by 0. ev_once is called with fd -1 and event mask 0, so
 * only the timeout is specified.
 * Croaks when more than one argument was given.
 */
static void
slf_init_timer (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
{
  SV *data;
  NV after;

  if (items > 1)
    croak ("Coro::EV::timer_once requires at most one parameter, not %d.\n", items);

  data = sv_2mortal (newRV_inc (CORO_CURRENT));
  frame->data    = (void *)data;
  frame->prepare = GCoroAPI->prepare_schedule;
  frame->check   = slf_check_once;

  after = items ? SvNV (arg [0]) : 0;

  ev_once (
    EV_DEFAULT_UC,
    -1,
    0,
    after >= 0. ? after : 0.,
    once_cb,
    SvREFCNT_inc (data) /* this reference is released by once_cb */
  );
}

/*****************************************************************************/

/* watcher state for one direction (read or write) of a handle */
typedef struct
{
  ev_io io;    /* readiness watcher */
  ev_timer tw; /* timeout watcher */
  SV *data;    /* reference to the waiting coroutine, later the result */
} coro_dir;

/* per-handle state: one coro_dir for reading, one for writing */
typedef struct
{
  coro_dir r, w;
} coro_handle;

/*
 * Magic free hook for the SV that stores a coro_handle: stops all four
 * watchers, so none of them stays registered with the default loop.
 */
static int
handle_free (pTHX_ SV *sv, MAGIC *mg)
{
  coro_handle *data = (coro_handle *)mg->mg_ptr;
  mg->mg_ptr = 0;

  ev_io_stop    (EV_DEFAULT_UC, &data->r.io);
  ev_io_stop    (EV_DEFAULT_UC, &data->w.io);
  ev_timer_stop (EV_DEFAULT_UC, &data->r.tw);
  ev_timer_stop (EV_DEFAULT_UC, &data->w.tw);

  return 0;
}

/* only the fifth slot (the free hook) is set */
static MGVTBL handle_vtbl = { 0, 0, 0, 0, handle_free };

/*
 * Common part of the io and timer callbacks: stops both watchers of the
 * direction, passes dir->data to CORO_READY and then overwrites it with
 * success.
 */
static void
handle_cb (coro_dir *dir, int success)
{
  ev_io_stop    (EV_DEFAULT_UC, &dir->io);
  ev_timer_stop (EV_DEFAULT_UC, &dir->tw);

  CORO_READY (dir->data);
  sv_setiv (dir->data, success);
}

/* io watcher callback: recovers the enclosing coro_dir, reports success (1) */
static void
handle_io_cb (EV_P_ ev_io *w, int revents)
{
  handle_cb ((coro_dir *)(((char *)w) - offsetof (coro_dir, io)), 1);
}

/* timer watcher callback: recovers the enclosing coro_dir, reports failure (0) */
static void
handle_timer_cb (EV_P_ ev_timer *w, int revents)
{
  handle_cb ((coro_dir *)(((char *)w) - offsetof (coro_dir, tw)), 0);
}

/*
 * SLF check function for _readable_ev/_writable_ev.
 *
 * Returns 1 while frame->data is still a reference (handle_cb has not yet
 * replaced it by an integer), otherwise pushes the SV onto the Perl stack
 * and returns 0.
 */
static int
slf_check_rw (pTHX_ struct CoroSLF *frame)
{
  SV *data = (SV *)frame->data;

  if (SvROK (data))
    return 1;
  else
    {
      dSP;

      XPUSHs (data);

      PUTBACK;
      return 0;
    }
}

/*
 * Common SLF init function for _readable_ev (wr == 0) and _writable_ev
 * (wr != 0).
 *
 * arg is dereferenced with SvRV and used as an array without further
 * checks. Slot 0 is converted to a file descriptor with sv_fileno,
 * slot 2 is used as the timeout when defined, and slot 5 caches the
 * coro_handle, which is created on first use.
 * NOTE(review): presumably arg is the Coro::Handle object array - verify
 * against the caller.
 *
 * Croaks when a watcher for the requested direction is already active.
 */
static void
slf_init_rw (pTHX_ struct CoroSLF *frame, SV *arg, int wr)
{
  AV *handle = (AV *)SvRV (arg);
  SV *data_sv;
  coro_handle *data;
  coro_dir *dir;

  /* validate the array size before touching any of its slots */
  assert (AvFILLp (handle) >= 7);

  data_sv = AvARRAY (handle)[5];

  if (!SvOK (data_sv))
    {
      /* first use of this handle: allocate the watcher state inside the */
      /* string buffer of a new SV and store that SV in slot 5 */
      int fno = sv_fileno (AvARRAY (handle)[0]);
      data_sv = AvARRAY (handle)[5] = NEWSV (0, sizeof (coro_handle));
      SvPOK_only (data_sv);
      SvREADONLY_on (data_sv);
      data = (coro_handle *)SvPVX (data_sv);
      memset (data, 0, sizeof (coro_handle));

      ev_io_init (&data->r.io, handle_io_cb, fno, EV_READ);
      ev_io_init (&data->w.io, handle_io_cb, fno, EV_WRITE);
      ev_init    (&data->r.tw, handle_timer_cb);
      ev_init    (&data->w.tw, handle_timer_cb);

      /* attach handle_free, so the watchers are stopped when the SV is freed */
      sv_magicext (data_sv, 0, PERL_MAGIC_ext, &handle_vtbl, (char *)data, 0);
    }
  else
    data = (coro_handle *)SvPVX (data_sv);

  dir = wr ? &data->w : &data->r;

  if (ev_is_active (&dir->io) || ev_is_active (&dir->tw))
    croak ("recursive invocation of readable_ev or writable_ev (concurrent Coro::Handle calls on same handle?), detected");

  /* mortal reference to the current coroutine, handle_cb later turns it */
  /* into the integer result */
  dir->data = sv_2mortal (newRV_inc (CORO_CURRENT));

  {
    SV *to = AvARRAY (handle)[2];

    /* only arm the timeout watcher when a timeout is defined */
    if (SvOK (to))
      {
        ev_timer_set (&dir->tw, 0., SvNV (to));
        ev_timer_again (EV_DEFAULT_UC, &dir->tw);
      }
  }

  ev_io_start (EV_DEFAULT_UC, &dir->io);

  frame->data    = (void *)dir->data;
  frame->prepare = GCoroAPI->prepare_schedule;
  frame->check   = slf_check_rw;
}

/* SLF init function for _readable_ev: waits on the read direction of arg [0] */
static void
slf_init_readable (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
{
  slf_init_rw (aTHX_ frame, arg [0], 0);
}

/* SLF init function for _writable_ev: waits on the write direction of arg [0] */
static void
slf_init_writable (pTHX_ struct CoroSLF *frame, CV *cv, SV **arg, int items)
{
  slf_init_rw (aTHX_ frame, arg [0], 1);
}

/*****************************************************************************/

MODULE = Coro::EV                PACKAGE = Coro::EV

PROTOTYPES: ENABLE

BOOT:
{
        I_EV_API ("Coro::EV");
        I_CORO_API ("Coro::EV");

        EV_DEFAULT; /* make sure it is initialised */

        /* the scheduler watcher is started at minimum priority and then */
        /* unref'ed on the default loop */
        ev_prepare_init (&scheduler, prepare_cb);
        ev_set_priority (&scheduler, EV_MINPRI);
        ev_prepare_start (EV_DEFAULT_UC, &scheduler);
        ev_unref (EV_DEFAULT_UC);

        /* the idle watcher is only initialised here, it is started on demand */
        ev_idle_init (&idler, idle_cb);
        ev_set_priority (&idler, EV_MINPRI);

        if (!CORO_READYHOOK) /* do not override if Coro::AnyEvent already did */
          {
            CORO_READYHOOK = readyhook;
            CORO_READYHOOK (); /* make sure we don't miss previous ready's */
          }
}

void
_set_readyhook ()
        CODE:
        /* unconditionally install our hook and run it once */
        CORO_READYHOOK = readyhook;
        CORO_READYHOOK ();

void
_loop_oneshot ()
        CODE:
{
        /* inhibit the prepare watcher, as we know we are the only
         * ready coroutine and we don't want it to start an idle watcher
         * just because of the fallback idle coro being of lower priority.
         */
        ++inhibit;

        /* same reasoning as above, make sure it is stopped */
        if (ev_is_active (&idler))
          ev_idle_stop (EV_DEFAULT_UC, &idler);

        /* run a single loop iteration, using whichever API this EV version has */
#if EV_VERSION_MAJOR >= 4
        ev_run (EV_DEFAULT_UC, EVRUN_ONCE);
#else
        ev_loop (EV_DEFAULT_UC, EVLOOP_ONESHOT);
#endif

        --inhibit;
}

void
timed_io_once (...)
        PROTOTYPE: $$;$
        CODE:
        CORO_EXECUTE_SLF_XS (slf_init_timed_io);

void
timer_once (...)
        PROTOTYPE: $
        CODE:
        CORO_EXECUTE_SLF_XS (slf_init_timer);

void
_poll (...)
        PROTOTYPE:
        CODE:
        /* called without arguments this is a timer wait with a delay of 0 */
        CORO_EXECUTE_SLF_XS (slf_init_timer);

PROTOTYPES: DISABLE

void
_readable_ev (...)
        CODE:
        items = 1; /* ignore the remaining args for speed inside Coro::Handle */
        CORO_EXECUTE_SLF_XS (slf_init_readable);

void
_writable_ev (...)
        CODE:
        items = 1; /* ignore the remaining args for speed inside Coro::Handle */
        CORO_EXECUTE_SLF_XS (slf_init_writable);