--- BDB/BDB.xs 2007/12/04 10:13:50 1.20 +++ BDB/BDB.xs 2008/07/07 22:11:04 1.39 @@ -42,8 +42,14 @@ typedef DB DB_ornull; typedef DB_SEQUENCE DB_SEQUENCE_ornull; +typedef DB_ENV DB_ENV_ornuked; +typedef DB_TXN DB_TXN_ornuked; +typedef DBC DBC_ornuked; +typedef DB DB_ornuked; +typedef DB_SEQUENCE DB_SEQUENCE_ornuked; + typedef SV SV8; /* byte-sv, used for argument-checking */ -typedef char *octetstring; +typedef char *bdb_filename; static SV *prepare_cb; @@ -57,6 +63,36 @@ # define c_put put #endif +static char * +get_bdb_filename (SV *sv) +{ + if (!SvOK (sv)) + return 0; + +#if _WIN32 + /* win32 madness + win32 perl absolutely brokenness make for horrible hacks */ + { + STRLEN len; + char *src = SvPVbyte (sv, len); + SV *t1 = sv_newmortal (); + SV *t2 = sv_newmortal (); + + sv_upgrade (t1, SVt_PV); SvPOK_only (t1); SvGROW (t1, len * 16 + 1); + sv_upgrade (t2, SVt_PV); SvPOK_only (t2); SvGROW (t2, len * 16 + 1); + + len = MultiByteToWideChar (CP_ACP, 0, src, len, (WCHAR *)SvPVX (t1), SvLEN (t1) / sizeof (WCHAR)); + len = WideCharToMultiByte (CP_UTF8, 0, (WCHAR *)SvPVX (t1), len, SvPVX (t2), SvLEN (t2), 0, 0); + SvPOK_only (t2); + SvPVX (t2)[len] = 0; + SvCUR_set (t2, len); + + return SvPVX (t2); + } +#else + return SvPVbyte_nolen (sv); +#endif +} + static void debug_errcall (const DB_ENV *dbenv, const char *errpfx, const char *msg) { @@ -103,17 +139,17 @@ enum { REQ_QUIT, REQ_ENV_OPEN, REQ_ENV_CLOSE, REQ_ENV_TXN_CHECKPOINT, REQ_ENV_LOCK_DETECT, - REQ_ENV_MEMP_SYNC, REQ_ENV_MEMP_TRICKLE, - REQ_DB_OPEN, REQ_DB_CLOSE, REQ_DB_COMPACT, REQ_DB_SYNC, + REQ_ENV_MEMP_SYNC, REQ_ENV_MEMP_TRICKLE, REQ_ENV_DBREMOVE, REQ_ENV_DBRENAME, + REQ_DB_OPEN, REQ_DB_CLOSE, REQ_DB_COMPACT, REQ_DB_SYNC, REQ_DB_UPGRADE, REQ_DB_PUT, REQ_DB_GET, REQ_DB_PGET, REQ_DB_DEL, REQ_DB_KEY_RANGE, REQ_TXN_COMMIT, REQ_TXN_ABORT, REQ_TXN_FINISH, REQ_C_CLOSE, REQ_C_COUNT, REQ_C_PUT, REQ_C_GET, REQ_C_PGET, REQ_C_DEL, REQ_SEQ_OPEN, REQ_SEQ_CLOSE, REQ_SEQ_GET, REQ_SEQ_REMOVE, }; -typedef struct aio_cb +typedef struct bdb_cb { - struct aio_cb *volatile next; + struct bdb_cb *volatile next; SV *callback; int type, pri, result; @@ -125,16 +161,16 @@ UV uv1; int int1, int2; U32 uint1, uint2; - char *buf1, *buf2; + char *buf1, *buf2, *buf3; SV *sv1, *sv2, *sv3; DBT dbt1, dbt2, dbt3; DB_KEY_RANGE key_range; DB_SEQUENCE *seq; db_seq_t seq_t; -} aio_cb; +} bdb_cb; -typedef aio_cb *aio_req; +typedef bdb_cb *bdb_req; enum { PRI_MIN = -4, @@ -147,6 +183,8 @@ #define AIO_TICKS ((1000000 + 1023) >> 10) +static SV *on_next_submit; + static unsigned int max_poll_time = 0; static unsigned int max_poll_reqs = 0; @@ -171,7 +209,7 @@ thread_t tid; /* locked by reslock, reqlock or wrklock */ - aio_req req; /* currently processed request */ + bdb_req req; /* currently processed request */ void *dbuf; DIR *dirp; } worker; @@ -201,7 +239,7 @@ #if WORDACCESS_UNSAFE -static unsigned int get_nready () +static unsigned int get_nready (void) { unsigned int retval; @@ -212,7 +250,7 @@ return retval; } -static unsigned int get_npending () +static unsigned int get_npending (void) { unsigned int retval; @@ -223,7 +261,7 @@ return retval; } -static unsigned int get_nthreads () +static unsigned int get_nthreads (void) { unsigned int retval; @@ -248,14 +286,14 @@ * per shift, the most expensive operation. */ typedef struct { - aio_req qs[NUM_PRI], qe[NUM_PRI]; /* qstart, qend */ + bdb_req qs[NUM_PRI], qe[NUM_PRI]; /* qstart, qend */ int size; } reqq; static reqq req_queue; static reqq res_queue; -int reqq_push (reqq *q, aio_req req) +int reqq_push (reqq *q, bdb_req req) { int pri = req->pri; req->next = 0; @@ -271,7 +309,7 @@ return q->size++; } -aio_req reqq_shift (reqq *q) +bdb_req reqq_shift (reqq *q) { int pri; @@ -282,7 +320,7 @@ for (pri = NUM_PRI; pri--; ) { - aio_req req = q->qs[pri]; + bdb_req req = q->qs[pri]; if (req) { @@ -296,11 +334,11 @@ abort (); } -static int poll_cb (); -static void req_free (aio_req req); -static void req_cancel (aio_req req); +static int poll_cb (void); +static void req_free (bdb_req req); +static void req_cancel (bdb_req req); -static int req_invoke (aio_req req) +static int req_invoke (bdb_req req) { dSP; @@ -328,6 +366,12 @@ dbt_to_sv (req->sv3, &req->dbt3); break; + case REQ_DB_PUT: + case REQ_C_PUT: + dbt_to_sv (0, &req->dbt1); + dbt_to_sv (0, &req->dbt2); + break; + case REQ_DB_KEY_RANGE: { AV *av = newAV (); @@ -346,9 +390,9 @@ SvREADONLY_off (req->sv1); if (sizeof (IV) > 4) - sv_setiv_mg (req->sv1, req->seq_t); + sv_setiv_mg (req->sv1, (IV)req->seq_t); else - sv_setnv_mg (req->sv1, req->seq_t); + sv_setnv_mg (req->sv1, (NV)req->seq_t); SvREFCNT_dec (req->sv1); break; @@ -367,10 +411,11 @@ return !SvTRUE (ERRSV); } -static void req_free (aio_req req) +static void req_free (bdb_req req) { free (req->buf1); free (req->buf2); + free (req->buf3); Safefree (req); } @@ -381,8 +426,11 @@ #endif static void -create_respipe () +create_respipe (void) { +#ifdef _WIN32 + int arg; /* argg */ +#endif int old_readfd = respipe [0]; if (respipe [1] >= 0) @@ -405,7 +453,7 @@ } #ifdef _WIN32 - int arg = 1; + arg = 1; if (ioctlsocket (TO_SOCKET (respipe [0]), FIONBIO, &arg) || ioctlsocket (TO_SOCKET (respipe [1]), FIONBIO, &arg)) #else @@ -442,7 +490,7 @@ X_UNLOCK (wrklock); } -static void maybe_start_thread () +static void maybe_start_thread (void) { if (get_nthreads () >= wanted) return; @@ -454,10 +502,22 @@ start_thread (); } -static void req_send (aio_req req) +static void req_send (bdb_req req) { SV *wait_callback = 0; + if (on_next_submit) + { + dSP; + SV *cb = sv_2mortal (on_next_submit); + + on_next_submit = 0; + + PUSHMARK (SP); + PUTBACK; + call_sv (cb, G_DISCARD | G_EVAL); + } + // synthesize callback if none given if (!SvOK (req->callback)) { @@ -472,7 +532,7 @@ if (count != 2) croak ("prepare callback must return exactly two values\n"); - wait_callback = SvREFCNT_inc (POPs); + wait_callback = POPs; SvREFCNT_dec (req->callback); req->callback = SvREFCNT_inc (POPs); } @@ -493,15 +553,14 @@ PUSHMARK (SP); PUTBACK; call_sv (wait_callback, G_DISCARD); - SvREFCNT_dec (wait_callback); } } static void end_thread (void) { - aio_req req; + bdb_req req; - Newz (0, req, 1, aio_cb); + Newz (0, req, 1, bdb_cb); req->type = REQ_QUIT; req->pri = PRI_MAX + PRI_BIAS; @@ -538,7 +597,7 @@ end_thread (); } -static void poll_wait () +static void poll_wait (void) { fd_set rfd; @@ -561,14 +620,14 @@ } } -static int poll_cb () +static int poll_cb (void) { dSP; int count = 0; int maxreqs = max_poll_reqs; int do_croak = 0; struct timeval tv_start, tv_now; - aio_req req; + bdb_req req; if (max_poll_time) gettimeofday (&tv_start, 0); @@ -639,7 +698,7 @@ X_THREAD_PROC (bdb_proc) { - aio_req req; + bdb_req req; struct timespec ts; worker *self = (worker *)thr_arg; @@ -716,6 +775,14 @@ req->result = req->env->memp_trickle (req->env, req->int1, &req->int2); break; + case REQ_ENV_DBREMOVE: + req->result = req->env->dbremove (req->env, req->txn, req->buf1, req->buf2, req->uint1); + break; + + case REQ_ENV_DBRENAME: + req->result = req->env->dbrename (req->env, req->txn, req->buf1, req->buf2, req->buf3, req->uint1); + break; + case REQ_DB_OPEN: req->result = req->db->open (req->db, req->txn, req->buf1, req->buf2, req->int1, req->uint1, req->int2); break; @@ -732,6 +799,10 @@ req->result = req->db->sync (req->db, req->uint1); break; + case REQ_DB_UPGRADE: + req->result = req->db->upgrade (req->db, req->buf1, req->uint1); + break; + case REQ_DB_PUT: req->result = req->db->put (req->db, req->txn, &req->dbt1, &req->dbt2, req->uint1); break; @@ -863,7 +934,7 @@ static void atfork_child (void) { - aio_req prv; + bdb_req prv; while (prv = reqq_shift (&req_queue)) req_free (prv); @@ -894,16 +965,16 @@ } #define dREQ(reqtype) \ - aio_req req; \ + bdb_req req; \ int req_pri = next_pri; \ next_pri = DEFAULT_PRI + PRI_BIAS; \ \ if (SvOK (callback) && !SvROK (callback)) \ croak ("callback must be undef or of reference type"); \ \ - Newz (0, req, 1, aio_cb); \ + Newz (0, req, 1, bdb_cb); \ if (!req) \ - croak ("out of memory during aio_req allocation"); \ + croak ("out of memory during bdb_req allocation"); \ \ req->callback = newSVsv (callback); \ req->type = (reqtype); \ @@ -912,10 +983,10 @@ #define REQ_SEND \ req_send (req) -#define SvPTR(var, arg, type, class, nullok) \ +#define SvPTR(var, arg, type, class, nullok) \ if (!SvOK (arg)) \ { \ - if (!nullok) \ + if (nullok != 1) \ croak (# var " must be a " # class " object, not undef"); \ \ (var) = 0; \ @@ -924,7 +995,7 @@ { \ IV tmp = SvIV ((SV*) SvRV (arg)); \ (var) = INT2PTR (type, tmp); \ - if (!var) \ + if (!var && nullok != 2) \ croak (# var " is not a valid " # class " object anymore"); \ } \ else \ @@ -938,6 +1009,45 @@ sv_setiv (SvRV (sv), 0); } +static int +errno_get (pTHX_ SV *sv, MAGIC *mg) +{ + if (*mg->mg_ptr == '!') // should always be the case + if (-30999 <= errno && errno <= -30800) + { + sv_setnv (sv, (NV)errno); + sv_setpv (sv, db_strerror (errno)); + SvNOK_on (sv); /* what a wonderful hack! */ + // ^^^ copied from perl sources + return 0; + } + + return PL_vtbl_sv.svt_get (aTHX_ sv, mg); +} + +static MGVTBL vtbl_errno; + +// this wonderful hack :( patches perl's $! variable to support our errno values +static void +patch_errno (void) +{ + SV *sv; + MAGIC *mg; + + if (!(sv = get_sv ("!", 1))) + return; + + if (!(mg = mg_find (sv, PERL_MAGIC_sv))) + return; + + if (mg->mg_virtual != &PL_vtbl_sv) + return; + + vtbl_errno = PL_vtbl_sv; + vtbl_errno.svt_get = errno_get; + mg->mg_virtual = &vtbl_errno; +} + MODULE = BDB PACKAGE = BDB PROTOTYPES: ENABLE @@ -972,11 +1082,7 @@ const_iv (AUTO_COMMIT) const_iv (CDB_ALLDB) const_iv (DIRECT_DB) - const_iv (DIRECT_LOG) const_iv (DSYNC_DB) - const_iv (DSYNC_LOG) - const_iv (LOG_AUTOREMOVE) - const_iv (LOG_INMEMORY) const_iv (NOLOCKING) const_iv (NOMMAP) const_iv (NOPANIC) @@ -1017,6 +1123,7 @@ //const_iv (MULTIPLE) const_iv (SNAPSHOT) const_iv (JOIN_ITEM) + const_iv (JOIN_NOSORT) const_iv (RMW) const_iv (NOTFOUND) @@ -1042,7 +1149,6 @@ const_iv (SET_LOCK_TIMEOUT) const_iv (SET_TXN_TIMEOUT) - const_iv (JOIN_ITEM) const_iv (FIRST) const_iv (NEXT) const_iv (NEXT_DUP) @@ -1086,7 +1192,6 @@ const_iv (NOSERVER_HOME) const_iv (NOSERVER_ID) const_iv (NOTFOUND) - const_iv (OLD_VERSION) const_iv (PAGE_NOTFOUND) const_iv (REP_DUPMASTER) const_iv (REP_HANDLE_DEAD) @@ -1119,33 +1224,37 @@ #endif #if DB_VERSION_MINOR >= 6 const_iv (PREV_DUP) -# if 0 const_iv (PRIORITY_UNCHANGED) const_iv (PRIORITY_VERY_LOW) const_iv (PRIORITY_LOW) const_iv (PRIORITY_DEFAULT) const_iv (PRIORITY_HIGH) const_iv (PRIORITY_VERY_HIGH) -# endif +#endif +#if DB_VERSION_MINOR >= 7 + const_iv (LOG_DIRECT) + const_iv (LOG_DSYNC) + const_iv (LOG_AUTO_REMOVE) + const_iv (LOG_IN_MEMORY) + const_iv (LOG_ZERO) +#else + const_iv (DIRECT_LOG) + const_iv (DSYNC_LOG) + const_iv (LOG_AUTOREMOVE) + const_iv (LOG_INMEMORY) #endif }; for (civ = const_iv + sizeof (const_iv) / sizeof (const_iv [0]); civ-- > const_iv; ) newCONSTSUB (stash, (char *)civ->name, newSViv (civ->iv)); - newCONSTSUB (stash, "DB_VERSION", newSVnv (DB_VERSION_MAJOR + DB_VERSION_MINOR * .1)); - newCONSTSUB (stash, "DB_VERSION_STRING", newSVpv (DB_VERSION_STRING, 0)); + newCONSTSUB (stash, "VERSION", newSVnv (DB_VERSION_MAJOR + DB_VERSION_MINOR * .1)); + newCONSTSUB (stash, "VERSION_STRING", newSVpv (DB_VERSION_STRING, 0)); create_respipe (); X_THREAD_ATFORK (atfork_prepare, atfork_parent, atfork_child); -#ifdef _WIN32 - X_MUTEX_CHECK (wrklock); - X_MUTEX_CHECK (reslock); - X_MUTEX_CHECK (reqlock); - - X_COND_CHECK (reqwait); -#endif + patch_errno (); } void @@ -1295,6 +1404,11 @@ OUTPUT: RETVAL +void _on_next_submit (SV *cb) + CODE: + SvREFCNT_dec (on_next_submit); + on_next_submit = SvOK (cb) ? newSVsv (cb) : 0; + DB_ENV * db_env_create (U32 env_flags = 0) CODE: @@ -1313,13 +1427,11 @@ RETVAL void -db_env_open (DB_ENV *env, octetstring db_home, U32 open_flags, int mode, SV *callback = &PL_sv_undef) +db_env_open (DB_ENV *env, bdb_filename db_home, U32 open_flags, int mode, SV *callback = &PL_sv_undef) CODE: { dREQ (REQ_ENV_OPEN); - env->set_thread_count (env, wanted + 2); - req->env = env; req->uint1 = open_flags | DB_THREAD; req->int1 = mode; @@ -1380,6 +1492,30 @@ REQ_SEND; } +void +db_env_dbremove (DB_ENV *env, DB_TXN_ornull *txnid, bdb_filename file, bdb_filename database, U32 flags = 0, SV *callback = &PL_sv_undef) + CODE: +{ + dREQ (REQ_ENV_DBREMOVE); + req->env = env; + req->buf1 = strdup_ornull (file); + req->buf2 = strdup_ornull (database); + req->uint1 = flags; + REQ_SEND; +} + +void +db_env_dbrename (DB_ENV *env, DB_TXN_ornull *txnid, bdb_filename file, bdb_filename database, bdb_filename newname, U32 flags = 0, SV *callback = &PL_sv_undef) + CODE: +{ + dREQ (REQ_ENV_DBRENAME); + req->env = env; + req->buf1 = strdup_ornull (file); + req->buf2 = strdup_ornull (database); + req->buf3 = strdup_ornull (newname); + req->uint1 = flags; + REQ_SEND; +} DB * db_create (DB_ENV *env = 0, U32 flags = 0) @@ -1396,7 +1532,7 @@ RETVAL void -db_open (DB *db, DB_TXN_ornull *txnid, octetstring file, octetstring database, int type, U32 flags, int mode, SV *callback = &PL_sv_undef) +db_open (DB *db, DB_TXN_ornull *txnid, bdb_filename file, bdb_filename database, int type, U32 flags, int mode, SV *callback = &PL_sv_undef) CODE: { dREQ (REQ_DB_OPEN); @@ -1446,6 +1582,17 @@ } void +db_upgrade (DB *db, bdb_filename file, U32 flags = 0, SV *callback = &PL_sv_undef) + CODE: +{ + dREQ (REQ_DB_SYNC); + req->db = db; + req->buf1 = strdup (file); + req->uint1 = flags; + REQ_SEND; +} + +void db_key_range (DB *db, DB_TXN_ornull *txn, SV *key, SV *key_range, U32 flags = 0, SV *callback = &PL_sv_undef) CODE: { @@ -1474,6 +1621,8 @@ void db_get (DB *db, DB_TXN_ornull *txn, SV *key, SV *data, U32 flags = 0, SV *callback = &PL_sv_undef) CODE: + if (SvREADONLY (data)) + croak ("can't modify read-only data scalar in db_get"); { dREQ (REQ_DB_GET); req->db = db; @@ -1488,6 +1637,8 @@ void db_pget (DB *db, DB_TXN_ornull *txn, SV *key, SV *pkey, SV *data, U32 flags = 0, SV *callback = &PL_sv_undef) CODE: + if (SvREADONLY (data)) + croak ("can't modify read-only data scalar in db_pget"); { dREQ (REQ_DB_PGET); req->db = db; @@ -1691,7 +1842,7 @@ MODULE = BDB PACKAGE = BDB::Env void -DESTROY (DB_ENV_ornull *env) +DESTROY (DB_ENV_ornuked *env) CODE: if (env) env->close (env, 0); @@ -1726,12 +1877,29 @@ OUTPUT: RETVAL -int set_flags (DB_ENV *env, U32 flags, int onoff) +int set_flags (DB_ENV *env, U32 flags, int onoff = 1) CODE: RETVAL = env->set_flags (env, flags, onoff); OUTPUT: RETVAL +#if DB_VERSION_MINOR >= 7 + +int set_intermediate_dir_mode (DB_ENV *env, const char *mode) + CODE: + RETVAL = env->set_intermediate_dir_mode (env, mode); + OUTPUT: + RETVAL + +int log_set_config (DB_ENV *env, U32 flags, int onoff = 1) + CODE: + RETVAL = env->log_set_config (env, flags, onoff); + OUTPUT: + RETVAL + +#endif + + void set_errfile (DB_ENV *env, FILE *errfile = 0) CODE: env->set_errfile (env, errfile); @@ -1740,7 +1908,7 @@ CODE: env->set_msgfile (env, msgfile); -int set_verbose (DB_ENV *env, U32 which, int onoff = 1) +int set_verbose (DB_ENV *env, U32 which = -1, int onoff = 1) CODE: RETVAL = env->set_verbose (env, which, onoff); OUTPUT: @@ -1812,6 +1980,30 @@ OUTPUT: RETVAL +int mutex_set_max (DB_ENV *env, U32 max) + CODE: + RETVAL = env->mutex_set_max (env, max); + OUTPUT: + RETVAL + +int mutex_set_increment (DB_ENV *env, U32 increment) + CODE: + RETVAL = env->mutex_set_increment (env, increment); + OUTPUT: + RETVAL + +int mutex_set_tas_spins (DB_ENV *env, U32 tas_spins) + CODE: + RETVAL = env->mutex_set_tas_spins (env, tas_spins); + OUTPUT: + RETVAL + +int mutex_set_align (DB_ENV *env, U32 align) + CODE: + RETVAL = env->mutex_set_align (env, align); + OUTPUT: + RETVAL + DB_TXN * txn_begin (DB_ENV *env, DB_TXN_ornull *parent = 0, U32 flags = 0) CODE: @@ -1824,7 +2016,7 @@ MODULE = BDB PACKAGE = BDB::Db void -DESTROY (DB_ornull *db) +DESTROY (DB_ornuked *db) CODE: if (db) { @@ -1929,7 +2121,7 @@ MODULE = BDB PACKAGE = BDB::Txn void -DESTROY (DB_TXN_ornull *txn) +DESTROY (DB_TXN_ornuked *txn) CODE: if (txn) txn->abort (txn); @@ -1950,15 +2142,23 @@ MODULE = BDB PACKAGE = BDB::Cursor void -DESTROY (DBC_ornull *dbc) +DESTROY (DBC_ornuked *dbc) CODE: if (dbc) dbc->c_close (dbc); +#if DB_VERSION_MINOR >= 6 + +int set_priority (DBC *dbc, int priority) + CODE: + dbc->set_priority (dbc, priority); + +#endif + MODULE = BDB PACKAGE = BDB::Sequence void -DESTROY (DB_SEQUENCE_ornull *seq) +DESTROY (DB_SEQUENCE_ornuked *seq) CODE: if (seq) seq->close (seq, 0); @@ -1987,3 +2187,4 @@ OUTPUT: RETVAL +