#define PERL_NO_GET_CONTEXT #include "EXTERN.h" #include "perl.h" #include "XSUB.h" #define X_STACKSIZE -1 #include "CoroAPI.h" #include "perlmulticore.h" #include "schmorp.h" #include "xthread.h" #ifdef _WIN32 typedef char sigset_t; #define pthread_sigmask(mode,new,old) #endif static pthread_key_t current_key; static s_epipe ep; static void *perl_thx; static sigset_t cursigset, fullsigset; struct tctx { void *coro; }; static struct tctx *tctx_free; static int available; static int max_idle = 8; static xmutex_t perl_m = X_MUTEX_INIT; static xcond_t perl_c = X_COND_INIT; static struct tctx *perl_f; static xmutex_t wait_m = X_MUTEX_INIT; static xcond_t wait_c = X_COND_INIT; static int wait_f; static int wakeup_f; static struct tctx **waiters; static int waiters_count, waiters_max; static struct tctx * tctx_get (void) { struct tctx *ctx; if (!tctx_free) ctx = malloc (sizeof (*tctx_free)); else { ctx = tctx_free; tctx_free = tctx_free->coro; } return ctx; } static void tctx_put (struct tctx *ctx) { ctx->coro = tctx_free; tctx_free = ctx; } X_THREAD_PROC(thread_proc) { PERL_SET_CONTEXT (perl_thx); { dTHX; /* inefficient, we already have perl_thx, but I see no better way */ struct tctx *ctx; for (;;) { /* TODO: should really use some idle time and exit after that */ X_LOCK (perl_m); while (!perl_f) X_COND_WAIT (perl_c, perl_m); ctx = perl_f; perl_f = 0; --available; pthread_sigmask (SIG_SETMASK, &cursigset, 0); X_UNLOCK (perl_m); while (ctx->coro) CORO_SCHEDULE; X_LOCK (wait_m); pthread_sigmask (SIG_SETMASK, &fullsigset, &cursigset); wait_f = 1; X_COND_SIGNAL (wait_c); if (available >= max_idle) { X_UNLOCK (wait_m); break; } ++available; X_UNLOCK (wait_m); } } } static void start_thread (void) { xthread_t tid; ++available; xthread_create (&tid, thread_proc, 0); } static void pmapi_release (void) { struct tctx *ctx = tctx_get (); ctx->coro = SvREFCNT_inc_NN (CORO_CURRENT); pthread_setspecific (current_key, ctx); pthread_sigmask (SIG_SETMASK, &fullsigset, &cursigset); if (!available) start_thread (); X_LOCK (perl_m); perl_f = ctx; X_COND_SIGNAL (perl_c); X_UNLOCK (perl_m); } static void pmapi_acquire (void) { struct tctx *ctx = pthread_getspecific (current_key); X_LOCK (wait_m); if (waiters_count >= waiters_max) { waiters_max = waiters_max ? waiters_max * 2 : 16; waiters = realloc (waiters, waiters_max * sizeof (*waiters)); } waiters [waiters_count++] = ctx; s_epipe_signal (&ep); while (!wait_f) X_COND_WAIT (wait_c, wait_m); wait_f = 0; X_UNLOCK (wait_m); tctx_put (ctx); pthread_sigmask (SIG_SETMASK, &cursigset, 0); } MODULE = Coro::Multicore PACKAGE = Coro::Multicore PROTOTYPES: DISABLE BOOT: { #ifndef _WIN32 sigfillset (&fullsigset); #endif pthread_key_create (¤t_key, 0); if (s_epipe_new (&ep)) croak ("Coro::Multicore: unable to initialise event pipe.\n"); perl_thx = PERL_GET_CONTEXT; I_CORO_API ("Coro::Multicore"); /* not perfectly efficient to do it this way, but it's simple */ perl_multicore_init (); perl_multicore_api->pmapi_release = pmapi_release; perl_multicore_api->pmapi_acquire = pmapi_acquire; } U32 max_idle_threads (U32 max = NO_INIT) CODE: X_LOCK (wait_m); RETVAL = max_idle; if (items) max_idle = max; X_UNLOCK (wait_m); OUTPUT: RETVAL int fd () CODE: RETVAL = s_epipe_fd (&ep); OUTPUT: RETVAL void poll (...) CODE: s_epipe_drain (&ep); X_LOCK (wait_m); while (waiters_count) { struct tctx *ctx = waiters [--waiters_count]; CORO_READY ((SV *)ctx->coro); SvREFCNT_dec_NN ((SV *)ctx->coro); ctx->coro = 0; } X_UNLOCK (wait_m); void sleep (NV seconds) CODE: perlinterp_release (); usleep (seconds * 1e6); perlinterp_acquire ();