--- BDB/BDB.xs 2007/05/09 06:42:24 1.11 +++ BDB/BDB.xs 2007/07/08 11:12:12 1.13 @@ -6,19 +6,31 @@ #include "perl.h" #include "XSUB.h" +// perl stupidly defines these as macros, breaking +// lots and lots of code. +#undef open +#undef close +#undef abort +#undef malloc +#undef free +#undef send + #include #include #include -#include #include #include -#include #include +#ifndef _WIN32 +# include +# include +#endif + #include -#if DB_VERSION_MAJOR < 4 || (DB_VERSION_MAJOR == 4 && DB_VERSION_MINOR < 5) -# error you need Berkeley DB 4.5 or newer installed +#if DB_VERSION_MAJOR < 4 || (DB_VERSION_MAJOR == 4 && DB_VERSION_MINOR < 4) +# error you need Berkeley DB 4.4 or newer installed #endif /* number of seconds after which idle threads exit */ @@ -35,13 +47,13 @@ static SV *prepare_cb; -static inline char * +static char * strdup_ornull (const char *s) { return s ? strdup (s) : 0; } -static inline void +static void sv_to_dbt (DBT *dbt, SV *sv) { STRLEN len; @@ -53,7 +65,7 @@ dbt->flags = DB_DBT_REALLOC; } -static inline void +static void dbt_to_sv (SV *sv, DBT *dbt) { if (sv) @@ -128,7 +140,7 @@ static unsigned int started, idle, wanted; /* worker threads management */ -static mutex_t wrklock = MUTEX_INIT; +static mutex_t wrklock = X_MUTEX_INIT; typedef struct worker { /* locked by wrklock */ @@ -159,11 +171,11 @@ static volatile unsigned int nreqs, nready, npending; static volatile unsigned int max_idle = 4; static volatile unsigned int max_outstanding = 0xffffffff; -static int respipe [2]; +static int respipe [2], respipe_osf [2]; -static mutex_t reslock = MUTEX_INIT; -static mutex_t reqlock = MUTEX_INIT; -static cond_t reqwait = COND_INIT; +static mutex_t reslock = X_MUTEX_INIT; +static mutex_t reqlock = X_MUTEX_INIT; +static cond_t reqwait = X_COND_INIT; #if WORDACCESS_UNSAFE @@ -171,9 +183,9 @@ { unsigned int retval; - LOCK (reqlock); + X_LOCK (reqlock); retval = nready; - UNLOCK (reqlock); + X_UNLOCK (reqlock); return retval; } @@ -182,9 +194,9 @@ { unsigned int retval; - LOCK (reslock); + X_LOCK (reslock); retval = npending; - UNLOCK (reslock); + X_UNLOCK (reslock); return retval; } @@ -193,9 +205,9 @@ { unsigned int retval; - LOCK (wrklock); + X_LOCK (wrklock); retval = started; - UNLOCK (wrklock); + X_UNLOCK (wrklock); return retval; } @@ -340,7 +352,32 @@ Safefree (req); } -static void *aio_proc (void *arg); +#ifdef USE_SOCKETS_AS_HANDLES +# define TO_SOCKET(x) (win32_get_osfhandle (x)) +#else +# define TO_SOCKET(x) (x) +#endif + +static void +create_pipe (int fd[2]) +{ +#ifdef _WIN32 + int arg = 1; + if (PerlSock_socketpair (AF_UNIX, SOCK_STREAM, 0, fd) + || ioctlsocket (TO_SOCKET (fd [0]), FIONBIO, &arg) + || ioctlsocket (TO_SOCKET (fd [1]), FIONBIO, &arg)) +#else + if (pipe (fd) + || fcntl (fd [0], F_SETFL, O_NONBLOCK) + || fcntl (fd [1], F_SETFL, O_NONBLOCK)) +#endif + croak ("unable to initialize result pipe"); + + respipe_osf [0] = TO_SOCKET (respipe [0]); + respipe_osf [1] = TO_SOCKET (respipe [1]); +} + +X_THREAD_PROC (bdb_proc); static void start_thread (void) { @@ -349,8 +386,8 @@ if (!wrk) croak ("unable to allocate worker thread data"); - LOCK (wrklock); - if (thread_create (&wrk->tid, aio_proc, (void *)wrk)) + X_LOCK (wrklock); + if (thread_create (&wrk->tid, bdb_proc, (void *)wrk)) { wrk->prev = &wrk_first; wrk->next = wrk_first.next; @@ -361,7 +398,7 @@ else free (wrk); - UNLOCK (wrklock); + X_UNLOCK (wrklock); } static void maybe_start_thread () @@ -383,10 +420,12 @@ // synthesize callback if none given if (!SvOK (req->callback)) { + int count; + dSP; PUSHMARK (SP); PUTBACK; - int count = call_sv (prepare_cb, G_ARRAY); + count = call_sv (prepare_cb, G_ARRAY); SPAGAIN; if (count != 2) @@ -399,11 +438,11 @@ ++nreqs; - LOCK (reqlock); + X_LOCK (reqlock); ++nready; reqq_push (&req_queue, req); - COND_SIGNAL (reqwait); - UNLOCK (reqlock); + X_COND_SIGNAL (reqwait); + X_UNLOCK (reqlock); maybe_start_thread (); @@ -426,21 +465,21 @@ req->type = REQ_QUIT; req->pri = PRI_MAX + PRI_BIAS; - LOCK (reqlock); + X_LOCK (reqlock); reqq_push (&req_queue, req); - COND_SIGNAL (reqwait); - UNLOCK (reqlock); + X_COND_SIGNAL (reqwait); + X_UNLOCK (reqlock); - LOCK (wrklock); + X_LOCK (wrklock); --started; - UNLOCK (wrklock); + X_UNLOCK (wrklock); } static void set_max_idle (int nthreads) { - if (WORDACCESS_UNSAFE) LOCK (reqlock); + if (WORDACCESS_UNSAFE) X_LOCK (reqlock); max_idle = nthreads <= 0 ? 1 : nthreads; - if (WORDACCESS_UNSAFE) UNLOCK (reqlock); + if (WORDACCESS_UNSAFE) X_UNLOCK (reqlock); } static void min_parallel (int nthreads) @@ -465,19 +504,19 @@ while (nreqs) { int size; - if (WORDACCESS_UNSAFE) LOCK (reslock); + if (WORDACCESS_UNSAFE) X_LOCK (reslock); size = res_queue.size; - if (WORDACCESS_UNSAFE) UNLOCK (reslock); + if (WORDACCESS_UNSAFE) X_UNLOCK (reslock); if (size) return; maybe_start_thread (); - FD_ZERO(&rfd); - FD_SET(respipe [0], &rfd); + FD_ZERO (&rfd); + FD_SET (respipe [0], &rfd); - select (respipe [0] + 1, &rfd, 0, 0, 0); + PerlSock_select (respipe [0] + 1, &rfd, 0, 0, 0); } } @@ -499,7 +538,7 @@ { maybe_start_thread (); - LOCK (reslock); + X_LOCK (reslock); req = reqq_shift (&res_queue); if (req) @@ -510,12 +549,12 @@ { /* read any signals sent by the worker threads */ char buf [4]; - while (read (respipe [0], buf, 4) == 4) + while (respipe_read (respipe [0], buf, 4) == 4) ; } } - UNLOCK (reslock); + X_UNLOCK (reslock); if (!req) break; @@ -555,35 +594,22 @@ return count; } -static void create_pipe () -{ - if (pipe (respipe)) - croak ("unable to initialize result pipe"); - - if (fcntl (respipe [0], F_SETFL, O_NONBLOCK)) - croak ("cannot set result pipe to nonblocking mode"); - - if (fcntl (respipe [1], F_SETFL, O_NONBLOCK)) - croak ("cannot set result pipe to nonblocking mode"); -} - /*****************************************************************************/ -static void *aio_proc (void *thr_arg) +X_THREAD_PROC (bdb_proc) { aio_req req; struct timespec ts; worker *self = (worker *)thr_arg; /* try to distribute timeouts somewhat evenly */ - ts.tv_nsec = (((unsigned long)self + (unsigned long)ts.tv_sec) & 1023UL) - * (1000000000UL / 1024UL); + ts.tv_nsec = ((unsigned long)self & 1023UL) * (1000000000UL / 1024UL); for (;;) { ts.tv_sec = time (0) + IDLE_TIMEOUT; - LOCK (reqlock); + X_LOCK (reqlock); for (;;) { @@ -594,21 +620,21 @@ ++idle; - if (COND_TIMEDWAIT (reqwait, reqlock, ts) + if (X_COND_TIMEDWAIT (reqwait, reqlock, ts) == ETIMEDOUT) { if (idle > max_idle) { --idle; - UNLOCK (reqlock); - LOCK (wrklock); + X_UNLOCK (reqlock); + X_LOCK (wrklock); --started; - UNLOCK (wrklock); + X_UNLOCK (wrklock); goto quit; } /* we are allowed to idle, so do so without any timeout */ - COND_WAIT (reqwait, reqlock); + X_COND_WAIT (reqwait, reqlock); ts.tv_sec = time (0) + IDLE_TIMEOUT; } @@ -617,7 +643,7 @@ --nready; - UNLOCK (reqlock); + X_UNLOCK (reqlock); switch (req->type) { @@ -741,24 +767,24 @@ break; } - LOCK (reslock); + X_LOCK (reslock); ++npending; if (!reqq_push (&res_queue, req)) /* write a dummy byte to the pipe so fh becomes ready */ - write (respipe [1], &respipe, 1); + respipe_write (respipe_osf [1], (const void *)&respipe_osf, 1); self->req = 0; worker_clear (self); - UNLOCK (reslock); + X_UNLOCK (reslock); } quit: - LOCK (wrklock); + X_LOCK (wrklock); worker_free (self); - UNLOCK (wrklock); + X_UNLOCK (wrklock); return 0; } @@ -767,16 +793,16 @@ static void atfork_prepare (void) { - LOCK (wrklock); - LOCK (reqlock); - LOCK (reslock); + X_LOCK (wrklock); + X_LOCK (reqlock); + X_LOCK (reslock); } static void atfork_parent (void) { - UNLOCK (reslock); - UNLOCK (reqlock); - UNLOCK (wrklock); + X_UNLOCK (reslock); + X_UNLOCK (reqlock); + X_UNLOCK (wrklock); } static void atfork_child (void) @@ -806,9 +832,10 @@ nready = 0; npending = 0; - close (respipe [0]); - close (respipe [1]); - create_pipe (); + respipe_close (respipe [0]); + respipe_close (respipe [1]); + + create_pipe (respipe); atfork_parent (); } @@ -897,7 +924,6 @@ const_iv (LOG_AUTOREMOVE) const_iv (LOG_INMEMORY) const_iv (NOLOCKING) - const_iv (MULTIVERSION) const_iv (NOMMAP) const_iv (NOPANIC) const_iv (OVERWRITE) @@ -905,7 +931,6 @@ const_iv (REGION_INIT) const_iv (TIME_NOTGRANTED) const_iv (TXN_NOSYNC) - const_iv (TXN_SNAPSHOT) const_iv (TXN_WRITE_NOSYNC) const_iv (WRITECURSOR) const_iv (YIELDCPU) @@ -958,7 +983,6 @@ const_iv (NOOVERWRITE) const_iv (TXN_NOWAIT) - const_iv (TXN_SNAPSHOT) const_iv (TXN_SYNC) const_iv (SET_LOCK_TIMEOUT) @@ -996,13 +1020,25 @@ const_iv (SEQ_DEC) const_iv (SEQ_INC) const_iv (SEQ_WRAP) +#if DB_VERSION_MINOR >= 5 + const_iv (MULTIVERSION) + const_iv (TXN_SNAPSHOT) +#endif }; for (civ = const_iv + sizeof (const_iv) / sizeof (const_iv [0]); civ-- > const_iv; ) newCONSTSUB (stash, (char *)civ->name, newSViv (civ->iv)); - create_pipe (); - ATFORK (atfork_prepare, atfork_parent, atfork_child); + create_pipe (respipe); + + X_THREAD_ATFORK (atfork_prepare, atfork_parent, atfork_child); +#ifdef _WIN32 + X_MUTEX_CHECK (wrklock); + X_MUTEX_CHECK (reslock); + X_MUTEX_CHECK (reqlock); + + X_COND_CHECK (reqwait); +#endif } void @@ -1131,9 +1167,9 @@ nthreads () PROTOTYPE: CODE: - if (WORDACCESS_UNSAFE) LOCK (wrklock); + if (WORDACCESS_UNSAFE) X_LOCK (wrklock); RETVAL = started; - if (WORDACCESS_UNSAFE) UNLOCK (wrklock); + if (WORDACCESS_UNSAFE) X_UNLOCK (wrklock); OUTPUT: RETVAL @@ -1160,9 +1196,10 @@ db_env_open (DB_ENV *env, octetstring db_home, U32 open_flags, int mode, SV *callback = &PL_sv_undef) CODE: { + dREQ (REQ_ENV_OPEN); + env->set_thread_count (env, get_nthreads ()); - dREQ (REQ_ENV_OPEN); req->env = env; req->uint1 = open_flags | DB_THREAD; req->int1 = mode;