--- BDB/BDB.xs 2007/02/11 22:38:37 1.9 +++ BDB/BDB.xs 2007/12/04 10:13:50 1.20 @@ -1,12 +1,4 @@ -/* solaris */ -#define _POSIX_PTHREAD_SEMANTICS 1 - -#if __linux && !defined(_GNU_SOURCE) -# define _GNU_SOURCE -#endif - -/* just in case */ -#define _REENTRANT 1 +#include "xthread.h" #include @@ -14,34 +6,36 @@ #include "perl.h" #include "XSUB.h" -#include +// perl stupidly defines these as macros, breaking +// lots and lots of code. +#undef open +#undef close +#undef abort +#undef malloc +#undef free +#undef send #include #include #include -#include #include #include -#include #include +#ifndef _WIN32 +# include +# include +#endif + #include +#if DB_VERSION_MAJOR < 4 || (DB_VERSION_MAJOR == 4 && DB_VERSION_MINOR < 4) +# error you need Berkeley DB 4.4 or newer installed +#endif + /* number of seconds after which idle threads exit */ #define IDLE_TIMEOUT 10 -/* wether word reads are potentially non-atomic. - * this is conservatice, likely most arches this runs - * on have atomic word read/writes. - */ -#ifndef WORDACCESS_UNSAFE -# if __i386 || __x86_64 -# define WORDACCESS_UNSAFE 0 -# else -# define WORDACCESS_UNSAFE 1 -# endif -#endif - typedef DB_ENV DB_ENV_ornull; typedef DB_TXN DB_TXN_ornull; typedef DBC DBC_ornull; @@ -53,13 +47,35 @@ static SV *prepare_cb; -static inline char * +#if DB_VERSION_MINOR >= 6 +# define c_close close +# define c_count count +# define c_del del +# define c_dup dup +# define c_get get +# define c_pget pget +# define c_put put +#endif + +static void +debug_errcall (const DB_ENV *dbenv, const char *errpfx, const char *msg) +{ + printf ("err[%s]\n", msg); +} + +static void +debug_msgcall (const DB_ENV *dbenv, const char *msg) +{ + printf ("msg[%s]\n", msg); +} + +static char * strdup_ornull (const char *s) { return s ? strdup (s) : 0; } -static inline void +static void sv_to_dbt (DBT *dbt, SV *sv) { STRLEN len; @@ -71,7 +87,7 @@ dbt->flags = DB_DBT_REALLOC; } -static inline void +static void dbt_to_sv (SV *sv, DBT *dbt) { if (sv) @@ -90,7 +106,7 @@ REQ_ENV_MEMP_SYNC, REQ_ENV_MEMP_TRICKLE, REQ_DB_OPEN, REQ_DB_CLOSE, REQ_DB_COMPACT, REQ_DB_SYNC, REQ_DB_PUT, REQ_DB_GET, REQ_DB_PGET, REQ_DB_DEL, REQ_DB_KEY_RANGE, - REQ_TXN_COMMIT, REQ_TXN_ABORT, + REQ_TXN_COMMIT, REQ_TXN_ABORT, REQ_TXN_FINISH, REQ_C_CLOSE, REQ_C_COUNT, REQ_C_PUT, REQ_C_GET, REQ_C_PGET, REQ_C_DEL, REQ_SEQ_OPEN, REQ_SEQ_CLOSE, REQ_SEQ_GET, REQ_SEQ_REMOVE, }; @@ -145,23 +161,14 @@ static unsigned int started, idle, wanted; -#if __linux && defined (PTHREAD_ADAPTIVE_MUTEX_INITIALIZER_NP) -# define AIO_MUTEX_INIT PTHREAD_ADAPTIVE_MUTEX_INITIALIZER_NP -#else -# define AIO_MUTEX_INIT PTHREAD_MUTEX_INITIALIZER -#endif - -#define LOCK(mutex) pthread_mutex_lock (&(mutex)) -#define UNLOCK(mutex) pthread_mutex_unlock (&(mutex)) - /* worker threads management */ -static pthread_mutex_t wrklock = AIO_MUTEX_INIT; +static mutex_t wrklock = X_MUTEX_INIT; typedef struct worker { /* locked by wrklock */ struct worker *prev, *next; - pthread_t tid; + thread_t tid; /* locked by reslock, reqlock or wrklock */ aio_req req; /* currently processed request */ @@ -186,11 +193,11 @@ static volatile unsigned int nreqs, nready, npending; static volatile unsigned int max_idle = 4; static volatile unsigned int max_outstanding = 0xffffffff; -static int respipe [2]; +static int respipe_osf [2], respipe [2] = { -1, -1 }; -static pthread_mutex_t reslock = AIO_MUTEX_INIT; -static pthread_mutex_t reqlock = AIO_MUTEX_INIT; -static pthread_cond_t reqwait = PTHREAD_COND_INITIALIZER; +static mutex_t reslock = X_MUTEX_INIT; +static mutex_t reqlock = X_MUTEX_INIT; +static cond_t reqwait = X_COND_INIT; #if WORDACCESS_UNSAFE @@ -198,9 +205,9 @@ { unsigned int retval; - LOCK (reqlock); + X_LOCK (reqlock); retval = nready; - UNLOCK (reqlock); + X_UNLOCK (reqlock); return retval; } @@ -209,9 +216,9 @@ { unsigned int retval; - LOCK (reslock); + X_LOCK (reslock); retval = npending; - UNLOCK (reslock); + X_UNLOCK (reslock); return retval; } @@ -220,9 +227,9 @@ { unsigned int retval; - LOCK (wrklock); + X_LOCK (wrklock); retval = started; - UNLOCK (wrklock); + X_UNLOCK (wrklock); return retval; } @@ -367,30 +374,61 @@ Safefree (req); } -static void *aio_proc (void *arg); +#ifdef USE_SOCKETS_AS_HANDLES +# define TO_SOCKET(x) (win32_get_osfhandle (x)) +#else +# define TO_SOCKET(x) (x) +#endif -static void start_thread (void) +static void +create_respipe () { - sigset_t fullsigset, oldsigset; - pthread_attr_t attr; + int old_readfd = respipe [0]; - worker *wrk = calloc (1, sizeof (worker)); + if (respipe [1] >= 0) + respipe_close (TO_SOCKET (respipe [1])); - if (!wrk) - croak ("unable to allocate worker thread data"); +#ifdef _WIN32 + if (PerlSock_socketpair (AF_UNIX, SOCK_STREAM, 0, respipe)) +#else + if (pipe (respipe)) +#endif + croak ("unable to initialize result pipe"); - pthread_attr_init (&attr); - pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED); -#ifdef PTHREAD_SCOPE_PROCESS - pthread_attr_setscope (&attr, PTHREAD_SCOPE_PROCESS); + if (old_readfd >= 0) + { + if (dup2 (TO_SOCKET (respipe [0]), TO_SOCKET (old_readfd)) < 0) + croak ("unable to initialize result pipe(2)"); + + respipe_close (respipe [0]); + respipe [0] = old_readfd; + } + +#ifdef _WIN32 + int arg = 1; + if (ioctlsocket (TO_SOCKET (respipe [0]), FIONBIO, &arg) + || ioctlsocket (TO_SOCKET (respipe [1]), FIONBIO, &arg)) +#else + if (fcntl (respipe [0], F_SETFL, O_NONBLOCK) + || fcntl (respipe [1], F_SETFL, O_NONBLOCK)) #endif + croak ("unable to initialize result pipe(3)"); + + respipe_osf [0] = TO_SOCKET (respipe [0]); + respipe_osf [1] = TO_SOCKET (respipe [1]); +} - sigfillset (&fullsigset); +X_THREAD_PROC (bdb_proc); - LOCK (wrklock); - pthread_sigmask (SIG_SETMASK, &fullsigset, &oldsigset); +static void start_thread (void) +{ + worker *wrk = calloc (1, sizeof (worker)); + + if (!wrk) + croak ("unable to allocate worker thread data"); - if (pthread_create (&wrk->tid, &attr, aio_proc, (void *)wrk) == 0) + X_LOCK (wrklock); + if (thread_create (&wrk->tid, bdb_proc, (void *)wrk)) { wrk->prev = &wrk_first; wrk->next = wrk_first.next; @@ -401,8 +439,7 @@ else free (wrk); - pthread_sigmask (SIG_SETMASK, &oldsigset, 0); - UNLOCK (wrklock); + X_UNLOCK (wrklock); } static void maybe_start_thread () @@ -424,10 +461,12 @@ // synthesize callback if none given if (!SvOK (req->callback)) { + int count; + dSP; PUSHMARK (SP); PUTBACK; - int count = call_sv (prepare_cb, G_ARRAY); + count = call_sv (prepare_cb, G_ARRAY); SPAGAIN; if (count != 2) @@ -440,11 +479,11 @@ ++nreqs; - LOCK (reqlock); + X_LOCK (reqlock); ++nready; reqq_push (&req_queue, req); - pthread_cond_signal (&reqwait); - UNLOCK (reqlock); + X_COND_SIGNAL (reqwait); + X_UNLOCK (reqlock); maybe_start_thread (); @@ -467,21 +506,21 @@ req->type = REQ_QUIT; req->pri = PRI_MAX + PRI_BIAS; - LOCK (reqlock); + X_LOCK (reqlock); reqq_push (&req_queue, req); - pthread_cond_signal (&reqwait); - UNLOCK (reqlock); + X_COND_SIGNAL (reqwait); + X_UNLOCK (reqlock); - LOCK (wrklock); + X_LOCK (wrklock); --started; - UNLOCK (wrklock); + X_UNLOCK (wrklock); } static void set_max_idle (int nthreads) { - if (WORDACCESS_UNSAFE) LOCK (reqlock); + if (WORDACCESS_UNSAFE) X_LOCK (reqlock); max_idle = nthreads <= 0 ? 1 : nthreads; - if (WORDACCESS_UNSAFE) UNLOCK (reqlock); + if (WORDACCESS_UNSAFE) X_UNLOCK (reqlock); } static void min_parallel (int nthreads) @@ -506,19 +545,19 @@ while (nreqs) { int size; - if (WORDACCESS_UNSAFE) LOCK (reslock); + if (WORDACCESS_UNSAFE) X_LOCK (reslock); size = res_queue.size; - if (WORDACCESS_UNSAFE) UNLOCK (reslock); + if (WORDACCESS_UNSAFE) X_UNLOCK (reslock); if (size) return; maybe_start_thread (); - FD_ZERO(&rfd); - FD_SET(respipe [0], &rfd); + FD_ZERO (&rfd); + FD_SET (respipe [0], &rfd); - select (respipe [0] + 1, &rfd, 0, 0, 0); + PerlSock_select (respipe [0] + 1, &rfd, 0, 0, 0); } } @@ -540,7 +579,7 @@ { maybe_start_thread (); - LOCK (reslock); + X_LOCK (reslock); req = reqq_shift (&res_queue); if (req) @@ -551,12 +590,12 @@ { /* read any signals sent by the worker threads */ char buf [4]; - while (read (respipe [0], buf, 4) == 4) + while (respipe_read (respipe [0], buf, 4) == 4) ; } } - UNLOCK (reslock); + X_UNLOCK (reslock); if (!req) break; @@ -596,35 +635,22 @@ return count; } -static void create_pipe () -{ - if (pipe (respipe)) - croak ("unable to initialize result pipe"); - - if (fcntl (respipe [0], F_SETFL, O_NONBLOCK)) - croak ("cannot set result pipe to nonblocking mode"); - - if (fcntl (respipe [1], F_SETFL, O_NONBLOCK)) - croak ("cannot set result pipe to nonblocking mode"); -} - /*****************************************************************************/ -static void *aio_proc (void *thr_arg) +X_THREAD_PROC (bdb_proc) { aio_req req; struct timespec ts; worker *self = (worker *)thr_arg; /* try to distribute timeouts somewhat evenly */ - ts.tv_nsec = (((unsigned long)self + (unsigned long)ts.tv_sec) & 1023UL) - * (1000000000UL / 1024UL); + ts.tv_nsec = ((unsigned long)self & 1023UL) * (1000000000UL / 1024UL); for (;;) { ts.tv_sec = time (0) + IDLE_TIMEOUT; - LOCK (reqlock); + X_LOCK (reqlock); for (;;) { @@ -635,21 +661,21 @@ ++idle; - if (pthread_cond_timedwait (&reqwait, &reqlock, &ts) + if (X_COND_TIMEDWAIT (reqwait, reqlock, ts) == ETIMEDOUT) { if (idle > max_idle) { --idle; - UNLOCK (reqlock); - LOCK (wrklock); + X_UNLOCK (reqlock); + X_LOCK (wrklock); --started; - UNLOCK (wrklock); + X_UNLOCK (wrklock); goto quit; } /* we are allowed to idle, so do so without any timeout */ - pthread_cond_wait (&reqwait, &reqlock); + X_COND_WAIT (reqwait, reqlock); ts.tv_sec = time (0) + IDLE_TIMEOUT; } @@ -658,11 +684,12 @@ --nready; - UNLOCK (reqlock); + X_UNLOCK (reqlock); switch (req->type) { case REQ_QUIT: + req->result = ENOSYS; goto quit; case REQ_ENV_OPEN: @@ -733,6 +760,17 @@ req->result = req->txn->abort (req->txn); break; + case REQ_TXN_FINISH: + if (req->txn->flags & TXN_DEADLOCK) + { + req->result = req->txn->abort (req->txn); + if (!req->result) + req->result = DB_LOCK_DEADLOCK; + } + else + req->result = req->txn->commit (req->txn, req->uint1); + break; + case REQ_C_CLOSE: req->result = req->dbc->c_close (req->dbc); break; @@ -782,24 +820,27 @@ break; } - LOCK (reslock); + if (req->txn && (req->result > 0 || req->result == DB_LOCK_NOTGRANTED)) + req->txn->flags |= TXN_DEADLOCK; + + X_LOCK (reslock); ++npending; if (!reqq_push (&res_queue, req)) /* write a dummy byte to the pipe so fh becomes ready */ - write (respipe [1], &respipe, 1); + respipe_write (respipe_osf [1], (const void *)&respipe_osf, 1); self->req = 0; worker_clear (self); - UNLOCK (reslock); + X_UNLOCK (reslock); } quit: - LOCK (wrklock); + X_LOCK (wrklock); worker_free (self); - UNLOCK (wrklock); + X_UNLOCK (wrklock); return 0; } @@ -808,16 +849,16 @@ static void atfork_prepare (void) { - LOCK (wrklock); - LOCK (reqlock); - LOCK (reslock); + X_LOCK (wrklock); + X_LOCK (reqlock); + X_LOCK (reslock); } static void atfork_parent (void) { - UNLOCK (reslock); - UNLOCK (reqlock); - UNLOCK (wrklock); + X_UNLOCK (reslock); + X_UNLOCK (reqlock); + X_UNLOCK (wrklock); } static void atfork_child (void) @@ -847,9 +888,7 @@ nready = 0; npending = 0; - close (respipe [0]); - close (respipe [1]); - create_pipe (); + create_respipe (); atfork_parent (); } @@ -923,6 +962,7 @@ const_iv (INIT_TXN) const_iv (RECOVER_FATAL) const_iv (CREATE) + const_iv (RDONLY) const_iv (USE_ENVIRON) const_iv (USE_ENVIRON_ROOT) const_iv (LOCKDOWN) @@ -938,7 +978,6 @@ const_iv (LOG_AUTOREMOVE) const_iv (LOG_INMEMORY) const_iv (NOLOCKING) - const_iv (MULTIVERSION) const_iv (NOMMAP) const_iv (NOPANIC) const_iv (OVERWRITE) @@ -946,7 +985,7 @@ const_iv (REGION_INIT) const_iv (TIME_NOTGRANTED) const_iv (TXN_NOSYNC) - const_iv (TXN_SNAPSHOT) + const_iv (TXN_NOT_DURABLE) const_iv (TXN_WRITE_NOSYNC) const_iv (WRITECURSOR) const_iv (YIELDCPU) @@ -964,7 +1003,6 @@ const_iv (NOSYNC) const_iv (CHKSUM) const_iv (ENCRYPT) - const_iv (TXN_NOT_DURABLE) const_iv (DUP) const_iv (DUPSORT) const_iv (RECNUM) @@ -999,7 +1037,6 @@ const_iv (NOOVERWRITE) const_iv (TXN_NOWAIT) - const_iv (TXN_SNAPSHOT) const_iv (TXN_SYNC) const_iv (SET_LOCK_TIMEOUT) @@ -1037,13 +1074,78 @@ const_iv (SEQ_DEC) const_iv (SEQ_INC) const_iv (SEQ_WRAP) + + const_iv (BUFFER_SMALL) + const_iv (DONOTINDEX) + const_iv (KEYEMPTY ) + const_iv (KEYEXIST ) + const_iv (LOCK_DEADLOCK) + const_iv (LOCK_NOTGRANTED) + const_iv (LOG_BUFFER_FULL) + const_iv (NOSERVER) + const_iv (NOSERVER_HOME) + const_iv (NOSERVER_ID) + const_iv (NOTFOUND) + const_iv (OLD_VERSION) + const_iv (PAGE_NOTFOUND) + const_iv (REP_DUPMASTER) + const_iv (REP_HANDLE_DEAD) + const_iv (REP_HOLDELECTION) + const_iv (REP_IGNORE) + const_iv (REP_ISPERM) + const_iv (REP_JOIN_FAILURE) + const_iv (REP_LOCKOUT) + const_iv (REP_NEWMASTER) + const_iv (REP_NEWSITE) + const_iv (REP_NOTPERM) + const_iv (REP_UNAVAIL) + const_iv (RUNRECOVERY) + const_iv (SECONDARY_BAD) + const_iv (VERIFY_BAD) + const_iv (VERSION_MISMATCH) + + const_iv (VERB_DEADLOCK) + const_iv (VERB_RECOVERY) + const_iv (VERB_REGISTER) + const_iv (VERB_REPLICATION) + const_iv (VERB_WAITSFOR) + + const_iv (VERSION_MAJOR) + const_iv (VERSION_MINOR) + const_iv (VERSION_PATCH) +#if DB_VERSION_MINOR >= 5 + const_iv (MULTIVERSION) + const_iv (TXN_SNAPSHOT) +#endif +#if DB_VERSION_MINOR >= 6 + const_iv (PREV_DUP) +# if 0 + const_iv (PRIORITY_UNCHANGED) + const_iv (PRIORITY_VERY_LOW) + const_iv (PRIORITY_LOW) + const_iv (PRIORITY_DEFAULT) + const_iv (PRIORITY_HIGH) + const_iv (PRIORITY_VERY_HIGH) +# endif +#endif }; for (civ = const_iv + sizeof (const_iv) / sizeof (const_iv [0]); civ-- > const_iv; ) newCONSTSUB (stash, (char *)civ->name, newSViv (civ->iv)); - create_pipe (); - pthread_atfork (atfork_prepare, atfork_parent, atfork_child); + newCONSTSUB (stash, "DB_VERSION", newSVnv (DB_VERSION_MAJOR + DB_VERSION_MINOR * .1)); + newCONSTSUB (stash, "DB_VERSION_STRING", newSVpv (DB_VERSION_STRING, 0)); + + create_respipe (); + + X_THREAD_ATFORK (atfork_prepare, atfork_parent, atfork_child); +#ifdef _WIN32 + X_MUTEX_CHECK (wrklock); + X_MUTEX_CHECK (reslock); + X_MUTEX_CHECK (reqlock); + + X_COND_CHECK (reqwait); +#endif } void @@ -1172,9 +1274,9 @@ nthreads () PROTOTYPE: CODE: - if (WORDACCESS_UNSAFE) LOCK (wrklock); + if (WORDACCESS_UNSAFE) X_LOCK (wrklock); RETVAL = started; - if (WORDACCESS_UNSAFE) UNLOCK (wrklock); + if (WORDACCESS_UNSAFE) X_UNLOCK (wrklock); OUTPUT: RETVAL @@ -1185,6 +1287,13 @@ SvREFCNT_dec (prepare_cb); prepare_cb = newSVsv (cb); +char * +strerror (int errorno = errno) + PROTOTYPE: ;$ + CODE: + RETVAL = db_strerror (errorno); + OUTPUT: + RETVAL DB_ENV * db_env_create (U32 env_flags = 0) @@ -1193,6 +1302,12 @@ errno = db_env_create (&RETVAL, env_flags); if (errno) croak ("db_env_create: %s", db_strerror (errno)); + + if (0) + { + RETVAL->set_errcall (RETVAL, debug_errcall); + RETVAL->set_msgcall (RETVAL, debug_msgcall); + } } OUTPUT: RETVAL @@ -1201,9 +1316,10 @@ db_env_open (DB_ENV *env, octetstring db_home, U32 open_flags, int mode, SV *callback = &PL_sv_undef) CODE: { - env->set_thread_count (env, get_nthreads ()); - dREQ (REQ_ENV_OPEN); + + env->set_thread_count (env, wanted + 2); + req->env = env; req->uint1 = open_flags | DB_THREAD; req->int1 = mode; @@ -1418,6 +1534,17 @@ } void +db_txn_finish (DB_TXN *txn, U32 flags = 0, SV *callback = &PL_sv_undef) + CODE: +{ + dREQ (REQ_TXN_FINISH); + req->txn = txn; + req->uint1 = flags; + REQ_SEND; + ptr_nuke (ST (0)); +} + +void db_c_close (DBC *dbc, SV *callback = &PL_sv_undef) CODE: { @@ -1605,13 +1732,27 @@ OUTPUT: RETVAL +void set_errfile (DB_ENV *env, FILE *errfile = 0) + CODE: + env->set_errfile (env, errfile); + +void set_msgfile (DB_ENV *env, FILE *msgfile = 0) + CODE: + env->set_msgfile (env, msgfile); + +int set_verbose (DB_ENV *env, U32 which, int onoff = 1) + CODE: + RETVAL = env->set_verbose (env, which, onoff); + OUTPUT: + RETVAL + int set_encrypt (DB_ENV *env, const char *password, U32 flags = 0) CODE: RETVAL = env->set_encrypt (env, password, flags); OUTPUT: RETVAL -int set_timeout (DB_ENV *env, NV timeout, U32 flags) +int set_timeout (DB_ENV *env, NV timeout, U32 flags = DB_SET_TXN_TIMEOUT) CODE: RETVAL = env->set_timeout (env, timeout * 1000000, flags); OUTPUT: @@ -1698,7 +1839,7 @@ OUTPUT: RETVAL -int set_flags (DB *db, U32 flags); +int set_flags (DB *db, U32 flags) CODE: RETVAL = db->set_flags (db, flags); OUTPUT: @@ -1722,7 +1863,7 @@ OUTPUT: RETVAL -int set_re_delim(DB *db, int delim); +int set_re_delim (DB *db, int delim) CODE: RETVAL = db->set_re_delim (db, delim); OUTPUT: @@ -1793,12 +1934,18 @@ if (txn) txn->abort (txn); -int set_timeout (DB_TXN *txn, NV timeout, U32 flags) +int set_timeout (DB_TXN *txn, NV timeout, U32 flags = DB_SET_TXN_TIMEOUT) CODE: RETVAL = txn->set_timeout (txn, timeout * 1000000, flags); OUTPUT: RETVAL +int failed (DB_TXN *txn) + CODE: + RETVAL = !!(txn->flags & TXN_DEADLOCK); + OUTPUT: + RETVAL + MODULE = BDB PACKAGE = BDB::Cursor