ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/BDB/BDB.xs
Revision: 1.5
Committed: Mon Feb 5 23:46:15 2007 UTC (17 years, 3 months ago) by root
Branch: MAIN
Changes since 1.4: +184 -28 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.1 /* solaris */
2     #define _POSIX_PTHREAD_SEMANTICS 1
3    
4     #if __linux && !defined(_GNU_SOURCE)
5     # define _GNU_SOURCE
6     #endif
7    
8     /* just in case */
9     #define _REENTRANT 1
10    
11     #include <errno.h>
12    
13     #include "EXTERN.h"
14     #include "perl.h"
15     #include "XSUB.h"
16    
17     #include <pthread.h>
18    
19     #include <stddef.h>
20     #include <stdlib.h>
21     #include <errno.h>
22     #include <sys/time.h>
23     #include <sys/types.h>
24     #include <limits.h>
25     #include <unistd.h>
26     #include <fcntl.h>
27 root 1.2
28     #include <db.h>
29 root 1.1
30     /* number of seconds after which idle threads exit */
31     #define IDLE_TIMEOUT 10
32    
33     /* whether word reads are potentially non-atomic.
34     * this is conservative, likely most arches this runs
35     * on have atomic word read/writes.
36     */
37     #ifndef WORDACCESS_UNSAFE
38     # if __i386 || __x86_64
39     # define WORDACCESS_UNSAFE 0
40     # else
41     # define WORDACCESS_UNSAFE 1
42     # endif
43     #endif
44    
45 root 1.5 typedef DB_ENV DB_ENV_ornull;
46     typedef DB_TXN DB_TXN_ornull;
47     typedef DBC DBC_ornull;
48     typedef DB DB_ornull;
49    
50 root 1.1 typedef SV SV8; /* byte-sv, used for argument-checking */
51 root 1.3 typedef char *octetstring;
52 root 1.1
53 root 1.3 static SV *prepare_cb;
54    
/* Duplicate a C string with strdup; a null input yields a null result. */
static inline char *
strdup_ornull (const char *s)
{
  if (!s)
    return 0;

  return strdup (s);
}
60    
61 root 1.5 inline void
62     sv_to_dbt (DBT *dbt, SV *sv)
63     {
64     STRLEN len;
65     char *data = SvPVbyte (sv, len);
66    
67     dbt->data = malloc (len);
68     memcpy (dbt->data, data, len);
69     dbt->size = len;
70     }
71    
72     inline void
73     dbt_to_sv (SV *sv, DBT *dbt)
74     {
75     SvREADONLY_off (sv);
76     sv_setpvn_mg (sv, dbt->data, dbt->size);
77    
78     free (dbt->data);
79     }
80    
/* request types, dispatched on in aio_proc and req_invoke */
enum {
  REQ_QUIT,

  REQ_ENV_OPEN,
  REQ_ENV_CLOSE,

  REQ_DB_OPEN,
  REQ_DB_CLOSE,
  REQ_DB_COMPACT,
  REQ_DB_SYNC,
  REQ_DB_PUT,
  REQ_DB_GET,
  REQ_DB_PGET,

  REQ_TXN_COMMIT,
  REQ_TXN_ABORT,
};
87    
88     typedef struct aio_cb
89     {
90 root 1.2 struct aio_cb *volatile next;
91     SV *callback;
92 root 1.3 int type, pri, result;
93 root 1.2
94     DB_ENV *env;
95     DB *db;
96     DB_TXN *txn;
97     DBC *cursor;
98     int int1, int2;
99     U32 uint1, uint2;
100     char *buf1, *buf2;
101 root 1.5 SV *sv1, *sv2;
102 root 1.3
103     DBT dbt1, dbt2, dbt3;
104 root 1.1 } aio_cb;
105    
106     typedef aio_cb *aio_req;
107    
/* request priorities; PRI_BIAS shifts them into the range 0 .. NUM_PRI - 1 */
enum {
  PRI_MIN     = -4,
  PRI_MAX     = 4,

  DEFAULT_PRI = 0,
  PRI_BIAS    = -PRI_MIN,
  NUM_PRI     = PRI_MAX + PRI_BIAS + 1,
};
116    
117     #define AIO_TICKS ((1000000 + 1023) >> 10)
118    
119     static unsigned int max_poll_time = 0;
120     static unsigned int max_poll_reqs = 0;
121    
122     /* calculcate time difference in ~1/AIO_TICKS of a second */
123     static int tvdiff (struct timeval *tv1, struct timeval *tv2)
124     {
125     return (tv2->tv_sec - tv1->tv_sec ) * AIO_TICKS
126     + ((tv2->tv_usec - tv1->tv_usec) >> 10);
127     }
128    
129     static int next_pri = DEFAULT_PRI + PRI_BIAS;
130    
131     static unsigned int started, idle, wanted;
132    
133     #if __linux && defined (PTHREAD_ADAPTIVE_MUTEX_INITIALIZER_NP)
134     # define AIO_MUTEX_INIT PTHREAD_ADAPTIVE_MUTEX_INITIALIZER_NP
135     #else
136     # define AIO_MUTEX_INIT PTHREAD_MUTEX_INITIALIZER
137     #endif
138    
139     #define LOCK(mutex) pthread_mutex_lock (&(mutex))
140     #define UNLOCK(mutex) pthread_mutex_unlock (&(mutex))
141    
142     /* worker threads management */
143     static pthread_mutex_t wrklock = AIO_MUTEX_INIT;
144    
145     typedef struct worker {
146     /* locked by wrklock */
147     struct worker *prev, *next;
148    
149     pthread_t tid;
150    
151     /* locked by reslock, reqlock or wrklock */
152     aio_req req; /* currently processed request */
153     void *dbuf;
154     DIR *dirp;
155     } worker;
156    
157     static worker wrk_first = { &wrk_first, &wrk_first, 0 };
158    
159     static void worker_clear (worker *wrk)
160     {
161     }
162    
163     static void worker_free (worker *wrk)
164     {
165     wrk->next->prev = wrk->prev;
166     wrk->prev->next = wrk->next;
167    
168     free (wrk);
169     }
170    
171     static volatile unsigned int nreqs, nready, npending;
172     static volatile unsigned int max_idle = 4;
173     static volatile unsigned int max_outstanding = 0xffffffff;
174     static int respipe [2];
175    
176     static pthread_mutex_t reslock = AIO_MUTEX_INIT;
177     static pthread_mutex_t reqlock = AIO_MUTEX_INIT;
178     static pthread_cond_t reqwait = PTHREAD_COND_INITIALIZER;
179    
#if WORDACCESS_UNSAFE

/* read nready while holding reqlock */
static unsigned int get_nready ()
{
  unsigned int snapshot;

  LOCK (reqlock);
  snapshot = nready;
  UNLOCK (reqlock);

  return snapshot;
}

/* read npending while holding reslock */
static unsigned int get_npending ()
{
  unsigned int snapshot;

  LOCK (reslock);
  snapshot = npending;
  UNLOCK (reslock);

  return snapshot;
}

/* read started while holding wrklock */
static unsigned int get_nthreads ()
{
  unsigned int snapshot;

  LOCK (wrklock);
  snapshot = started;
  UNLOCK (wrklock);

  return snapshot;
}

#else

/* word access is treated as atomic here: read the counters directly */
# define get_nready()   nready
# define get_npending() npending
# define get_nthreads() started

#endif
222    
223     /*
224     * a somewhat faster data structure might be nice, but
225     * with 8 priorities this actually needs <20 insns
226     * per shift, the most expensive operation.
227     */
228     typedef struct {
229     aio_req qs[NUM_PRI], qe[NUM_PRI]; /* qstart, qend */
230     int size;
231     } reqq;
232    
233     static reqq req_queue;
234     static reqq res_queue;
235    
236     int reqq_push (reqq *q, aio_req req)
237     {
238     int pri = req->pri;
239     req->next = 0;
240    
241     if (q->qe[pri])
242     {
243     q->qe[pri]->next = req;
244     q->qe[pri] = req;
245     }
246     else
247     q->qe[pri] = q->qs[pri] = req;
248    
249     return q->size++;
250     }
251    
252     aio_req reqq_shift (reqq *q)
253     {
254     int pri;
255    
256     if (!q->size)
257     return 0;
258    
259     --q->size;
260    
261     for (pri = NUM_PRI; pri--; )
262     {
263     aio_req req = q->qs[pri];
264    
265     if (req)
266     {
267     if (!(q->qs[pri] = req->next))
268     q->qe[pri] = 0;
269    
270     return req;
271     }
272     }
273    
274     abort ();
275     }
276    
277     static int poll_cb ();
278     static void req_free (aio_req req);
279     static void req_cancel (aio_req req);
280    
281     static int req_invoke (aio_req req)
282     {
283     dSP;
284    
285     if (SvOK (req->callback))
286     {
287     ENTER;
288     SAVETMPS;
289     PUSHMARK (SP);
290     EXTEND (SP, 1);
291    
292     switch (req->type)
293     {
294 root 1.5 case REQ_DB_GET:
295     case REQ_DB_PGET:
296     dbt_to_sv (req->sv1, &req->dbt3);
297     SvREFCNT_dec (req->sv1);
298     break;
299    
300     case REQ_DB_CLOSE:
301     SvREFCNT_dec (req->sv1);
302     break;
303 root 1.1 }
304    
305 root 1.3 errno = req->result;
306    
307 root 1.1 PUTBACK;
308     call_sv (req->callback, G_VOID | G_EVAL);
309     SPAGAIN;
310    
311     FREETMPS;
312     LEAVE;
313     }
314    
315     return !SvTRUE (ERRSV);
316     }
317    
318     static void req_free (aio_req req)
319     {
320 root 1.2 free (req->buf1);
321     free (req->buf2);
322 root 1.1 Safefree (req);
323     }
324    
325 root 1.3 static void *aio_proc (void *arg);
326 root 1.1
327     static void start_thread (void)
328     {
329     sigset_t fullsigset, oldsigset;
330     pthread_attr_t attr;
331    
332     worker *wrk = calloc (1, sizeof (worker));
333    
334     if (!wrk)
335     croak ("unable to allocate worker thread data");
336    
337     pthread_attr_init (&attr);
338     pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
339     #ifdef PTHREAD_SCOPE_PROCESS
340     pthread_attr_setscope (&attr, PTHREAD_SCOPE_PROCESS);
341     #endif
342    
343     sigfillset (&fullsigset);
344    
345     LOCK (wrklock);
346     pthread_sigmask (SIG_SETMASK, &fullsigset, &oldsigset);
347    
348     if (pthread_create (&wrk->tid, &attr, aio_proc, (void *)wrk) == 0)
349     {
350     wrk->prev = &wrk_first;
351     wrk->next = wrk_first.next;
352     wrk_first.next->prev = wrk;
353     wrk_first.next = wrk;
354     ++started;
355     }
356     else
357     free (wrk);
358    
359     pthread_sigmask (SIG_SETMASK, &oldsigset, 0);
360     UNLOCK (wrklock);
361     }
362    
363     static void maybe_start_thread ()
364     {
365     if (get_nthreads () >= wanted)
366     return;
367    
368     /* todo: maybe use idle here, but might be less exact */
369     if (0 <= (int)get_nthreads () + (int)get_npending () - (int)nreqs)
370     return;
371    
372     start_thread ();
373     }
374    
375     static void req_send (aio_req req)
376     {
377 root 1.3 SV *wait_callback = 0;
378    
379     // synthesize callback if none given
380     if (!SvOK (req->callback))
381     {
382     dSP;
383     PUSHMARK (SP);
384     PUTBACK;
385     int count = call_sv (prepare_cb, G_ARRAY);
386     SPAGAIN;
387    
388     if (count != 2)
389     croak ("prepare callback must return exactly two values\n");
390    
391     wait_callback = SvREFCNT_inc (POPs);
392     SvREFCNT_dec (req->callback);
393     req->callback = SvREFCNT_inc (POPs);
394     }
395    
396 root 1.1 ++nreqs;
397    
398     LOCK (reqlock);
399     ++nready;
400     reqq_push (&req_queue, req);
401     pthread_cond_signal (&reqwait);
402     UNLOCK (reqlock);
403    
404     maybe_start_thread ();
405 root 1.3
406     if (wait_callback)
407     {
408     dSP;
409     PUSHMARK (SP);
410     PUTBACK;
411     call_sv (wait_callback, G_DISCARD);
412     SvREFCNT_dec (wait_callback);
413     }
414 root 1.1 }
415    
416     static void end_thread (void)
417     {
418     aio_req req;
419    
420     Newz (0, req, 1, aio_cb);
421    
422     req->type = REQ_QUIT;
423     req->pri = PRI_MAX + PRI_BIAS;
424    
425     LOCK (reqlock);
426     reqq_push (&req_queue, req);
427     pthread_cond_signal (&reqwait);
428     UNLOCK (reqlock);
429    
430     LOCK (wrklock);
431     --started;
432     UNLOCK (wrklock);
433     }
434    
435     static void set_max_idle (int nthreads)
436     {
437     if (WORDACCESS_UNSAFE) LOCK (reqlock);
438     max_idle = nthreads <= 0 ? 1 : nthreads;
439     if (WORDACCESS_UNSAFE) UNLOCK (reqlock);
440     }
441    
442     static void min_parallel (int nthreads)
443     {
444     if (wanted < nthreads)
445     wanted = nthreads;
446     }
447    
448     static void max_parallel (int nthreads)
449     {
450     if (wanted > nthreads)
451     wanted = nthreads;
452    
453     while (started > wanted)
454     end_thread ();
455     }
456    
457     static void poll_wait ()
458     {
459     fd_set rfd;
460    
461     while (nreqs)
462     {
463     int size;
464     if (WORDACCESS_UNSAFE) LOCK (reslock);
465     size = res_queue.size;
466     if (WORDACCESS_UNSAFE) UNLOCK (reslock);
467    
468     if (size)
469     return;
470    
471     maybe_start_thread ();
472    
473     FD_ZERO(&rfd);
474     FD_SET(respipe [0], &rfd);
475    
476     select (respipe [0] + 1, &rfd, 0, 0, 0);
477     }
478     }
479    
480     static int poll_cb ()
481     {
482     dSP;
483     int count = 0;
484     int maxreqs = max_poll_reqs;
485     int do_croak = 0;
486     struct timeval tv_start, tv_now;
487     aio_req req;
488    
489     if (max_poll_time)
490     gettimeofday (&tv_start, 0);
491    
492     for (;;)
493     {
494     for (;;)
495     {
496     maybe_start_thread ();
497    
498     LOCK (reslock);
499     req = reqq_shift (&res_queue);
500    
501     if (req)
502     {
503     --npending;
504    
505     if (!res_queue.size)
506     {
507     /* read any signals sent by the worker threads */
508     char buf [4];
509     while (read (respipe [0], buf, 4) == 4)
510     ;
511     }
512     }
513    
514     UNLOCK (reslock);
515    
516     if (!req)
517     break;
518    
519     --nreqs;
520    
521     if (!req_invoke (req))
522     {
523     req_free (req);
524     croak (0);
525     }
526    
527     count++;
528    
529     req_free (req);
530    
531     if (maxreqs && !--maxreqs)
532     break;
533    
534     if (max_poll_time)
535     {
536     gettimeofday (&tv_now, 0);
537    
538     if (tvdiff (&tv_start, &tv_now) >= max_poll_time)
539     break;
540     }
541     }
542    
543     if (nreqs <= max_outstanding)
544     break;
545    
546     poll_wait ();
547    
548     ++maxreqs;
549     }
550    
551     return count;
552     }
553    
554     static void create_pipe ()
555     {
556     if (pipe (respipe))
557     croak ("unable to initialize result pipe");
558    
559     if (fcntl (respipe [0], F_SETFL, O_NONBLOCK))
560     croak ("cannot set result pipe to nonblocking mode");
561    
562     if (fcntl (respipe [1], F_SETFL, O_NONBLOCK))
563     croak ("cannot set result pipe to nonblocking mode");
564     }
565    
566     /*****************************************************************************/
567    
568     static void *aio_proc (void *thr_arg)
569     {
570     aio_req req;
571     struct timespec ts;
572     worker *self = (worker *)thr_arg;
573    
574     /* try to distribute timeouts somewhat evenly */
575     ts.tv_nsec = (((unsigned long)self + (unsigned long)ts.tv_sec) & 1023UL)
576     * (1000000000UL / 1024UL);
577    
578     for (;;)
579     {
580     ts.tv_sec = time (0) + IDLE_TIMEOUT;
581    
582     LOCK (reqlock);
583    
584     for (;;)
585     {
586     self->req = req = reqq_shift (&req_queue);
587    
588     if (req)
589     break;
590    
591     ++idle;
592    
593     if (pthread_cond_timedwait (&reqwait, &reqlock, &ts)
594     == ETIMEDOUT)
595     {
596     if (idle > max_idle)
597     {
598     --idle;
599     UNLOCK (reqlock);
600     LOCK (wrklock);
601     --started;
602     UNLOCK (wrklock);
603     goto quit;
604     }
605    
606     /* we are allowed to idle, so do so without any timeout */
607     pthread_cond_wait (&reqwait, &reqlock);
608     ts.tv_sec = time (0) + IDLE_TIMEOUT;
609     }
610    
611     --idle;
612     }
613    
614     --nready;
615    
616     UNLOCK (reqlock);
617    
618     switch (req->type)
619     {
620     case REQ_QUIT:
621     goto quit;
622    
623 root 1.3 case REQ_ENV_OPEN:
624     req->result = req->env->open (req->env, req->buf1, req->uint1, req->int1);
625     break;
626    
627     case REQ_ENV_CLOSE:
628     req->result = req->env->close (req->env, req->uint1);
629     break;
630    
631     case REQ_DB_OPEN:
632     req->result = req->db->open (req->db, req->txn, req->buf1, req->buf2, req->int1, req->uint1, req->int2);
633     break;
634    
635     case REQ_DB_CLOSE:
636     req->result = req->db->close (req->db, req->uint1);
637     break;
638    
639     case REQ_DB_COMPACT:
640     req->result = req->db->compact (req->db, req->txn, &req->dbt1, &req->dbt2, 0, req->uint1, 0);
641     break;
642    
643     case REQ_DB_SYNC:
644     req->result = req->db->sync (req->db, req->uint1);
645     break;
646    
647     case REQ_DB_PUT:
648     req->result = req->db->put (req->db, req->txn, &req->dbt1, &req->dbt2, req->uint1);
649     break;
650    
651 root 1.5 case REQ_DB_GET:
652     req->result = req->db->get (req->db, req->txn, &req->dbt1, &req->dbt3, req->uint1);
653     break;
654    
655     case REQ_DB_PGET:
656     req->result = req->db->pget (req->db, req->txn, &req->dbt1, &req->dbt2, &req->dbt3, req->uint1);
657     break;
658    
659     case REQ_TXN_COMMIT:
660     req->result = req->txn->commit (req->txn, req->uint1);
661     break;
662    
663     case REQ_TXN_ABORT:
664     req->result = req->txn->abort (req->txn);
665     break;
666    
667 root 1.1 default:
668 root 1.3 req->result = ENOSYS;
669 root 1.1 break;
670     }
671    
672     LOCK (reslock);
673    
674     ++npending;
675    
676     if (!reqq_push (&res_queue, req))
677     /* write a dummy byte to the pipe so fh becomes ready */
678     write (respipe [1], &respipe, 1);
679    
680     self->req = 0;
681     worker_clear (self);
682    
683     UNLOCK (reslock);
684     }
685    
686     quit:
687     LOCK (wrklock);
688     worker_free (self);
689     UNLOCK (wrklock);
690    
691     return 0;
692     }
693    
694     /*****************************************************************************/
695    
696     static void atfork_prepare (void)
697     {
698     LOCK (wrklock);
699     LOCK (reqlock);
700     LOCK (reslock);
701     }
702    
703     static void atfork_parent (void)
704     {
705     UNLOCK (reslock);
706     UNLOCK (reqlock);
707     UNLOCK (wrklock);
708     }
709    
710     static void atfork_child (void)
711     {
712     aio_req prv;
713    
714     while (prv = reqq_shift (&req_queue))
715     req_free (prv);
716    
717     while (prv = reqq_shift (&res_queue))
718     req_free (prv);
719    
720     while (wrk_first.next != &wrk_first)
721     {
722     worker *wrk = wrk_first.next;
723    
724     if (wrk->req)
725     req_free (wrk->req);
726    
727     worker_clear (wrk);
728     worker_free (wrk);
729     }
730    
731     started = 0;
732     idle = 0;
733     nreqs = 0;
734     nready = 0;
735     npending = 0;
736    
737     close (respipe [0]);
738     close (respipe [1]);
739     create_pipe ();
740    
741     atfork_parent ();
742     }
743    
/*
 * declare and initialise "req" for a new request of the given type.
 * expects an SV *callback in scope; consumes next_pri and resets it
 * to the default priority. croaks unless callback is undef or a
 * reference. deliberately ends without a semicolon.
 */
#define dREQ(reqtype)                                           \
  aio_req req;                                                  \
  int req_pri = next_pri;                                       \
  next_pri = DEFAULT_PRI + PRI_BIAS;                            \
                                                                \
  if (SvOK (callback) && !SvROK (callback))                     \
    croak ("callback must be undef or of reference type");      \
                                                                \
  Newz (0, req, 1, aio_cb);                                     \
  if (!req)                                                     \
    croak ("out of memory during aio_req allocation");          \
                                                                \
  req->callback = newSVsv (callback);                           \
  req->type     = (reqtype);                                    \
  req->pri      = req_pri

/* hand the request prepared by dREQ over to req_send */
#define REQ_SEND \
  req_send (req)
762    
/*
 * Extract a C pointer of the given type from the perl object arg.
 *
 *   var     lvalue receiving the pointer
 *   arg     the SV to convert
 *   type    C pointer type of var
 *   class   perl class the object must be derived from
 *   nullok  when true, an undefined arg yields a null pointer
 *
 * Croaks when arg is undefined and nullok is false, when arg is not
 * derived from class, or when the stored pointer is null (as left
 * behind by ptr_nuke).
 *
 * Uses croak rather than Perl_croak: the latter takes an interpreter
 * context as first argument on perls built with implicit context.
 */
#define SvPTR(var, arg, type, class, nullok)                            \
  if (!SvOK (arg))                                                      \
    {                                                                   \
      if (!(nullok))                                                    \
        croak (# var " must be a " # class " object, not undef");       \
                                                                        \
      (var) = 0;                                                        \
    }                                                                   \
  else if (sv_derived_from ((arg), # class))                            \
    {                                                                   \
      IV tmp = SvIV ((SV*) SvRV (arg));                                 \
      (var) = INT2PTR (type, tmp);                                      \
      if (!(var))                                                       \
        croak (# var " is not a valid " # class " object anymore");     \
    }                                                                   \
  else                                                                  \
    croak (# var " is not of type " # class);
781 root 1.3
782 root 1.5 static void
783     ptr_nuke (SV *sv)
784 root 1.3 {
785 root 1.5 assert (SvROK (sv));
786     sv_setiv (SvRV (sv), 0);
787     }
788 root 1.3
789 root 1.2 MODULE = BDB PACKAGE = BDB
790 root 1.1
791     PROTOTYPES: ENABLE
792    
BOOT:
{
	/*
	 * export the DB_* constants listed below as constant subs in
	 * package BDB, then create the result pipe and install the fork
	 * handlers. every constant is listed exactly once.
	 */
	HV *stash = gv_stashpv ("BDB", 1);

	static const struct {
	  const char *name;
	  IV iv;
	} *civ, const_iv[] = {
#define const_iv(name) { # name, (IV)DB_ ## name },
	  const_iv (RPCCLIENT)
	  const_iv (INIT_CDB)
	  const_iv (INIT_LOCK)
	  const_iv (INIT_LOG)
	  const_iv (INIT_MPOOL)
	  const_iv (INIT_REP)
	  const_iv (INIT_TXN)
	  const_iv (RECOVER)
	  const_iv (RECOVER_FATAL)
	  const_iv (CREATE)
	  const_iv (USE_ENVIRON)
	  const_iv (USE_ENVIRON_ROOT)
	  const_iv (LOCKDOWN)
	  const_iv (PRIVATE)
	  const_iv (REGISTER)
	  const_iv (SYSTEM_MEM)
	  const_iv (AUTO_COMMIT)
	  const_iv (CDB_ALLDB)
	  const_iv (DIRECT_DB)
	  const_iv (DIRECT_LOG)
	  const_iv (DSYNC_DB)
	  const_iv (DSYNC_LOG)
	  const_iv (LOG_AUTOREMOVE)
	  const_iv (LOG_INMEMORY)
	  const_iv (NOLOCKING)
	  const_iv (MULTIVERSION)
	  const_iv (NOMMAP)
	  const_iv (NOPANIC)
	  const_iv (OVERWRITE)
	  const_iv (PANIC_ENVIRONMENT)
	  const_iv (REGION_INIT)
	  const_iv (TIME_NOTGRANTED)
	  const_iv (TXN_NOSYNC)
	  const_iv (TXN_SNAPSHOT)
	  const_iv (TXN_WRITE_NOSYNC)
	  const_iv (WRITECURSOR)
	  const_iv (YIELDCPU)
	  const_iv (ENCRYPT_AES)
	  const_iv (XA_CREATE)
	  const_iv (BTREE)
	  const_iv (HASH)
	  const_iv (QUEUE)
	  const_iv (RECNO)
	  const_iv (UNKNOWN)
	  const_iv (EXCL)
	  const_iv (READ_COMMITTED)
	  const_iv (READ_UNCOMMITTED)
	  const_iv (TRUNCATE)
	  const_iv (NOSYNC)
	  const_iv (CHKSUM)
	  const_iv (ENCRYPT)
	  const_iv (TXN_NOT_DURABLE)
	  const_iv (DUP)
	  const_iv (DUPSORT)
	  const_iv (RECNUM)
	  const_iv (RENUMBER)
	  const_iv (REVSPLITOFF)
	  const_iv (INORDER)
	  const_iv (CONSUME)
	  const_iv (CONSUME_WAIT)
	  const_iv (GET_BOTH)
	  //const_iv (SET_RECNO)
	  //const_iv (MULTIPLE)
	  const_iv (SNAPSHOT)
	  const_iv (JOIN_ITEM)
	  const_iv (RMW)

	  const_iv (NOTFOUND)
	  const_iv (KEYEMPTY)
	  const_iv (LOCK_DEADLOCK)
	  const_iv (LOCK_NOTGRANTED)
	  const_iv (RUNRECOVERY)
	  const_iv (OLD_VERSION)
	  const_iv (REP_HANDLE_DEAD)
	  const_iv (REP_LOCKOUT)

	  const_iv (FREE_SPACE)
	  const_iv (FREELIST_ONLY)

	  const_iv (APPEND)
	  const_iv (NODUPDATA)
	  const_iv (NOOVERWRITE)

	  const_iv (TXN_NOWAIT)
	  const_iv (TXN_SYNC)

	  const_iv (SET_LOCK_TIMEOUT)
	  const_iv (SET_TXN_TIMEOUT)
	};

	/* walk the table from its end to its start */
	for (civ = const_iv + sizeof (const_iv) / sizeof (const_iv [0]); civ-- > const_iv; )
	  newCONSTSUB (stash, (char *)civ->name, newSViv (civ->iv));

	create_pipe ();
	pthread_atfork (atfork_prepare, atfork_parent, atfork_child);
}
900    
void
max_poll_reqs (int nreqs)
	PROTOTYPE: $
	CODE:
	/* upper bound on requests handled per poll_cb call, 0 means no bound */
	max_poll_reqs = nreqs;

void
max_poll_time (double nseconds)
	PROTOTYPE: $
	CODE:
	/* stored in units of 1/AIO_TICKS seconds, as compared against tvdiff */
	max_poll_time = nseconds * AIO_TICKS;

void
min_parallel (int nthreads)
	PROTOTYPE: $

void
max_parallel (int nthreads)
	PROTOTYPE: $

void
max_idle (int nthreads)
	PROTOTYPE: $
	CODE:
	set_max_idle (nthreads);

int
max_outstanding (int maxreqs)
	PROTOTYPE: $
	CODE:
	/* returns the previous limit */
	RETVAL = max_outstanding;
	max_outstanding = maxreqs;
	OUTPUT:
	RETVAL
935    
int
dbreq_pri (int pri = 0)
	PROTOTYPE: ;$
	CODE:
	/* report the current (unbiased) priority, optionally set a new one */
	RETVAL = next_pri - PRI_BIAS;
	if (items > 0)
	  {
	    if (pri < PRI_MIN)
	      pri = PRI_MIN;
	    if (pri > PRI_MAX)
	      pri = PRI_MAX;
	    next_pri = pri + PRI_BIAS;
	  }
	OUTPUT:
	RETVAL
949    
void
dbreq_nice (int nice = 0)
	CODE:
	/*
	 * lower the priority of the next request by nice, clamped to
	 * PRI_MIN .. PRI_MAX. next_pri is stored with PRI_BIAS added, so
	 * the bias has to be removed before clamping.
	 */
	nice = (next_pri - PRI_BIAS) - nice;
	if (nice < PRI_MIN) nice = PRI_MIN;
	if (nice > PRI_MAX) nice = PRI_MAX;
	next_pri = nice + PRI_BIAS;
957    
void
flush ()
	PROTOTYPE:
	CODE:
	/* keep waiting and polling until no request is outstanding */
	while (nreqs)
	  {
	    poll_wait ();
	    poll_cb ();
	  }

int
poll ()
	PROTOTYPE:
	CODE:
	/* wait for results, then handle them */
	poll_wait ();
	RETVAL = poll_cb ();
	OUTPUT:
	RETVAL

int
poll_fileno ()
	PROTOTYPE:
	CODE:
	/* read end of the result pipe */
	RETVAL = respipe [0];
	OUTPUT:
	RETVAL

int
poll_cb (...)
	PROTOTYPE:
	CODE:
	RETVAL = poll_cb ();
	OUTPUT:
	RETVAL

void
poll_wait ()
	PROTOTYPE:
	CODE:
	poll_wait ();

int
nreqs ()
	PROTOTYPE:
	CODE:
	RETVAL = nreqs;
	OUTPUT:
	RETVAL

int
nready ()
	PROTOTYPE:
	CODE:
	RETVAL = get_nready ();
	OUTPUT:
	RETVAL

int
npending ()
	PROTOTYPE:
	CODE:
	RETVAL = get_npending ();
	OUTPUT:
	RETVAL

int
nthreads ()
	PROTOTYPE:
	CODE:
	/* get_nthreads takes wrklock itself when word access is unsafe */
	RETVAL = get_nthreads ();
	OUTPUT:
	RETVAL
1032    
void
set_sync_prepare (SV *cb)
	PROTOTYPE: &
	CODE:
	/* replace the stored prepare callback with a copy of cb */
	SvREFCNT_dec (prepare_cb);
	prepare_cb = newSVsv (cb);
1039    
DB_ENV *
db_env_create (U32 env_flags = 0)
	CODE:
{
	/* synchronous: create the environment handle, croak on failure */
	int status = db_env_create (&RETVAL, env_flags);
	errno = status;
	if (status)
	  croak ("db_env_create: %s", db_strerror (status));
}
	OUTPUT:
	RETVAL
1050 root 1.2
void
db_env_open (DB_ENV *env, octetstring db_home, U32 open_flags, int mode, SV *callback = &PL_sv_undef)
	CODE:
{
	/* queue an env->open request; DB_THREAD is always added to the flags */
	dREQ (REQ_ENV_OPEN);
	req->env   = env;
	req->buf1  = strdup_ornull (db_home);
	req->uint1 = open_flags | DB_THREAD;
	req->int1  = mode;
	REQ_SEND;
}
1062    
void
db_env_close (DB_ENV *env, U32 flags = 0, SV *callback = &PL_sv_undef)
	CODE:
{
	/* queue an env->close request, then invalidate the perl handle */
	dREQ (REQ_ENV_CLOSE);
	req->uint1 = flags;
	req->env   = env;
	REQ_SEND;
	ptr_nuke (ST (0));
}
1073    
DB *
db_create (DB_ENV *env = 0, U32 flags = 0)
	CODE:
{
	/* synchronous: create the database handle, croak on failure */
	errno = db_create (&RETVAL, env, flags);
	if (errno)
	  croak ("db_create: %s", db_strerror (errno));

	/*
	 * keep a copy of the env argument in app_private; it is released
	 * by db_close or DESTROY. ST (0) only exists when an argument
	 * was actually passed.
	 */
	if (RETVAL)
	  RETVAL->app_private = items > 0 ? (void *)newSVsv (ST (0)) : 0;
}
	OUTPUT:
	RETVAL
1087 root 1.2
void
db_open (DB *db, DB_TXN_ornull *txnid, octetstring file, octetstring database, int type, U32 flags, int mode, SV *callback = &PL_sv_undef)
	CODE:
{
	/* queue a db->open request; DB_THREAD is always added to the flags */
	dREQ (REQ_DB_OPEN);
	req->db    = db;
	req->txn   = txnid;
	req->buf1  = strdup_ornull (file);
	req->buf2  = strdup_ornull (database);
	req->int1  = type;
	req->int2  = mode;
	req->uint1 = flags | DB_THREAD;
	REQ_SEND;
}
1102    
void
db_close (DB *db, U32 flags = 0, SV *callback = &PL_sv_undef)
	CODE:
{
	/*
	 * queue a db->close request. the SV stored in app_private travels
	 * with the request and is released in req_invoke.
	 */
	dREQ (REQ_DB_CLOSE);
	req->db    = db;
	req->sv1   = (SV *)db->app_private;
	req->uint1 = flags;
	REQ_SEND;
	ptr_nuke (ST (0));
}
1114    
void
db_compact (DB *db, DB_TXN_ornull *txn = 0, SV *start = 0, SV *stop = 0, SV *unused1 = 0, U32 flags = DB_FREE_SPACE, SV *unused2 = 0, SV *callback = &PL_sv_undef)
	CODE:
{
	/* queue a db->compact request */
	dREQ (REQ_DB_COMPACT);
	req->db = db;
	req->txn = txn;
	/* start/stop default to null when omitted and must not be converted then */
	if (start)
	  sv_to_dbt (&req->dbt1, start);
	if (stop)
	  sv_to_dbt (&req->dbt2, stop);
	req->uint1 = flags;
	REQ_SEND;
}
1127    
void
db_sync (DB *db, U32 flags = 0, SV *callback = &PL_sv_undef)
	CODE:
{
	/* queue a db->sync request */
	dREQ (REQ_DB_SYNC);
	req->uint1 = flags;
	req->db    = db;
	REQ_SEND;
}
1137    
void
db_put (DB *db, DB_TXN_ornull *txn, SV *key, SV *data, U32 flags = 0, SV *callback = &PL_sv_undef)
	CODE:
{
	/* queue a db->put request; key and data are copied into the request */
	dREQ (REQ_DB_PUT);
	req->db    = db;
	req->txn   = txn;
	req->uint1 = flags;
	sv_to_dbt (&req->dbt1, key);
	sv_to_dbt (&req->dbt2, data);
	REQ_SEND;
}
1150    
void
db_get (DB *db, DB_TXN_ornull *txn, SV *key, SV *data, U32 flags = 0, SV *callback = &PL_sv_undef)
	CODE:
{
	/*
	 * queue a db->get request. the key is copied; data is kept
	 * referenced and read-only until req_invoke stores the result.
	 */
	dREQ (REQ_DB_GET);
	req->db    = db;
	req->txn   = txn;
	req->uint1 = flags;
	sv_to_dbt (&req->dbt1, key);
	req->dbt3.flags = DB_DBT_MALLOC;
	req->sv1 = SvREFCNT_inc (data);
	SvREADONLY_on (data);
	REQ_SEND;
}
1165    
void
db_pget (DB *db, DB_TXN_ornull *txn, SV *key, SV *pkey, SV *data, U32 flags = 0, SV *callback = &PL_sv_undef)
	CODE:
{
	/*
	 * queue a db->pget request. key and pkey are copied; data is kept
	 * referenced and read-only until req_invoke stores the result.
	 */
	dREQ (REQ_DB_PGET);
	req->db    = db;
	req->txn   = txn;
	req->uint1 = flags;
	sv_to_dbt (&req->dbt1, key);
	sv_to_dbt (&req->dbt2, pkey);
	req->dbt3.flags = DB_DBT_MALLOC;
	req->sv1 = SvREFCNT_inc (data);
	SvREADONLY_on (data);
	REQ_SEND;
}
1181    
void
db_txn_commit (DB_TXN *txn, U32 flags = 0, SV *callback = &PL_sv_undef)
	CODE:
{
	/* queue a txn->commit request, then invalidate the perl handle */
	dREQ (REQ_TXN_COMMIT);
	req->uint1 = flags;
	req->txn   = txn;
	REQ_SEND;
	ptr_nuke (ST (0));
}
1192    
void
db_txn_abort (DB_TXN *txn, SV *callback = &PL_sv_undef)
	CODE:
{
	/* queue a txn->abort request, then invalidate the perl handle */
	dREQ (REQ_TXN_ABORT);
	req->txn = txn;
	REQ_SEND;
	ptr_nuke (ST (0));
}
1202    
1203 root 1.2
1204     MODULE = BDB PACKAGE = BDB::Env
1205    
void
DESTROY (DB_ENV_ornull *env)
	CODE:
	/* close the environment synchronously if the handle is still valid */
	if (env)
	  {
	    env->close (env, 0);
	  }
1211    
int
set_cachesize (DB_ENV *env, U32 gbytes, U32 bytes, int ncache = 0)
	CODE:
	RETVAL = env->set_cachesize (env, gbytes, bytes, ncache);
	OUTPUT:
	RETVAL

int
set_flags (DB_ENV *env, U32 flags, int onoff)
	CODE:
	RETVAL = env->set_flags (env, flags, onoff);
	OUTPUT:
	RETVAL

int
set_encrypt (DB_ENV *env, const char *password, U32 flags = 0)
	CODE:
	RETVAL = env->set_encrypt (env, password, flags);
	OUTPUT:
	RETVAL

int
set_timeout (DB_ENV *env, NV timeout, U32 flags)
	CODE:
	/* the timeout argument is multiplied by one million before being passed on */
	RETVAL = env->set_timeout (env, timeout * 1000000, flags);
	OUTPUT:
	RETVAL
1235 root 1.2
DB_TXN *
txn_begin (DB_ENV *env, DB_TXN_ornull *parent = 0, U32 flags = 0)
	CODE:
{
	/* synchronous: begin a transaction, croak on failure */
	int status = env->txn_begin (env, parent, &RETVAL, flags);
	errno = status;
	if (status)
	  croak ("txn_begin: %s", db_strerror (status));
}
	OUTPUT:
	RETVAL
1244 root 1.2
1245     MODULE = BDB PACKAGE = BDB::Db
1246    
void
DESTROY (DB_ornull *db)
	CODE:
	/*
	 * close the database synchronously if the handle is still valid.
	 * app_private is read before the close and released after it.
	 */
	if (db)
	  {
	    SV *env_sv = (SV *)db->app_private;
	    db->close (db, 0);
	    SvREFCNT_dec (env_sv);
	  }
1256    
int
set_cachesize (DB *db, U32 gbytes, U32 bytes, int ncache = 0)
	CODE:
	RETVAL = db->set_cachesize (db, gbytes, bytes, ncache);
	OUTPUT:
	RETVAL

int
set_flags (DB *db, U32 flags)
	CODE:
	RETVAL = db->set_flags (db, flags);
	OUTPUT:
	RETVAL

int
set_encrypt (DB *db, const char *password, U32 flags)
	CODE:
	RETVAL = db->set_encrypt (db, password, flags);
	OUTPUT:
	RETVAL

int
set_lorder (DB *db, int lorder)
	CODE:
	RETVAL = db->set_lorder (db, lorder);
	OUTPUT:
	RETVAL

int
set_bt_minkey (DB *db, U32 minkey)
	CODE:
	RETVAL = db->set_bt_minkey (db, minkey);
	OUTPUT:
	RETVAL

int
set_re_delim (DB *db, int delim)
	CODE:
	RETVAL = db->set_re_delim (db, delim);
	OUTPUT:
	RETVAL

int
set_re_pad (DB *db, int re_pad)
	CODE:
	RETVAL = db->set_re_pad (db, re_pad);
	OUTPUT:
	RETVAL

int
set_re_source (DB *db, char *source)
	CODE:
	RETVAL = db->set_re_source (db, source);
	OUTPUT:
	RETVAL

int
set_re_len (DB *db, U32 re_len)
	CODE:
	RETVAL = db->set_re_len (db, re_len);
	OUTPUT:
	RETVAL

int
set_h_ffactor (DB *db, U32 h_ffactor)
	CODE:
	RETVAL = db->set_h_ffactor (db, h_ffactor);
	OUTPUT:
	RETVAL

int
set_h_nelem (DB *db, U32 h_nelem)
	CODE:
	RETVAL = db->set_h_nelem (db, h_nelem);
	OUTPUT:
	RETVAL

int
set_q_extentsize (DB *db, U32 extentsize)
	CODE:
	RETVAL = db->set_q_extentsize (db, extentsize);
	OUTPUT:
	RETVAL
1329 root 1.2
1330 root 1.5 MODULE = BDB PACKAGE = BDB::Txn
1331    
void
DESTROY (DB_TXN_ornull *txn)
	CODE:
	/* abort the transaction if the handle is still valid */
	if (txn)
	  {
	    txn->abort (txn);
	  }
1337    
int
set_timeout (DB_TXN *txn, NV timeout, U32 flags)
	CODE:
	/* the timeout argument is multiplied by one million before being passed on */
	RETVAL = txn->set_timeout (txn, timeout * 1000000, flags);
	OUTPUT:
	RETVAL
1343    
1344 root 1.1