#include "EXTERN.h" #include "perl.h" #include "XSUB.h" #include #include "EVAPI.h" #include "xthread.h" /* our userdata */ typedef struct { mutex_t lock; /* global loop lock */ void (*signal_func) (void *signal_arg, int value); void *signal_arg; ev_async async_w; thread_t tid; cond_t invoke_cv; mutex_t invoke_mutex; SV *interrupt; } udat; static void c_func (pTHX_ void *loop_, int value) { struct ev_loop *loop = (struct ev_loop *)loop_; udat *u = ev_userdata (EV_A); X_LOCK (u->lock); ev_invoke_pending (loop); X_UNLOCK (u->lock); X_COND_SIGNAL (u->invoke_cv); } static void async_cb (EV_P_ ev_async *w, int revents) { /* just used for the side effects */ } static void l_release (EV_P) { udat *u = ev_userdata (EV_A); X_UNLOCK (u->lock); } static void l_acquire (EV_P) { udat *u = ev_userdata (EV_A); X_LOCK (u->lock); } static void l_invoke (EV_P) { udat *u = ev_userdata (EV_A); X_UNLOCK (u->lock); u->signal_func (u->signal_arg, 1); X_COND_WAIT (u->invoke_cv, u->invoke_mutex); X_LOCK (u->lock); } X_THREAD_PROC(l_run) { struct ev_loop *loop = (struct ev_loop *)thr_arg; udat *u = ev_userdata (loop); X_LOCK (u->invoke_mutex); X_LOCK (u->lock); ev_ref (loop); /* really? */ for (;;) /* really? */ ev_loop (loop, 0); X_UNLOCK (u->lock); } static void scope_lock_cb (pTHX_ void *loop_) { struct ev_loop *loop = (struct ev_loop *)SvIVX ((SV *)loop_); udat *u = ev_userdata (loop); X_UNLOCK (u->lock); SvREFCNT_dec ((SV *)loop_); } MODULE = EV::Loop::Async PACKAGE = EV::Loop::Async PROTOTYPES: ENABLE BOOT: { I_EV_API ("EV::Loop::Async"); CvNODEBUG_on (get_cv ("EV::Loop::Async::scope_lock", 0)); /* otherwise calling scope can be the debugger */ } void _c_func (SV *loop) PPCODE: EXTEND (SP, 2); PUSHs (sv_2mortal (newSViv (PTR2IV (c_func)))); PUSHs (sv_2mortal (newSViv (SvIVX (SvRV (loop))))); void _attach (SV *loop_, SV *interrupt, IV sig_func, void *sig_arg) CODE: { pthread_mutexattr_t ma; struct ev_loop *loop = (struct ev_loop *)SvIVX (SvRV (loop_)); udat *u; Newz (0, u, 1, udat); u->interrupt = newSVsv (interrupt); u->signal_func = (void (*)(void *, int))sig_func; u->signal_arg = sig_arg; ev_async_init (&u->async_w, async_cb); ev_async_start (loop, &u->async_w); pthread_mutexattr_init (&ma); pthread_mutexattr_settype (&ma, PTHREAD_MUTEX_RECURSIVE); pthread_mutex_init (&u->lock, &ma); pthread_mutex_init (&u->invoke_mutex, &ma); pthread_mutexattr_destroy (&ma); pthread_cond_init (&u->invoke_cv, 0); ev_set_userdata (loop, u); ev_set_invoke_pending_cb (loop, l_invoke); ev_set_loop_release_cb (loop, l_release, l_acquire); pthread_create (&u->tid, 0, l_run, loop); } SV * interrupt (SV *loop_) CODE: { struct ev_loop *loop = (struct ev_loop *)SvIVX (SvRV (loop_)); udat *u = ev_userdata (loop); RETVAL = newSVsv (u->interrupt); } OUTPUT: RETVAL void lock (SV *loop_) ALIAS: lock = 0 unlock = 1 notify = 2 CODE: { struct ev_loop *loop = (struct ev_loop *)SvIVX (SvRV (loop_)); udat *u = ev_userdata (loop); switch (ix) { case 0: X_LOCK (u->lock); break; case 1: X_UNLOCK (u->lock); break; case 2: ev_async_send (loop, &u->async_w); break; } } void scope_lock (SV *loop_) CODE: { struct ev_loop *loop = (struct ev_loop *)SvIVX (SvRV (loop_)); udat *u = ev_userdata (loop); X_LOCK (u->lock); LEAVE; /* unfortunately, perl sandwiches XS calls into ENTER/LEAVE */ SAVEDESTRUCTOR_X (scope_lock_cb, (void *)SvREFCNT_inc (SvRV (loop_))); ENTER; /* unfortunately, perl sandwiches XS calls into ENTER/LEAVE */ } void DESTROY (SV *loop_) CODE: { struct ev_loop *loop = (struct ev_loop *)SvIVX (SvRV (loop_)); udat *u = ev_userdata (loop); if (u) { pthread_cancel (u->tid); ev_async_send (loop, &u->async_w); pthread_join (u->tid, 0); X_LOCK (u->lock); ev_async_stop (loop, &u->async_w); pthread_mutex_destroy (&u->lock); pthread_cond_destroy (&u->invoke_cv); pthread_mutex_destroy (&u->invoke_mutex); /* TODO: locked by another thread... */ SvREFCNT_dec (u->interrupt); Safefree (u); } }