--- BDB/BDB.xs 2007/08/13 11:16:22 1.15 +++ BDB/BDB.xs 2007/12/22 07:33:48 1.28 @@ -47,6 +47,16 @@ static SV *prepare_cb; +#if DB_VERSION_MINOR >= 6 +# define c_close close +# define c_count count +# define c_del del +# define c_dup dup +# define c_get get +# define c_pget pget +# define c_put put +#endif + static void debug_errcall (const DB_ENV *dbenv, const char *errpfx, const char *msg) { @@ -94,16 +104,16 @@ REQ_QUIT, REQ_ENV_OPEN, REQ_ENV_CLOSE, REQ_ENV_TXN_CHECKPOINT, REQ_ENV_LOCK_DETECT, REQ_ENV_MEMP_SYNC, REQ_ENV_MEMP_TRICKLE, - REQ_DB_OPEN, REQ_DB_CLOSE, REQ_DB_COMPACT, REQ_DB_SYNC, + REQ_DB_OPEN, REQ_DB_CLOSE, REQ_DB_COMPACT, REQ_DB_SYNC, REQ_DB_UPGRADE, REQ_DB_PUT, REQ_DB_GET, REQ_DB_PGET, REQ_DB_DEL, REQ_DB_KEY_RANGE, - REQ_TXN_COMMIT, REQ_TXN_ABORT, + REQ_TXN_COMMIT, REQ_TXN_ABORT, REQ_TXN_FINISH, REQ_C_CLOSE, REQ_C_COUNT, REQ_C_PUT, REQ_C_GET, REQ_C_PGET, REQ_C_DEL, REQ_SEQ_OPEN, REQ_SEQ_CLOSE, REQ_SEQ_GET, REQ_SEQ_REMOVE, }; -typedef struct aio_cb +typedef struct bdb_cb { - struct aio_cb *volatile next; + struct bdb_cb *volatile next; SV *callback; int type, pri, result; @@ -122,9 +132,9 @@ DB_KEY_RANGE key_range; DB_SEQUENCE *seq; db_seq_t seq_t; -} aio_cb; +} bdb_cb; -typedef aio_cb *aio_req; +typedef bdb_cb *bdb_req; enum { PRI_MIN = -4, @@ -161,7 +171,7 @@ thread_t tid; /* locked by reslock, reqlock or wrklock */ - aio_req req; /* currently processed request */ + bdb_req req; /* currently processed request */ void *dbuf; DIR *dirp; } worker; @@ -183,7 +193,7 @@ static volatile unsigned int nreqs, nready, npending; static volatile unsigned int max_idle = 4; static volatile unsigned int max_outstanding = 0xffffffff; -static int respipe [2], respipe_osf [2]; +static int respipe_osf [2], respipe [2] = { -1, -1 }; static mutex_t reslock = X_MUTEX_INIT; static mutex_t reqlock = X_MUTEX_INIT; @@ -238,14 +248,14 @@ * per shift, the most expensive operation. */ typedef struct { - aio_req qs[NUM_PRI], qe[NUM_PRI]; /* qstart, qend */ + bdb_req qs[NUM_PRI], qe[NUM_PRI]; /* qstart, qend */ int size; } reqq; static reqq req_queue; static reqq res_queue; -int reqq_push (reqq *q, aio_req req) +int reqq_push (reqq *q, bdb_req req) { int pri = req->pri; req->next = 0; @@ -261,7 +271,7 @@ return q->size++; } -aio_req reqq_shift (reqq *q) +bdb_req reqq_shift (reqq *q) { int pri; @@ -272,7 +282,7 @@ for (pri = NUM_PRI; pri--; ) { - aio_req req = q->qs[pri]; + bdb_req req = q->qs[pri]; if (req) { @@ -287,10 +297,10 @@ } static int poll_cb (); -static void req_free (aio_req req); -static void req_cancel (aio_req req); +static void req_free (bdb_req req); +static void req_cancel (bdb_req req); -static int req_invoke (aio_req req) +static int req_invoke (bdb_req req) { dSP; @@ -318,6 +328,12 @@ dbt_to_sv (req->sv3, &req->dbt3); break; + case REQ_DB_PUT: + case REQ_C_PUT: + dbt_to_sv (0, &req->dbt1); + dbt_to_sv (0, &req->dbt2); + break; + case REQ_DB_KEY_RANGE: { AV *av = newAV (); @@ -336,9 +352,9 @@ SvREADONLY_off (req->sv1); if (sizeof (IV) > 4) - sv_setiv_mg (req->sv1, req->seq_t); + sv_setiv_mg (req->sv1, (IV)req->seq_t); else - sv_setnv_mg (req->sv1, req->seq_t); + sv_setnv_mg (req->sv1, (NV)req->seq_t); SvREFCNT_dec (req->sv1); break; @@ -357,7 +373,7 @@ return !SvTRUE (ERRSV); } -static void req_free (aio_req req) +static void req_free (bdb_req req) { free (req->buf1); free (req->buf2); @@ -371,20 +387,42 @@ #endif static void -create_pipe (int fd[2]) +create_respipe () { #ifdef _WIN32 - int arg = 1; - if (PerlSock_socketpair (AF_UNIX, SOCK_STREAM, 0, fd) - || ioctlsocket (TO_SOCKET (fd [0]), FIONBIO, &arg) - || ioctlsocket (TO_SOCKET (fd [1]), FIONBIO, &arg)) + int arg; /* argg */ +#endif + int old_readfd = respipe [0]; + + if (respipe [1] >= 0) + respipe_close (TO_SOCKET (respipe [1])); + +#ifdef _WIN32 + if (PerlSock_socketpair (AF_UNIX, SOCK_STREAM, 0, respipe)) #else - if (pipe (fd) - || fcntl (fd [0], F_SETFL, O_NONBLOCK) - || fcntl (fd [1], F_SETFL, O_NONBLOCK)) + if (pipe (respipe)) #endif croak ("unable to initialize result pipe"); + if (old_readfd >= 0) + { + if (dup2 (TO_SOCKET (respipe [0]), TO_SOCKET (old_readfd)) < 0) + croak ("unable to initialize result pipe(2)"); + + respipe_close (respipe [0]); + respipe [0] = old_readfd; + } + +#ifdef _WIN32 + arg = 1; + if (ioctlsocket (TO_SOCKET (respipe [0]), FIONBIO, &arg) + || ioctlsocket (TO_SOCKET (respipe [1]), FIONBIO, &arg)) +#else + if (fcntl (respipe [0], F_SETFL, O_NONBLOCK) + || fcntl (respipe [1], F_SETFL, O_NONBLOCK)) +#endif + croak ("unable to initialize result pipe(3)"); + respipe_osf [0] = TO_SOCKET (respipe [0]); respipe_osf [1] = TO_SOCKET (respipe [1]); } @@ -425,7 +463,7 @@ start_thread (); } -static void req_send (aio_req req) +static void req_send (bdb_req req) { SV *wait_callback = 0; @@ -443,7 +481,7 @@ if (count != 2) croak ("prepare callback must return exactly two values\n"); - wait_callback = SvREFCNT_inc (POPs); + wait_callback = POPs; SvREFCNT_dec (req->callback); req->callback = SvREFCNT_inc (POPs); } @@ -464,15 +502,14 @@ PUSHMARK (SP); PUTBACK; call_sv (wait_callback, G_DISCARD); - SvREFCNT_dec (wait_callback); } } static void end_thread (void) { - aio_req req; + bdb_req req; - Newz (0, req, 1, aio_cb); + Newz (0, req, 1, bdb_cb); req->type = REQ_QUIT; req->pri = PRI_MAX + PRI_BIAS; @@ -539,7 +576,7 @@ int maxreqs = max_poll_reqs; int do_croak = 0; struct timeval tv_start, tv_now; - aio_req req; + bdb_req req; if (max_poll_time) gettimeofday (&tv_start, 0); @@ -610,7 +647,7 @@ X_THREAD_PROC (bdb_proc) { - aio_req req; + bdb_req req; struct timespec ts; worker *self = (worker *)thr_arg; @@ -660,6 +697,7 @@ switch (req->type) { case REQ_QUIT: + req->result = ENOSYS; goto quit; case REQ_ENV_OPEN: @@ -702,6 +740,10 @@ req->result = req->db->sync (req->db, req->uint1); break; + case REQ_DB_UPGRADE: + req->result = req->db->upgrade (req->db, req->buf1, req->uint1); + break; + case REQ_DB_PUT: req->result = req->db->put (req->db, req->txn, &req->dbt1, &req->dbt2, req->uint1); break; @@ -730,6 +772,17 @@ req->result = req->txn->abort (req->txn); break; + case REQ_TXN_FINISH: + if (req->txn->flags & TXN_DEADLOCK) + { + req->result = req->txn->abort (req->txn); + if (!req->result) + req->result = DB_LOCK_DEADLOCK; + } + else + req->result = req->txn->commit (req->txn, req->uint1); + break; + case REQ_C_CLOSE: req->result = req->dbc->c_close (req->dbc); break; @@ -779,6 +832,9 @@ break; } + if (req->txn && (req->result > 0 || req->result == DB_LOCK_NOTGRANTED)) + req->txn->flags |= TXN_DEADLOCK; + X_LOCK (reslock); ++npending; @@ -819,7 +875,7 @@ static void atfork_child (void) { - aio_req prv; + bdb_req prv; while (prv = reqq_shift (&req_queue)) req_free (prv); @@ -844,25 +900,22 @@ nready = 0; npending = 0; - respipe_close (respipe [0]); - respipe_close (respipe [1]); - - create_pipe (respipe); + create_respipe (); atfork_parent (); } #define dREQ(reqtype) \ - aio_req req; \ + bdb_req req; \ int req_pri = next_pri; \ next_pri = DEFAULT_PRI + PRI_BIAS; \ \ if (SvOK (callback) && !SvROK (callback)) \ croak ("callback must be undef or of reference type"); \ \ - Newz (0, req, 1, aio_cb); \ + Newz (0, req, 1, bdb_cb); \ if (!req) \ - croak ("out of memory during aio_req allocation"); \ + croak ("out of memory during bdb_req allocation"); \ \ req->callback = newSVsv (callback); \ req->type = (reqtype); \ @@ -921,6 +974,7 @@ const_iv (INIT_TXN) const_iv (RECOVER_FATAL) const_iv (CREATE) + const_iv (RDONLY) const_iv (USE_ENVIRON) const_iv (USE_ENVIRON_ROOT) const_iv (LOCKDOWN) @@ -943,6 +997,7 @@ const_iv (REGION_INIT) const_iv (TIME_NOTGRANTED) const_iv (TXN_NOSYNC) + const_iv (TXN_NOT_DURABLE) const_iv (TXN_WRITE_NOSYNC) const_iv (WRITECURSOR) const_iv (YIELDCPU) @@ -960,7 +1015,6 @@ const_iv (NOSYNC) const_iv (CHKSUM) const_iv (ENCRYPT) - const_iv (TXN_NOT_DURABLE) const_iv (DUP) const_iv (DUPSORT) const_iv (RECNUM) @@ -975,6 +1029,7 @@ //const_iv (MULTIPLE) const_iv (SNAPSHOT) const_iv (JOIN_ITEM) + const_iv (JOIN_NOSORT) const_iv (RMW) const_iv (NOTFOUND) @@ -1000,7 +1055,6 @@ const_iv (SET_LOCK_TIMEOUT) const_iv (SET_TXN_TIMEOUT) - const_iv (JOIN_ITEM) const_iv (FIRST) const_iv (NEXT) const_iv (NEXT_DUP) @@ -1044,7 +1098,6 @@ const_iv (NOSERVER_HOME) const_iv (NOSERVER_ID) const_iv (NOTFOUND) - const_iv (OLD_VERSION) const_iv (PAGE_NOTFOUND) const_iv (REP_DUPMASTER) const_iv (REP_HANDLE_DEAD) @@ -1076,17 +1129,23 @@ const_iv (TXN_SNAPSHOT) #endif #if DB_VERSION_MINOR >= 6 - const_iv (DB_PREV_DUP) + const_iv (PREV_DUP) + const_iv (PRIORITY_UNCHANGED) + const_iv (PRIORITY_VERY_LOW) + const_iv (PRIORITY_LOW) + const_iv (PRIORITY_DEFAULT) + const_iv (PRIORITY_HIGH) + const_iv (PRIORITY_VERY_HIGH) #endif }; for (civ = const_iv + sizeof (const_iv) / sizeof (const_iv [0]); civ-- > const_iv; ) newCONSTSUB (stash, (char *)civ->name, newSViv (civ->iv)); - newCONSTSUB (stash, "DB_VERSION", newSVnv (DB_VERSION_MAJOR + DB_VERSION_MINOR * .1)); - newCONSTSUB (stash, "DB_VERSION_STRING", newSVpv (DB_VERSION_STRING, 0)); + newCONSTSUB (stash, "VERSION", newSVnv (DB_VERSION_MAJOR + DB_VERSION_MINOR * .1)); + newCONSTSUB (stash, "VERSION_STRING", newSVpv (DB_VERSION_STRING, 0)); - create_pipe (respipe); + create_respipe (); X_THREAD_ATFORK (atfork_prepare, atfork_parent, atfork_child); #ifdef _WIN32 @@ -1396,6 +1455,17 @@ } void +db_upgrade (DB *db, octetstring file, U32 flags = 0, SV *callback = &PL_sv_undef) + CODE: +{ + dREQ (REQ_DB_SYNC); + req->db = db; + req->buf1 = strdup (file); + req->uint1 = flags; + REQ_SEND; +} + +void db_key_range (DB *db, DB_TXN_ornull *txn, SV *key, SV *key_range, U32 flags = 0, SV *callback = &PL_sv_undef) CODE: { @@ -1424,6 +1494,8 @@ void db_get (DB *db, DB_TXN_ornull *txn, SV *key, SV *data, U32 flags = 0, SV *callback = &PL_sv_undef) CODE: + if (SvREADONLY (data)) + croak ("can't modify read-only data scalar in db_get"); { dREQ (REQ_DB_GET); req->db = db; @@ -1438,6 +1510,8 @@ void db_pget (DB *db, DB_TXN_ornull *txn, SV *key, SV *pkey, SV *data, U32 flags = 0, SV *callback = &PL_sv_undef) CODE: + if (SvREADONLY (data)) + croak ("can't modify read-only data scalar in db_pget"); { dREQ (REQ_DB_PGET); req->db = db; @@ -1484,6 +1558,17 @@ } void +db_txn_finish (DB_TXN *txn, U32 flags = 0, SV *callback = &PL_sv_undef) + CODE: +{ + dREQ (REQ_TXN_FINISH); + req->txn = txn; + req->uint1 = flags; + REQ_SEND; + ptr_nuke (ST (0)); +} + +void db_c_close (DBC *dbc, SV *callback = &PL_sv_undef) CODE: { @@ -1665,21 +1750,21 @@ OUTPUT: RETVAL -int set_flags (DB_ENV *env, U32 flags, int onoff) +int set_flags (DB_ENV *env, U32 flags, int onoff = 1) CODE: RETVAL = env->set_flags (env, flags, onoff); OUTPUT: RETVAL -void set_errfile (DB_ENV *env, FILE *errfile) +void set_errfile (DB_ENV *env, FILE *errfile = 0) CODE: env->set_errfile (env, errfile); -void set_msgfile (DB_ENV *env, FILE *msgfile) +void set_msgfile (DB_ENV *env, FILE *msgfile = 0) CODE: env->set_msgfile (env, msgfile); -int set_verbose (DB_ENV *env, U32 which, int onoff = 1) +int set_verbose (DB_ENV *env, U32 which = -1, int onoff = 1) CODE: RETVAL = env->set_verbose (env, which, onoff); OUTPUT: @@ -1691,7 +1776,7 @@ OUTPUT: RETVAL -int set_timeout (DB_ENV *env, NV timeout, U32 flags) +int set_timeout (DB_ENV *env, NV timeout, U32 flags = DB_SET_TXN_TIMEOUT) CODE: RETVAL = env->set_timeout (env, timeout * 1000000, flags); OUTPUT: @@ -1751,6 +1836,30 @@ OUTPUT: RETVAL +int mutex_set_max (DB_ENV *env, U32 max) + CODE: + RETVAL = env->mutex_set_max (env, max); + OUTPUT: + RETVAL + +int mutex_set_increment (DB_ENV *env, U32 increment) + CODE: + RETVAL = env->mutex_set_increment (env, increment); + OUTPUT: + RETVAL + +int mutex_set_tas_spins (DB_ENV *env, U32 tas_spins) + CODE: + RETVAL = env->mutex_set_tas_spins (env, tas_spins); + OUTPUT: + RETVAL + +int mutex_set_align (DB_ENV *env, U32 align) + CODE: + RETVAL = env->mutex_set_align (env, align); + OUTPUT: + RETVAL + DB_TXN * txn_begin (DB_ENV *env, DB_TXN_ornull *parent = 0, U32 flags = 0) CODE: @@ -1778,7 +1887,7 @@ OUTPUT: RETVAL -int set_flags (DB *db, U32 flags); +int set_flags (DB *db, U32 flags) CODE: RETVAL = db->set_flags (db, flags); OUTPUT: @@ -1802,7 +1911,7 @@ OUTPUT: RETVAL -int set_re_delim(DB *db, int delim); +int set_re_delim (DB *db, int delim) CODE: RETVAL = db->set_re_delim (db, delim); OUTPUT: @@ -1873,12 +1982,18 @@ if (txn) txn->abort (txn); -int set_timeout (DB_TXN *txn, NV timeout, U32 flags) +int set_timeout (DB_TXN *txn, NV timeout, U32 flags = DB_SET_TXN_TIMEOUT) CODE: RETVAL = txn->set_timeout (txn, timeout * 1000000, flags); OUTPUT: RETVAL +int failed (DB_TXN *txn) + CODE: + RETVAL = !!(txn->flags & TXN_DEADLOCK); + OUTPUT: + RETVAL + MODULE = BDB PACKAGE = BDB::Cursor @@ -1888,6 +2003,14 @@ if (dbc) dbc->c_close (dbc); +#if DB_VERSION_MINOR >= 6 + +int set_priority (DBC *dbc, int priority) + CODE: + dbc->set_priority (dbc, priority); + +#endif + MODULE = BDB PACKAGE = BDB::Sequence void