--- BDB/BDB.xs 2007/02/05 20:21:38 1.2 +++ BDB/BDB.xs 2007/07/08 11:12:12 1.13 @@ -1,12 +1,4 @@ -/* solaris */ -#define _POSIX_PTHREAD_SEMANTICS 1 - -#if __linux && !defined(_GNU_SOURCE) -# define _GNU_SOURCE -#endif - -/* just in case */ -#define _REENTRANT 1 +#include "xthread.h" #include @@ -14,56 +6,110 @@ #include "perl.h" #include "XSUB.h" -#include +// perl stupidly defines these as macros, breaking +// lots and lots of code. +#undef open +#undef close +#undef abort +#undef malloc +#undef free +#undef send #include #include #include -#include #include #include -#include #include +#ifndef _WIN32 +# include +# include +#endif + #include +#if DB_VERSION_MAJOR < 4 || (DB_VERSION_MAJOR == 4 && DB_VERSION_MINOR < 4) +# error you need Berkeley DB 4.4 or newer installed +#endif + /* number of seconds after which idle threads exit */ #define IDLE_TIMEOUT 10 -/* wether word reads are potentially non-atomic. - * this is conservatice, likely most arches this runs - * on have atomic word read/writes. - */ -#ifndef WORDACCESS_UNSAFE -# if __i386 || __x86_64 -# define WORDACCESS_UNSAFE 0 -# else -# define WORDACCESS_UNSAFE 1 -# endif -#endif +typedef DB_ENV DB_ENV_ornull; +typedef DB_TXN DB_TXN_ornull; +typedef DBC DBC_ornull; +typedef DB DB_ornull; +typedef DB_SEQUENCE DB_SEQUENCE_ornull; typedef SV SV8; /* byte-sv, used for argument-checking */ +typedef char *octetstring; + +static SV *prepare_cb; + +static char * +strdup_ornull (const char *s) +{ + return s ? strdup (s) : 0; +} -enum +static void +sv_to_dbt (DBT *dbt, SV *sv) { + STRLEN len; + char *data = SvPVbyte (sv, len); + + dbt->data = malloc (len); + memcpy (dbt->data, data, len); + dbt->size = len; + dbt->flags = DB_DBT_REALLOC; +} + +static void +dbt_to_sv (SV *sv, DBT *dbt) +{ + if (sv) + { + SvREADONLY_off (sv); + sv_setpvn_mg (sv, dbt->data, dbt->size); + SvREFCNT_dec (sv); + } + + free (dbt->data); +} + +enum { REQ_QUIT, - REQ_ENV_OPEN, REQ_ENV_CLOSE, - REQ_DB_OPEN, REQ_DB_CLOSE, + REQ_ENV_OPEN, REQ_ENV_CLOSE, REQ_ENV_TXN_CHECKPOINT, REQ_ENV_LOCK_DETECT, + REQ_ENV_MEMP_SYNC, REQ_ENV_MEMP_TRICKLE, + REQ_DB_OPEN, REQ_DB_CLOSE, REQ_DB_COMPACT, REQ_DB_SYNC, + REQ_DB_PUT, REQ_DB_GET, REQ_DB_PGET, REQ_DB_DEL, REQ_DB_KEY_RANGE, + REQ_TXN_COMMIT, REQ_TXN_ABORT, + REQ_C_CLOSE, REQ_C_COUNT, REQ_C_PUT, REQ_C_GET, REQ_C_PGET, REQ_C_DEL, + REQ_SEQ_OPEN, REQ_SEQ_CLOSE, REQ_SEQ_GET, REQ_SEQ_REMOVE, }; typedef struct aio_cb { struct aio_cb *volatile next; SV *callback; - int type, pri, errorno; + int type, pri, result; DB_ENV *env; DB *db; DB_TXN *txn; - DBC *cursor; + DBC *dbc; + + UV uv1; int int1, int2; U32 uint1, uint2; char *buf1, *buf2; + SV *sv1, *sv2, *sv3; + + DBT dbt1, dbt2, dbt3; + DB_KEY_RANGE key_range; + DB_SEQUENCE *seq; + db_seq_t seq_t; } aio_cb; typedef aio_cb *aio_req; @@ -93,23 +139,14 @@ static unsigned int started, idle, wanted; -#if __linux && defined (PTHREAD_ADAPTIVE_MUTEX_INITIALIZER_NP) -# define AIO_MUTEX_INIT PTHREAD_ADAPTIVE_MUTEX_INITIALIZER_NP -#else -# define AIO_MUTEX_INIT PTHREAD_MUTEX_INITIALIZER -#endif - -#define LOCK(mutex) pthread_mutex_lock (&(mutex)) -#define UNLOCK(mutex) pthread_mutex_unlock (&(mutex)) - /* worker threads management */ -static pthread_mutex_t wrklock = AIO_MUTEX_INIT; +static mutex_t wrklock = X_MUTEX_INIT; typedef struct worker { /* locked by wrklock */ struct worker *prev, *next; - pthread_t tid; + thread_t tid; /* locked by reslock, reqlock or wrklock */ aio_req req; /* currently processed request */ @@ -134,11 +171,11 @@ static volatile unsigned int nreqs, nready, npending; static volatile unsigned int max_idle = 4; static volatile unsigned int max_outstanding = 0xffffffff; -static int respipe [2]; +static int respipe [2], respipe_osf [2]; -static pthread_mutex_t reslock = AIO_MUTEX_INIT; -static pthread_mutex_t reqlock = AIO_MUTEX_INIT; -static pthread_cond_t reqwait = PTHREAD_COND_INITIALIZER; +static mutex_t reslock = X_MUTEX_INIT; +static mutex_t reqlock = X_MUTEX_INIT; +static cond_t reqwait = X_COND_INIT; #if WORDACCESS_UNSAFE @@ -146,9 +183,9 @@ { unsigned int retval; - LOCK (reqlock); + X_LOCK (reqlock); retval = nready; - UNLOCK (reqlock); + X_UNLOCK (reqlock); return retval; } @@ -157,9 +194,9 @@ { unsigned int retval; - LOCK (reslock); + X_LOCK (reslock); retval = npending; - UNLOCK (reslock); + X_UNLOCK (reslock); return retval; } @@ -168,9 +205,9 @@ { unsigned int retval; - LOCK (wrklock); + X_LOCK (wrklock); retval = started; - UNLOCK (wrklock); + X_UNLOCK (wrklock); return retval; } @@ -250,12 +287,53 @@ ENTER; SAVETMPS; PUSHMARK (SP); - EXTEND (SP, 1); switch (req->type) { + case REQ_DB_CLOSE: + SvREFCNT_dec (req->sv1); + break; + + case REQ_DB_GET: + case REQ_DB_PGET: + dbt_to_sv (req->sv3, &req->dbt3); + break; + + case REQ_C_GET: + case REQ_C_PGET: + dbt_to_sv (req->sv1, &req->dbt1); + dbt_to_sv (req->sv2, &req->dbt2); + dbt_to_sv (req->sv3, &req->dbt3); + break; + + case REQ_DB_KEY_RANGE: + { + AV *av = newAV (); + + av_push (av, newSVnv (req->key_range.less)); + av_push (av, newSVnv (req->key_range.equal)); + av_push (av, newSVnv (req->key_range.greater)); + + SvREADONLY_off (req->sv1); + sv_setsv_mg (req->sv1, newRV_noinc ((SV *)av)); + SvREFCNT_dec (req->sv1); + } + break; + + case REQ_SEQ_GET: + SvREADONLY_off (req->sv1); + + if (sizeof (IV) > 4) + sv_setiv_mg (req->sv1, req->seq_t); + else + sv_setnv_mg (req->sv1, req->seq_t); + + SvREFCNT_dec (req->sv1); + break; } + errno = req->result; + PUTBACK; call_sv (req->callback, G_VOID | G_EVAL); SPAGAIN; @@ -274,30 +352,42 @@ Safefree (req); } -static void *aio_proc(void *arg); +#ifdef USE_SOCKETS_AS_HANDLES +# define TO_SOCKET(x) (win32_get_osfhandle (x)) +#else +# define TO_SOCKET(x) (x) +#endif -static void start_thread (void) +static void +create_pipe (int fd[2]) { - sigset_t fullsigset, oldsigset; - pthread_attr_t attr; +#ifdef _WIN32 + int arg = 1; + if (PerlSock_socketpair (AF_UNIX, SOCK_STREAM, 0, fd) + || ioctlsocket (TO_SOCKET (fd [0]), FIONBIO, &arg) + || ioctlsocket (TO_SOCKET (fd [1]), FIONBIO, &arg)) +#else + if (pipe (fd) + || fcntl (fd [0], F_SETFL, O_NONBLOCK) + || fcntl (fd [1], F_SETFL, O_NONBLOCK)) +#endif + croak ("unable to initialize result pipe"); + + respipe_osf [0] = TO_SOCKET (respipe [0]); + respipe_osf [1] = TO_SOCKET (respipe [1]); +} + +X_THREAD_PROC (bdb_proc); +static void start_thread (void) +{ worker *wrk = calloc (1, sizeof (worker)); if (!wrk) croak ("unable to allocate worker thread data"); - pthread_attr_init (&attr); - pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED); -#ifdef PTHREAD_SCOPE_PROCESS - pthread_attr_setscope (&attr, PTHREAD_SCOPE_PROCESS); -#endif - - sigfillset (&fullsigset); - - LOCK (wrklock); - pthread_sigmask (SIG_SETMASK, &fullsigset, &oldsigset); - - if (pthread_create (&wrk->tid, &attr, aio_proc, (void *)wrk) == 0) + X_LOCK (wrklock); + if (thread_create (&wrk->tid, bdb_proc, (void *)wrk)) { wrk->prev = &wrk_first; wrk->next = wrk_first.next; @@ -308,8 +398,7 @@ else free (wrk); - pthread_sigmask (SIG_SETMASK, &oldsigset, 0); - UNLOCK (wrklock); + X_UNLOCK (wrklock); } static void maybe_start_thread () @@ -326,15 +415,45 @@ static void req_send (aio_req req) { + SV *wait_callback = 0; + + // synthesize callback if none given + if (!SvOK (req->callback)) + { + int count; + + dSP; + PUSHMARK (SP); + PUTBACK; + count = call_sv (prepare_cb, G_ARRAY); + SPAGAIN; + + if (count != 2) + croak ("prepare callback must return exactly two values\n"); + + wait_callback = SvREFCNT_inc (POPs); + SvREFCNT_dec (req->callback); + req->callback = SvREFCNT_inc (POPs); + } + ++nreqs; - LOCK (reqlock); + X_LOCK (reqlock); ++nready; reqq_push (&req_queue, req); - pthread_cond_signal (&reqwait); - UNLOCK (reqlock); + X_COND_SIGNAL (reqwait); + X_UNLOCK (reqlock); maybe_start_thread (); + + if (wait_callback) + { + dSP; + PUSHMARK (SP); + PUTBACK; + call_sv (wait_callback, G_DISCARD); + SvREFCNT_dec (wait_callback); + } } static void end_thread (void) @@ -346,21 +465,21 @@ req->type = REQ_QUIT; req->pri = PRI_MAX + PRI_BIAS; - LOCK (reqlock); + X_LOCK (reqlock); reqq_push (&req_queue, req); - pthread_cond_signal (&reqwait); - UNLOCK (reqlock); + X_COND_SIGNAL (reqwait); + X_UNLOCK (reqlock); - LOCK (wrklock); + X_LOCK (wrklock); --started; - UNLOCK (wrklock); + X_UNLOCK (wrklock); } static void set_max_idle (int nthreads) { - if (WORDACCESS_UNSAFE) LOCK (reqlock); + if (WORDACCESS_UNSAFE) X_LOCK (reqlock); max_idle = nthreads <= 0 ? 1 : nthreads; - if (WORDACCESS_UNSAFE) UNLOCK (reqlock); + if (WORDACCESS_UNSAFE) X_UNLOCK (reqlock); } static void min_parallel (int nthreads) @@ -385,19 +504,19 @@ while (nreqs) { int size; - if (WORDACCESS_UNSAFE) LOCK (reslock); + if (WORDACCESS_UNSAFE) X_LOCK (reslock); size = res_queue.size; - if (WORDACCESS_UNSAFE) UNLOCK (reslock); + if (WORDACCESS_UNSAFE) X_UNLOCK (reslock); if (size) return; maybe_start_thread (); - FD_ZERO(&rfd); - FD_SET(respipe [0], &rfd); + FD_ZERO (&rfd); + FD_SET (respipe [0], &rfd); - select (respipe [0] + 1, &rfd, 0, 0, 0); + PerlSock_select (respipe [0] + 1, &rfd, 0, 0, 0); } } @@ -419,7 +538,7 @@ { maybe_start_thread (); - LOCK (reslock); + X_LOCK (reslock); req = reqq_shift (&res_queue); if (req) @@ -430,12 +549,12 @@ { /* read any signals sent by the worker threads */ char buf [4]; - while (read (respipe [0], buf, 4) == 4) + while (respipe_read (respipe [0], buf, 4) == 4) ; } } - UNLOCK (reslock); + X_UNLOCK (reslock); if (!req) break; @@ -475,35 +594,22 @@ return count; } -static void create_pipe () -{ - if (pipe (respipe)) - croak ("unable to initialize result pipe"); - - if (fcntl (respipe [0], F_SETFL, O_NONBLOCK)) - croak ("cannot set result pipe to nonblocking mode"); - - if (fcntl (respipe [1], F_SETFL, O_NONBLOCK)) - croak ("cannot set result pipe to nonblocking mode"); -} - /*****************************************************************************/ -static void *aio_proc (void *thr_arg) +X_THREAD_PROC (bdb_proc) { aio_req req; struct timespec ts; worker *self = (worker *)thr_arg; /* try to distribute timeouts somewhat evenly */ - ts.tv_nsec = (((unsigned long)self + (unsigned long)ts.tv_sec) & 1023UL) - * (1000000000UL / 1024UL); + ts.tv_nsec = ((unsigned long)self & 1023UL) * (1000000000UL / 1024UL); for (;;) { ts.tv_sec = time (0) + IDLE_TIMEOUT; - LOCK (reqlock); + X_LOCK (reqlock); for (;;) { @@ -514,21 +620,21 @@ ++idle; - if (pthread_cond_timedwait (&reqwait, &reqlock, &ts) + if (X_COND_TIMEDWAIT (reqwait, reqlock, ts) == ETIMEDOUT) { if (idle > max_idle) { --idle; - UNLOCK (reqlock); - LOCK (wrklock); + X_UNLOCK (reqlock); + X_LOCK (wrklock); --started; - UNLOCK (wrklock); + X_UNLOCK (wrklock); goto quit; } /* we are allowed to idle, so do so without any timeout */ - pthread_cond_wait (&reqwait, &reqlock); + X_COND_WAIT (reqwait, reqlock); ts.tv_sec = time (0) + IDLE_TIMEOUT; } @@ -537,40 +643,148 @@ --nready; - UNLOCK (reqlock); + X_UNLOCK (reqlock); - errno = 0; /* strictly unnecessary */ - switch (req->type) { case REQ_QUIT: goto quit; + case REQ_ENV_OPEN: + req->result = req->env->open (req->env, req->buf1, req->uint1, req->int1); + break; + + case REQ_ENV_CLOSE: + req->result = req->env->close (req->env, req->uint1); + break; + + case REQ_ENV_TXN_CHECKPOINT: + req->result = req->env->txn_checkpoint (req->env, req->uint1, req->int1, req->uint2); + break; + + case REQ_ENV_LOCK_DETECT: + req->result = req->env->lock_detect (req->env, req->uint1, req->uint2, &req->int1); + break; + + case REQ_ENV_MEMP_SYNC: + req->result = req->env->memp_sync (req->env, 0); + break; + + case REQ_ENV_MEMP_TRICKLE: + req->result = req->env->memp_trickle (req->env, req->int1, &req->int2); + break; + + case REQ_DB_OPEN: + req->result = req->db->open (req->db, req->txn, req->buf1, req->buf2, req->int1, req->uint1, req->int2); + break; + + case REQ_DB_CLOSE: + req->result = req->db->close (req->db, req->uint1); + break; + + case REQ_DB_COMPACT: + req->result = req->db->compact (req->db, req->txn, &req->dbt1, &req->dbt2, 0, req->uint1, 0); + break; + + case REQ_DB_SYNC: + req->result = req->db->sync (req->db, req->uint1); + break; + + case REQ_DB_PUT: + req->result = req->db->put (req->db, req->txn, &req->dbt1, &req->dbt2, req->uint1); + break; + + case REQ_DB_GET: + req->result = req->db->get (req->db, req->txn, &req->dbt1, &req->dbt3, req->uint1); + break; + + case REQ_DB_PGET: + req->result = req->db->pget (req->db, req->txn, &req->dbt1, &req->dbt2, &req->dbt3, req->uint1); + break; + + case REQ_DB_DEL: + req->result = req->db->del (req->db, req->txn, &req->dbt1, req->uint1); + break; + + case REQ_DB_KEY_RANGE: + req->result = req->db->key_range (req->db, req->txn, &req->dbt1, &req->key_range, req->uint1); + break; + + case REQ_TXN_COMMIT: + req->result = req->txn->commit (req->txn, req->uint1); + break; + + case REQ_TXN_ABORT: + req->result = req->txn->abort (req->txn); + break; + + case REQ_C_CLOSE: + req->result = req->dbc->c_close (req->dbc); + break; + + case REQ_C_COUNT: + { + db_recno_t recno; + req->result = req->dbc->c_count (req->dbc, &recno, req->uint1); + req->uv1 = recno; + } + break; + + case REQ_C_PUT: + req->result = req->dbc->c_put (req->dbc, &req->dbt1, &req->dbt2, req->uint1); + break; + + case REQ_C_GET: + req->result = req->dbc->c_get (req->dbc, &req->dbt1, &req->dbt3, req->uint1); + break; + + case REQ_C_PGET: + req->result = req->dbc->c_pget (req->dbc, &req->dbt1, &req->dbt2, &req->dbt3, req->uint1); + break; + + case REQ_C_DEL: + req->result = req->dbc->c_del (req->dbc, req->uint1); + break; + + case REQ_SEQ_OPEN: + req->result = req->seq->open (req->seq, req->txn, &req->dbt1, req->uint1); + break; + + case REQ_SEQ_CLOSE: + req->result = req->seq->close (req->seq, req->uint1); + break; + + case REQ_SEQ_GET: + req->result = req->seq->get (req->seq, req->txn, req->int1, &req->seq_t, req->uint1); + break; + + case REQ_SEQ_REMOVE: + req->result = req->seq->remove (req->seq, req->txn, req->uint1); + break; + default: - //req->result = ENOSYS; + req->result = ENOSYS; break; } - //req->errorno = errno; - - LOCK (reslock); + X_LOCK (reslock); ++npending; if (!reqq_push (&res_queue, req)) /* write a dummy byte to the pipe so fh becomes ready */ - write (respipe [1], &respipe, 1); + respipe_write (respipe_osf [1], (const void *)&respipe_osf, 1); self->req = 0; worker_clear (self); - UNLOCK (reslock); + X_UNLOCK (reslock); } quit: - LOCK (wrklock); + X_LOCK (wrklock); worker_free (self); - UNLOCK (wrklock); + X_UNLOCK (wrklock); return 0; } @@ -579,16 +793,16 @@ static void atfork_prepare (void) { - LOCK (wrklock); - LOCK (reqlock); - LOCK (reslock); + X_LOCK (wrklock); + X_LOCK (reqlock); + X_LOCK (reslock); } static void atfork_parent (void) { - UNLOCK (reslock); - UNLOCK (reqlock); - UNLOCK (wrklock); + X_UNLOCK (reslock); + X_UNLOCK (reqlock); + X_UNLOCK (wrklock); } static void atfork_child (void) @@ -618,9 +832,10 @@ nready = 0; npending = 0; - close (respipe [0]); - close (respipe [1]); - create_pipe (); + respipe_close (respipe [0]); + respipe_close (respipe [1]); + + create_pipe (respipe); atfork_parent (); } @@ -644,17 +859,32 @@ #define REQ_SEND \ req_send (req) -#define SvPTR(var, arg, type, class) \ - if (!SvOK (arg)) \ - (var) = 0; \ - else if (sv_derived_from ((arg), # class)) \ - { \ - IV tmp = SvIV ((SV*) SvRV (arg)); \ - (var) = INT2PTR (type, tmp); \ - } \ - else \ - Perl_croak (# var " is not of type " # type) - +#define SvPTR(var, arg, type, class, nullok) \ + if (!SvOK (arg)) \ + { \ + if (!nullok) \ + croak (# var " must be a " # class " object, not undef"); \ + \ + (var) = 0; \ + } \ + else if (sv_derived_from ((arg), # class)) \ + { \ + IV tmp = SvIV ((SV*) SvRV (arg)); \ + (var) = INT2PTR (type, tmp); \ + if (!var) \ + croak (# var " is not a valid " # class " object anymore"); \ + } \ + else \ + croak (# var " is not of type " # class); \ + \ + +static void +ptr_nuke (SV *sv) +{ + assert (SvROK (sv)); + sv_setiv (SvRV (sv), 0); +} + MODULE = BDB PACKAGE = BDB PROTOTYPES: ENABLE @@ -694,7 +924,6 @@ const_iv (LOG_AUTOREMOVE) const_iv (LOG_INMEMORY) const_iv (NOLOCKING) - const_iv (MULTIVERSION) const_iv (NOMMAP) const_iv (NOPANIC) const_iv (OVERWRITE) @@ -702,8 +931,8 @@ const_iv (REGION_INIT) const_iv (TIME_NOTGRANTED) const_iv (TXN_NOSYNC) - const_iv (TXN_SNAPSHOT) const_iv (TXN_WRITE_NOSYNC) + const_iv (WRITECURSOR) const_iv (YIELDCPU) const_iv (ENCRYPT_AES) const_iv (XA_CREATE) @@ -713,6 +942,7 @@ const_iv (RECNO) const_iv (UNKNOWN) const_iv (EXCL) + const_iv (READ_COMMITTED) const_iv (READ_UNCOMMITTED) const_iv (TRUNCATE) const_iv (NOSYNC) @@ -727,6 +957,10 @@ const_iv (INORDER) const_iv (CONSUME) const_iv (CONSUME_WAIT) + const_iv (GET_BOTH) + const_iv (GET_BOTH_RANGE) + //const_iv (SET_RECNO) + //const_iv (MULTIPLE) const_iv (SNAPSHOT) const_iv (JOIN_ITEM) const_iv (RMW) @@ -736,13 +970,75 @@ const_iv (LOCK_DEADLOCK) const_iv (LOCK_NOTGRANTED) const_iv (RUNRECOVERY) + const_iv (OLD_VERSION) + const_iv (REP_HANDLE_DEAD) + const_iv (REP_LOCKOUT) + const_iv (SECONDARY_BAD) + + const_iv (FREE_SPACE) + const_iv (FREELIST_ONLY) + + const_iv (APPEND) + const_iv (NODUPDATA) + const_iv (NOOVERWRITE) + + const_iv (TXN_NOWAIT) + const_iv (TXN_SYNC) + + const_iv (SET_LOCK_TIMEOUT) + const_iv (SET_TXN_TIMEOUT) + + const_iv (JOIN_ITEM) + const_iv (FIRST) + const_iv (NEXT) + const_iv (NEXT_DUP) + const_iv (NEXT_NODUP) + const_iv (PREV) + const_iv (PREV_NODUP) + const_iv (SET) + const_iv (SET_RANGE) + const_iv (LAST) + const_iv (BEFORE) + const_iv (AFTER) + const_iv (CURRENT) + const_iv (KEYFIRST) + const_iv (KEYLAST) + const_iv (NODUPDATA) + + const_iv (FORCE) + + const_iv (LOCK_DEFAULT) + const_iv (LOCK_EXPIRE) + const_iv (LOCK_MAXLOCKS) + const_iv (LOCK_MAXWRITE) + const_iv (LOCK_MINLOCKS) + const_iv (LOCK_MINWRITE) + const_iv (LOCK_OLDEST) + const_iv (LOCK_RANDOM) + const_iv (LOCK_YOUNGEST) + + const_iv (SEQ_DEC) + const_iv (SEQ_INC) + const_iv (SEQ_WRAP) +#if DB_VERSION_MINOR >= 5 + const_iv (MULTIVERSION) + const_iv (TXN_SNAPSHOT) +#endif }; for (civ = const_iv + sizeof (const_iv) / sizeof (const_iv [0]); civ-- > const_iv; ) newCONSTSUB (stash, (char *)civ->name, newSViv (civ->iv)); - create_pipe (); - pthread_atfork (atfork_prepare, atfork_parent, atfork_child); + create_pipe (respipe); + + X_THREAD_ATFORK (atfork_prepare, atfork_parent, atfork_child); +#ifdef _WIN32 + X_MUTEX_CHECK (wrklock); + X_MUTEX_CHECK (reslock); + X_MUTEX_CHECK (reqlock); + + X_COND_CHECK (reqwait); +#endif } void @@ -781,7 +1077,7 @@ RETVAL int -bdbreq_pri (int pri = 0) +dbreq_pri (int pri = 0) PROTOTYPE: ;$ CODE: RETVAL = next_pri - PRI_BIAS; @@ -795,7 +1091,7 @@ RETVAL void -bdbreq_nice (int nice = 0) +dbreq_nice (int nice = 0) CODE: nice = next_pri - nice; if (nice < PRI_MIN) nice = PRI_MIN; @@ -813,7 +1109,7 @@ } int -poll() +poll () PROTOTYPE: CODE: poll_wait (); @@ -822,7 +1118,7 @@ RETVAL int -poll_fileno() +poll_fileno () PROTOTYPE: CODE: RETVAL = respipe [0]; @@ -830,7 +1126,7 @@ RETVAL int -poll_cb(...) +poll_cb (...) PROTOTYPE: CODE: RETVAL = poll_cb (); @@ -838,13 +1134,13 @@ RETVAL void -poll_wait() +poll_wait () PROTOTYPE: CODE: poll_wait (); int -nreqs() +nreqs () PROTOTYPE: CODE: RETVAL = nreqs; @@ -852,7 +1148,7 @@ RETVAL int -nready() +nready () PROTOTYPE: CODE: RETVAL = get_nready (); @@ -860,7 +1156,7 @@ RETVAL int -npending() +npending () PROTOTYPE: CODE: RETVAL = get_npending (); @@ -868,114 +1164,675 @@ RETVAL int -nthreads() +nthreads () PROTOTYPE: CODE: - if (WORDACCESS_UNSAFE) LOCK (wrklock); + if (WORDACCESS_UNSAFE) X_LOCK (wrklock); RETVAL = started; - if (WORDACCESS_UNSAFE) UNLOCK (wrklock); + if (WORDACCESS_UNSAFE) X_UNLOCK (wrklock); OUTPUT: RETVAL +void +set_sync_prepare (SV *cb) + PROTOTYPE: & + CODE: + SvREFCNT_dec (prepare_cb); + prepare_cb = newSVsv (cb); + + DB_ENV * -bdb_env_create (U32 env_flags = 0) +db_env_create (U32 env_flags = 0) CODE: { - int err = db_env_create (&RETVAL, env_flags); - if (err) - croak ("db_env_create: %s", db_strerror (err)); + errno = db_env_create (&RETVAL, env_flags); + if (errno) + croak ("db_env_create: %s", db_strerror (errno)); } + OUTPUT: + RETVAL void -bdb_env_open (DB_ENV *env, char *db_home, U32 open_flags, int mode, SV *callback = 0) +db_env_open (DB_ENV *env, octetstring db_home, U32 open_flags, int mode, SV *callback = &PL_sv_undef) CODE: { dREQ (REQ_ENV_OPEN); + + env->set_thread_count (env, get_nthreads ()); + req->env = env; - req->uint1 = open_flags; + req->uint1 = open_flags | DB_THREAD; req->int1 = mode; - req->buf1 = strdup (db_home); + req->buf1 = strdup_ornull (db_home); REQ_SEND; } void -bdb_env_close (DB_ENV *env, U32 flags = 0, SV *callback = 0) +db_env_close (DB_ENV *env, U32 flags = 0, SV *callback = &PL_sv_undef) CODE: { dREQ (REQ_ENV_CLOSE); req->env = env; req->uint1 = flags; REQ_SEND; + ptr_nuke (ST (0)); +} + +void +db_env_txn_checkpoint (DB_ENV *env, U32 kbyte = 0, U32 min = 0, U32 flags = 0, SV *callback = &PL_sv_undef) + CODE: +{ + dREQ (REQ_ENV_TXN_CHECKPOINT); + req->env = env; + req->uint1 = kbyte; + req->int1 = min; + req->uint2 = flags; + REQ_SEND; } +void +db_env_lock_detect (DB_ENV *env, U32 flags = 0, U32 atype = DB_LOCK_DEFAULT, SV *dummy = 0, SV *callback = &PL_sv_undef) + CODE: +{ + dREQ (REQ_ENV_LOCK_DETECT); + req->env = env; + req->uint1 = flags; + req->uint2 = atype; + REQ_SEND; +} + +void +db_env_memp_sync (DB_ENV *env, SV *dummy = 0, SV *callback = &PL_sv_undef) + CODE: +{ + dREQ (REQ_ENV_MEMP_SYNC); + req->env = env; + REQ_SEND; +} + +void +db_env_memp_trickle (DB_ENV *env, int percent, SV *dummy = 0, SV *callback = &PL_sv_undef) + CODE: +{ + dREQ (REQ_ENV_MEMP_TRICKLE); + req->env = env; + req->int1 = percent; + REQ_SEND; +} + + DB * -bdb_db_create (DB_ENV *env = 0, U32 flags = 0) +db_create (DB_ENV *env = 0, U32 flags = 0) CODE: { - int err = db_create (&RETVAL, env, flags); - if (err) - croak ("db_env_create: %s", db_strerror (err)); + errno = db_create (&RETVAL, env, flags); + if (errno) + croak ("db_create: %s", db_strerror (errno)); + + if (RETVAL) + RETVAL->app_private = (void *)newSVsv (ST (0)); } + OUTPUT: + RETVAL void -bdb_db_open (DB *db, DB_TXN *txnid, const char *file, const char *database, int type, U32 flags, int mode, SV *callback = 0) +db_open (DB *db, DB_TXN_ornull *txnid, octetstring file, octetstring database, int type, U32 flags, int mode, SV *callback = &PL_sv_undef) CODE: { dREQ (REQ_DB_OPEN); req->db = db; req->txn = txnid; - req->buf1 = strdup (file); - req->buf2 = strdup (database); + req->buf1 = strdup_ornull (file); + req->buf2 = strdup_ornull (database); req->int1 = type; - req->uint1 = flags; + req->uint1 = flags | DB_THREAD; req->int2 = mode; REQ_SEND; } void -bdb_db_close (DB *db, U32 flags = 0, SV *callback = 0) +db_close (DB *db, U32 flags = 0, SV *callback = &PL_sv_undef) CODE: { dREQ (REQ_DB_CLOSE); req->db = db; req->uint1 = flags; + req->sv1 = (SV *)db->app_private; + REQ_SEND; + ptr_nuke (ST (0)); +} + +void +db_compact (DB *db, DB_TXN_ornull *txn = 0, SV *start = 0, SV *stop = 0, SV *unused1 = 0, U32 flags = DB_FREE_SPACE, SV *unused2 = 0, SV *callback = &PL_sv_undef) + CODE: +{ + dREQ (REQ_DB_COMPACT); + req->db = db; + req->txn = txn; + sv_to_dbt (&req->dbt1, start); + sv_to_dbt (&req->dbt2, stop); + req->uint1 = flags; + REQ_SEND; +} + +void +db_sync (DB *db, U32 flags = 0, SV *callback = &PL_sv_undef) + CODE: +{ + dREQ (REQ_DB_SYNC); + req->db = db; + req->uint1 = flags; + REQ_SEND; +} + +void +db_key_range (DB *db, DB_TXN_ornull *txn, SV *key, SV *key_range, U32 flags = 0, SV *callback = &PL_sv_undef) + CODE: +{ + dREQ (REQ_DB_KEY_RANGE); + req->db = db; + req->txn = txn; + sv_to_dbt (&req->dbt1, key); + req->uint1 = flags; + req->sv1 = SvREFCNT_inc (key_range); SvREADONLY_on (key_range); + REQ_SEND; +} + +void +db_put (DB *db, DB_TXN_ornull *txn, SV *key, SV *data, U32 flags = 0, SV *callback = &PL_sv_undef) + CODE: +{ + dREQ (REQ_DB_PUT); + req->db = db; + req->txn = txn; + sv_to_dbt (&req->dbt1, key); + sv_to_dbt (&req->dbt2, data); + req->uint1 = flags; + REQ_SEND; +} + +void +db_get (DB *db, DB_TXN_ornull *txn, SV *key, SV *data, U32 flags = 0, SV *callback = &PL_sv_undef) + CODE: +{ + dREQ (REQ_DB_GET); + req->db = db; + req->txn = txn; + req->uint1 = flags; + sv_to_dbt (&req->dbt1, key); + req->dbt3.flags = DB_DBT_MALLOC; + req->sv3 = SvREFCNT_inc (data); SvREADONLY_on (data); + REQ_SEND; +} + +void +db_pget (DB *db, DB_TXN_ornull *txn, SV *key, SV *pkey, SV *data, U32 flags = 0, SV *callback = &PL_sv_undef) + CODE: +{ + dREQ (REQ_DB_PGET); + req->db = db; + req->txn = txn; + req->uint1 = flags; + sv_to_dbt (&req->dbt1, key); + sv_to_dbt (&req->dbt2, pkey); + req->dbt3.flags = DB_DBT_MALLOC; + req->sv3 = SvREFCNT_inc (data); SvREADONLY_on (data); + REQ_SEND; +} + +void +db_del (DB *db, DB_TXN_ornull *txn, SV *key, U32 flags = 0, SV *callback = &PL_sv_undef) + CODE: +{ + dREQ (REQ_DB_DEL); + req->db = db; + req->txn = txn; + req->uint1 = flags; + sv_to_dbt (&req->dbt1, key); + REQ_SEND; +} + +void +db_txn_commit (DB_TXN *txn, U32 flags = 0, SV *callback = &PL_sv_undef) + CODE: +{ + dREQ (REQ_TXN_COMMIT); + req->txn = txn; + req->uint1 = flags; + REQ_SEND; + ptr_nuke (ST (0)); +} + +void +db_txn_abort (DB_TXN *txn, SV *callback = &PL_sv_undef) + CODE: +{ + dREQ (REQ_TXN_ABORT); + req->txn = txn; + REQ_SEND; + ptr_nuke (ST (0)); +} + +void +db_c_close (DBC *dbc, SV *callback = &PL_sv_undef) + CODE: +{ + dREQ (REQ_C_CLOSE); + req->dbc = dbc; + REQ_SEND; + ptr_nuke (ST (0)); +} + +void +db_c_count (DBC *dbc, SV *count, U32 flags = 0, SV *callback = &PL_sv_undef) + CODE: +{ + dREQ (REQ_C_COUNT); + req->dbc = dbc; + req->sv1 = SvREFCNT_inc (count); + REQ_SEND; +} + +void +db_c_put (DBC *dbc, SV *key, SV *data, U32 flags = 0, SV *callback = &PL_sv_undef) + CODE: +{ + dREQ (REQ_C_PUT); + req->dbc = dbc; + sv_to_dbt (&req->dbt1, key); + sv_to_dbt (&req->dbt2, data); + req->uint1 = flags; + REQ_SEND; +} + +void +db_c_get (DBC *dbc, SV *key, SV *data, U32 flags = 0, SV *callback = &PL_sv_undef) + CODE: +{ + dREQ (REQ_C_GET); + req->dbc = dbc; + req->uint1 = flags; + if ((flags & DB_SET) == DB_SET + || (flags & DB_SET_RANGE) == DB_SET_RANGE) + sv_to_dbt (&req->dbt1, key); + else + req->dbt1.flags = DB_DBT_MALLOC; + + req->sv1 = SvREFCNT_inc (key); SvREADONLY_on (key); + + if ((flags & DB_GET_BOTH) == DB_GET_BOTH + || (flags & DB_GET_BOTH_RANGE) == DB_GET_BOTH_RANGE) + sv_to_dbt (&req->dbt3, data); + else + req->dbt3.flags = DB_DBT_MALLOC; + + req->sv3 = SvREFCNT_inc (data); SvREADONLY_on (data); + REQ_SEND; +} + +void +db_c_pget (DBC *dbc, SV *key, SV *pkey, SV *data, U32 flags = 0, SV *callback = &PL_sv_undef) + CODE: +{ + dREQ (REQ_C_PGET); + req->dbc = dbc; + req->uint1 = flags; + if ((flags & DB_SET) == DB_SET + || (flags & DB_SET_RANGE) == DB_SET_RANGE) + sv_to_dbt (&req->dbt1, key); + else + req->dbt1.flags = DB_DBT_MALLOC; + + req->sv1 = SvREFCNT_inc (key); SvREADONLY_on (key); + + req->dbt2.flags = DB_DBT_MALLOC; + req->sv2 = SvREFCNT_inc (pkey); SvREADONLY_on (pkey); + + if ((flags & DB_GET_BOTH) == DB_GET_BOTH + || (flags & DB_GET_BOTH_RANGE) == DB_GET_BOTH_RANGE) + sv_to_dbt (&req->dbt3, data); + else + req->dbt3.flags = DB_DBT_MALLOC; + + req->sv3 = SvREFCNT_inc (data); SvREADONLY_on (data); + REQ_SEND; +} + +void +db_c_del (DBC *dbc, U32 flags = 0, SV *callback = &PL_sv_undef) + CODE: +{ + dREQ (REQ_C_DEL); + req->dbc = dbc; + req->uint1 = flags; + REQ_SEND; +} + + +void +db_sequence_open (DB_SEQUENCE *seq, DB_TXN_ornull *txnid, SV *key, U32 flags = 0, SV *callback = &PL_sv_undef) + CODE: +{ + dREQ (REQ_SEQ_OPEN); + req->seq = seq; + req->txn = txnid; + req->uint1 = flags | DB_THREAD; + sv_to_dbt (&req->dbt1, key); + REQ_SEND; +} + +void +db_sequence_close (DB_SEQUENCE *seq, U32 flags = 0, SV *callback = &PL_sv_undef) + CODE: +{ + dREQ (REQ_SEQ_CLOSE); + req->seq = seq; + req->uint1 = flags; + REQ_SEND; + ptr_nuke (ST (0)); +} + +void +db_sequence_get (DB_SEQUENCE *seq, DB_TXN_ornull *txnid, int delta, SV *seq_value, U32 flags = DB_TXN_NOSYNC, SV *callback = &PL_sv_undef) + CODE: +{ + dREQ (REQ_SEQ_GET); + req->seq = seq; + req->txn = txnid; + req->int1 = delta; + req->uint1 = flags; + req->sv1 = SvREFCNT_inc (seq_value); SvREADONLY_on (seq_value); + REQ_SEND; +} + +void +db_sequence_remove (DB_SEQUENCE *seq, DB_TXN_ornull *txnid = 0, U32 flags = 0, SV *callback = &PL_sv_undef) + CODE: +{ + dREQ (REQ_SEQ_REMOVE); + req->seq = seq; + req->txn = txnid; + req->uint1 = flags; REQ_SEND; } MODULE = BDB PACKAGE = BDB::Env +void +DESTROY (DB_ENV_ornull *env) + CODE: + if (env) + env->close (env, 0); + +int set_data_dir (DB_ENV *env, const char *dir) + CODE: + RETVAL = env->set_data_dir (env, dir); + OUTPUT: + RETVAL + +int set_tmp_dir (DB_ENV *env, const char *dir) + CODE: + RETVAL = env->set_tmp_dir (env, dir); + OUTPUT: + RETVAL + +int set_lg_dir (DB_ENV *env, const char *dir) + CODE: + RETVAL = env->set_lg_dir (env, dir); + OUTPUT: + RETVAL + +int set_shm_key (DB_ENV *env, long shm_key) + CODE: + RETVAL = env->set_shm_key (env, shm_key); + OUTPUT: + RETVAL + int set_cachesize (DB_ENV *env, U32 gbytes, U32 bytes, int ncache = 0) + CODE: + RETVAL = env->set_cachesize (env, gbytes, bytes, ncache); + OUTPUT: + RETVAL int set_flags (DB_ENV *env, U32 flags, int onoff) + CODE: + RETVAL = env->set_flags (env, flags, onoff); + OUTPUT: + RETVAL + +int set_encrypt (DB_ENV *env, const char *password, U32 flags = 0) + CODE: + RETVAL = env->set_encrypt (env, password, flags); + OUTPUT: + RETVAL + +int set_timeout (DB_ENV *env, NV timeout, U32 flags) + CODE: + RETVAL = env->set_timeout (env, timeout * 1000000, flags); + OUTPUT: + RETVAL + +int set_mp_max_openfd (DB_ENV *env, int maxopenfd); + CODE: + RETVAL = env->set_mp_max_openfd (env, maxopenfd); + OUTPUT: + RETVAL + +int set_mp_max_write (DB_ENV *env, int maxwrite, int maxwrite_sleep); + CODE: + RETVAL = env->set_mp_max_write (env, maxwrite, maxwrite_sleep); + OUTPUT: + RETVAL + +int set_mp_mmapsize (DB_ENV *env, int mmapsize_mb) + CODE: + RETVAL = env->set_mp_mmapsize (env, ((size_t)mmapsize_mb) << 20); + OUTPUT: + RETVAL + +int set_lk_detect (DB_ENV *env, U32 detect = DB_LOCK_DEFAULT) + CODE: + RETVAL = env->set_lk_detect (env, detect); + OUTPUT: + RETVAL + +int set_lk_max_lockers (DB_ENV *env, U32 max) + CODE: + RETVAL = env->set_lk_max_lockers (env, max); + OUTPUT: + RETVAL + +int set_lk_max_locks (DB_ENV *env, U32 max) + CODE: + RETVAL = env->set_lk_max_locks (env, max); + OUTPUT: + RETVAL + +int set_lk_max_objects (DB_ENV *env, U32 max) + CODE: + RETVAL = env->set_lk_max_objects (env, max); + OUTPUT: + RETVAL + +int set_lg_bsize (DB_ENV *env, U32 max) + CODE: + RETVAL = env->set_lg_bsize (env, max); + OUTPUT: + RETVAL + +int set_lg_max (DB_ENV *env, U32 max) + CODE: + RETVAL = env->set_lg_max (env, max); + OUTPUT: + RETVAL -int set_encrypt (DB_ENV *env, const char *password, U32 flags) +DB_TXN * +txn_begin (DB_ENV *env, DB_TXN_ornull *parent = 0, U32 flags = 0) + CODE: + errno = env->txn_begin (env, parent, &RETVAL, flags); + if (errno) + croak ("DB_ENV->txn_begin: %s", db_strerror (errno)); + OUTPUT: + RETVAL MODULE = BDB PACKAGE = BDB::Db +void +DESTROY (DB_ornull *db) + CODE: + if (db) + { + SV *env = (SV *)db->app_private; + db->close (db, 0); + SvREFCNT_dec (env); + } + int set_cachesize (DB *db, U32 gbytes, U32 bytes, int ncache = 0) + CODE: + RETVAL = db->set_cachesize (db, gbytes, bytes, ncache); + OUTPUT: + RETVAL -int set_flags (DB *env, U32 flags, int onoff) +int set_flags (DB *db, U32 flags); + CODE: + RETVAL = db->set_flags (db, flags); + OUTPUT: + RETVAL int set_encrypt (DB *db, const char *password, U32 flags) + CODE: + RETVAL = db->set_encrypt (db, password, flags); + OUTPUT: + RETVAL int set_lorder (DB *db, int lorder) - + CODE: + RETVAL = db->set_lorder (db, lorder); + OUTPUT: + RETVAL int set_bt_minkey (DB *db, U32 minkey) + CODE: + RETVAL = db->set_bt_minkey (db, minkey); + OUTPUT: + RETVAL int set_re_delim(DB *db, int delim); + CODE: + RETVAL = db->set_re_delim (db, delim); + OUTPUT: + RETVAL int set_re_pad (DB *db, int re_pad) + CODE: + RETVAL = db->set_re_pad (db, re_pad); + OUTPUT: + RETVAL int set_re_source (DB *db, char *source) + CODE: + RETVAL = db->set_re_source (db, source); + OUTPUT: + RETVAL int set_re_len (DB *db, U32 re_len) + CODE: + RETVAL = db->set_re_len (db, re_len); + OUTPUT: + RETVAL int set_h_ffactor (DB *db, U32 h_ffactor) + CODE: + RETVAL = db->set_h_ffactor (db, h_ffactor); + OUTPUT: + RETVAL int set_h_nelem (DB *db, U32 h_nelem) + CODE: + RETVAL = db->set_h_nelem (db, h_nelem); + OUTPUT: + RETVAL int set_q_extentsize (DB *db, U32 extentsize) + CODE: + RETVAL = db->set_q_extentsize (db, extentsize); + OUTPUT: + RETVAL + +DBC * +cursor (DB *db, DB_TXN_ornull *txn = 0, U32 flags = 0) + CODE: + errno = db->cursor (db, txn, &RETVAL, flags); + if (errno) + croak ("DB->cursor: %s", db_strerror (errno)); + OUTPUT: + RETVAL +DB_SEQUENCE * +sequence (DB *db, U32 flags = 0) + CODE: +{ + errno = db_sequence_create (&RETVAL, db, flags); + if (errno) + croak ("db_sequence_create: %s", db_strerror (errno)); +} + OUTPUT: + RETVAL + + +MODULE = BDB PACKAGE = BDB::Txn + +void +DESTROY (DB_TXN_ornull *txn) + CODE: + if (txn) + txn->abort (txn); + +int set_timeout (DB_TXN *txn, NV timeout, U32 flags) + CODE: + RETVAL = txn->set_timeout (txn, timeout * 1000000, flags); + OUTPUT: + RETVAL + + +MODULE = BDB PACKAGE = BDB::Cursor + +void +DESTROY (DBC_ornull *dbc) + CODE: + if (dbc) + dbc->c_close (dbc); + +MODULE = BDB PACKAGE = BDB::Sequence + +void +DESTROY (DB_SEQUENCE_ornull *seq) + CODE: + if (seq) + seq->close (seq, 0); + +int initial_value (DB_SEQUENCE *seq, db_seq_t value) + CODE: + RETVAL = seq->initial_value (seq, value); + OUTPUT: + RETVAL + +int set_cachesize (DB_SEQUENCE *seq, U32 size) + CODE: + RETVAL = seq->set_cachesize (seq, size); + OUTPUT: + RETVAL + +int set_flags (DB_SEQUENCE *seq, U32 flags) + CODE: + RETVAL = seq->set_flags (seq, flags); + OUTPUT: + RETVAL + +int set_range (DB_SEQUENCE *seq, db_seq_t min, db_seq_t max) + CODE: + RETVAL = seq->set_range (seq, min, max); + OUTPUT: + RETVAL