ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/BDB/BDB.xs
Revision: 1.3
Committed: Mon Feb 5 22:19:07 2007 UTC (17 years, 3 months ago) by root
Branch: MAIN
Changes since 1.2: +241 -39 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.1 /* solaris */
2     #define _POSIX_PTHREAD_SEMANTICS 1
3    
4     #if __linux && !defined(_GNU_SOURCE)
5     # define _GNU_SOURCE
6     #endif
7    
8     /* just in case */
9     #define _REENTRANT 1
10    
11     #include <errno.h>
12    
13     #include "EXTERN.h"
14     #include "perl.h"
15     #include "XSUB.h"
16    
17     #include <pthread.h>
18    
19     #include <stddef.h>
20     #include <stdlib.h>
21     #include <errno.h>
22     #include <sys/time.h>
23     #include <sys/types.h>
24     #include <limits.h>
25     #include <unistd.h>
26     #include <fcntl.h>
27 root 1.2
28     #include <db.h>
29 root 1.1
30     /* number of seconds after which idle threads exit */
31     #define IDLE_TIMEOUT 10
32    
/* whether word reads are potentially non-atomic.
 * this is conservative; most arches this runs
 * on likely have atomic word reads/writes.
 */
37     #ifndef WORDACCESS_UNSAFE
38     # if __i386 || __x86_64
39     # define WORDACCESS_UNSAFE 0
40     # else
41     # define WORDACCESS_UNSAFE 1
42     # endif
43     #endif
44    
45     typedef SV SV8; /* byte-sv, used for argument-checking */
46 root 1.3 typedef char *octetstring;
47 root 1.1
48 root 1.3 static SV *prepare_cb;
49    
/* strdup () that passes a null pointer through instead of dereferencing it */
static inline char *
strdup_ornull (const char *s)
{
  if (!s)
    return 0;

  return strdup (s);
}
55    
/* request types, stored in aio_cb.type and dispatched on in aio_proc */
enum {
  REQ_QUIT, /* makes the worker thread that dequeues it exit */
  REQ_ENV_OPEN, REQ_ENV_CLOSE,
  REQ_DB_OPEN, REQ_DB_CLOSE, REQ_DB_COMPACT, REQ_DB_SYNC, REQ_DB_PUT,
};
61    
/* one queued request: filled in by the XS functions, executed by a
 * worker thread in aio_proc and handed back through the result queue */
typedef struct aio_cb
{
  struct aio_cb *volatile next; /* link inside a reqq priority list */
  SV *callback;                 /* perl callback run by req_invoke */
  int type, pri, result;        /* REQ_* type, biased priority, return code of the db call */

  /* arguments of the database call; which ones are used depends on type */
  DB_ENV *env;
  DB *db;
  DB_TXN *txn;
  DBC *cursor;
  int int1, int2;
  U32 uint1, uint2;
  char *buf1, *buf2;            /* released with free () in req_free */

  DBT dbt1, dbt2, dbt3;
} aio_cb;
78    
79     typedef aio_cb *aio_req;
80    
/* request priorities: callers use the range PRI_MIN..PRI_MAX, internally
 * PRI_BIAS is added so the value can index the per-priority reqq arrays */
enum {
  PRI_MIN = -4,
  PRI_MAX = 4,

  DEFAULT_PRI = 0,
  PRI_BIAS = -PRI_MIN,
  NUM_PRI = PRI_MAX + PRI_BIAS + 1, /* number of distinct priority levels */
};
89    
90     #define AIO_TICKS ((1000000 + 1023) >> 10)
91    
92     static unsigned int max_poll_time = 0;
93     static unsigned int max_poll_reqs = 0;
94    
95     /* calculcate time difference in ~1/AIO_TICKS of a second */
96     static int tvdiff (struct timeval *tv1, struct timeval *tv2)
97     {
98     return (tv2->tv_sec - tv1->tv_sec ) * AIO_TICKS
99     + ((tv2->tv_usec - tv1->tv_usec) >> 10);
100     }
101    
102     static int next_pri = DEFAULT_PRI + PRI_BIAS;
103    
104     static unsigned int started, idle, wanted;
105    
106     #if __linux && defined (PTHREAD_ADAPTIVE_MUTEX_INITIALIZER_NP)
107     # define AIO_MUTEX_INIT PTHREAD_ADAPTIVE_MUTEX_INITIALIZER_NP
108     #else
109     # define AIO_MUTEX_INIT PTHREAD_MUTEX_INITIALIZER
110     #endif
111    
112     #define LOCK(mutex) pthread_mutex_lock (&(mutex))
113     #define UNLOCK(mutex) pthread_mutex_unlock (&(mutex))
114    
115     /* worker threads management */
116     static pthread_mutex_t wrklock = AIO_MUTEX_INIT;
117    
/* per-thread bookkeeping; all workers form a circular doubly linked
 * list anchored at wrk_first */
typedef struct worker {
  /* locked by wrklock */
  struct worker *prev, *next;

  pthread_t tid; /* filled in by pthread_create in start_thread */

  /* locked by reslock, reqlock or wrklock */
  aio_req req; /* currently processed request */
  void *dbuf; /* not referenced by the code visible in this file */
  DIR *dirp;  /* not referenced by the code visible in this file */
} worker;
129    
130     static worker wrk_first = { &wrk_first, &wrk_first, 0 };
131    
/* hook for releasing per-request worker state; currently does nothing.
 * called by aio_proc after each request and by atfork_child. */
static void worker_clear (worker *wrk)
{
}
135    
136     static void worker_free (worker *wrk)
137     {
138     wrk->next->prev = wrk->prev;
139     wrk->prev->next = wrk->next;
140    
141     free (wrk);
142     }
143    
144     static volatile unsigned int nreqs, nready, npending;
145     static volatile unsigned int max_idle = 4;
146     static volatile unsigned int max_outstanding = 0xffffffff;
147     static int respipe [2];
148    
149     static pthread_mutex_t reslock = AIO_MUTEX_INIT;
150     static pthread_mutex_t reqlock = AIO_MUTEX_INIT;
151     static pthread_cond_t reqwait = PTHREAD_COND_INITIALIZER;
152    
#if WORDACCESS_UNSAFE

/* word accesses may tear on this platform, so every counter is
 * sampled while holding the mutex that protects it */

static unsigned int get_nready (void)
{
  unsigned int count;

  LOCK (reqlock);
  count = nready;
  UNLOCK (reqlock);

  return count;
}

static unsigned int get_npending (void)
{
  unsigned int count;

  LOCK (reslock);
  count = npending;
  UNLOCK (reslock);

  return count;
}

static unsigned int get_nthreads (void)
{
  unsigned int count;

  LOCK (wrklock);
  count = started;
  UNLOCK (wrklock);

  return count;
}

#else

/* word accesses are treated as atomic here: read the counters directly */
# define get_nready()   nready
# define get_npending() npending
# define get_nthreads() started

#endif
195    
196     /*
197     * a somewhat faster data structure might be nice, but
198     * with 8 priorities this actually needs <20 insns
199     * per shift, the most expensive operation.
200     */
/* request queue: one singly linked list per priority level */
typedef struct {
  aio_req qs[NUM_PRI], qe[NUM_PRI]; /* qstart, qend */
  int size; /* number of queued requests over all priorities */
} reqq;
205    
206     static reqq req_queue;
207     static reqq res_queue;
208    
209     int reqq_push (reqq *q, aio_req req)
210     {
211     int pri = req->pri;
212     req->next = 0;
213    
214     if (q->qe[pri])
215     {
216     q->qe[pri]->next = req;
217     q->qe[pri] = req;
218     }
219     else
220     q->qe[pri] = q->qs[pri] = req;
221    
222     return q->size++;
223     }
224    
225     aio_req reqq_shift (reqq *q)
226     {
227     int pri;
228    
229     if (!q->size)
230     return 0;
231    
232     --q->size;
233    
234     for (pri = NUM_PRI; pri--; )
235     {
236     aio_req req = q->qs[pri];
237    
238     if (req)
239     {
240     if (!(q->qs[pri] = req->next))
241     q->qe[pri] = 0;
242    
243     return req;
244     }
245     }
246    
247     abort ();
248     }
249    
250     static int poll_cb ();
251     static void req_free (aio_req req);
252     static void req_cancel (aio_req req);
253    
/* run the perl callback of a finished request, if it has one.
 * returns true when ERRSV is false afterwards, false otherwise. */
static int req_invoke (aio_req req)
{
  dSP;

  if (SvOK (req->callback))
    {
      ENTER;
      SAVETMPS;
      PUSHMARK (SP);
      EXTEND (SP, 1);

      /* no request type pushes callback arguments yet */
      switch (req->type)
        {
        }

      /* make the return code of the database call visible through errno */
      errno = req->result;

      PUTBACK;
      /* G_EVAL traps exceptions, they are detected via ERRSV below */
      call_sv (req->callback, G_VOID | G_EVAL);
      SPAGAIN;

      FREETMPS;
      LEAVE;
    }

  /* NOTE(review): ERRSV is also tested when no callback was run */
  return !SvTRUE (ERRSV);
}
281    
282     static void req_free (aio_req req)
283     {
284 root 1.2 free (req->buf1);
285     free (req->buf2);
286 root 1.1 Safefree (req);
287     }
288    
289 root 1.3 static void *aio_proc (void *arg);
290 root 1.1
291     static void start_thread (void)
292     {
293     sigset_t fullsigset, oldsigset;
294     pthread_attr_t attr;
295    
296     worker *wrk = calloc (1, sizeof (worker));
297    
298     if (!wrk)
299     croak ("unable to allocate worker thread data");
300    
301     pthread_attr_init (&attr);
302     pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
303     #ifdef PTHREAD_SCOPE_PROCESS
304     pthread_attr_setscope (&attr, PTHREAD_SCOPE_PROCESS);
305     #endif
306    
307     sigfillset (&fullsigset);
308    
309     LOCK (wrklock);
310     pthread_sigmask (SIG_SETMASK, &fullsigset, &oldsigset);
311    
312     if (pthread_create (&wrk->tid, &attr, aio_proc, (void *)wrk) == 0)
313     {
314     wrk->prev = &wrk_first;
315     wrk->next = wrk_first.next;
316     wrk_first.next->prev = wrk;
317     wrk_first.next = wrk;
318     ++started;
319     }
320     else
321     free (wrk);
322    
323     pthread_sigmask (SIG_SETMASK, &oldsigset, 0);
324     UNLOCK (wrklock);
325     }
326    
327     static void maybe_start_thread ()
328     {
329     if (get_nthreads () >= wanted)
330     return;
331    
332     /* todo: maybe use idle here, but might be less exact */
333     if (0 <= (int)get_nthreads () + (int)get_npending () - (int)nreqs)
334     return;
335    
336     start_thread ();
337     }
338    
/* hand a fully initialised request over to the worker threads.
 * when the request has no callback, prepare_cb is called to provide one:
 * it must return exactly two values, a callback for the request and a
 * "wait" callback that is invoked after the request has been queued. */
static void req_send (aio_req req)
{
  SV *wait_callback = 0;

  // synthesize callback if none given
  if (!SvOK (req->callback))
    {
      dSP;
      PUSHMARK (SP);
      PUTBACK;
      int count = call_sv (prepare_cb, G_ARRAY);
      SPAGAIN;

      /* NOTE(review): req is not released when this croaks */
      if (count != 2)
        croak ("prepare callback must return exactly two values\n");

      /* values are popped last-first: the second return value is the
       * wait callback, the first one replaces the request callback */
      wait_callback = SvREFCNT_inc (POPs);
      SvREFCNT_dec (req->callback);
      req->callback = SvREFCNT_inc (POPs);
    }

  ++nreqs;

  LOCK (reqlock);
  ++nready;
  reqq_push (&req_queue, req);
  pthread_cond_signal (&reqwait);
  UNLOCK (reqlock);

  maybe_start_thread ();

  if (wait_callback)
    {
      dSP;
      PUSHMARK (SP);
      PUTBACK;
      call_sv (wait_callback, G_DISCARD);
      SvREFCNT_dec (wait_callback);
    }
}
379    
/* ask one worker thread to exit by queueing a REQ_QUIT request at the
 * highest priority; started is decremented right away, without waiting
 * for the thread to actually exit. */
static void end_thread (void)
{
  aio_req req;

  Newz (0, req, 1, aio_cb);

  req->type = REQ_QUIT;
  req->pri = PRI_MAX + PRI_BIAS;

  /* NOTE(review): unlike req_send this does not increment nready, while
   * aio_proc decrements nready for every request it dequeues - confirm */
  LOCK (reqlock);
  reqq_push (&req_queue, req);
  pthread_cond_signal (&reqwait);
  UNLOCK (reqlock);

  LOCK (wrklock);
  --started;
  UNLOCK (wrklock);
}
398    
399     static void set_max_idle (int nthreads)
400     {
401     if (WORDACCESS_UNSAFE) LOCK (reqlock);
402     max_idle = nthreads <= 0 ? 1 : nthreads;
403     if (WORDACCESS_UNSAFE) UNLOCK (reqlock);
404     }
405    
406     static void min_parallel (int nthreads)
407     {
408     if (wanted < nthreads)
409     wanted = nthreads;
410     }
411    
412     static void max_parallel (int nthreads)
413     {
414     if (wanted > nthreads)
415     wanted = nthreads;
416    
417     while (started > wanted)
418     end_thread ();
419     }
420    
/* block until at least one result is queued or no requests are
 * outstanding anymore */
static void poll_wait ()
{
  fd_set rfd;

  while (nreqs)
    {
      int size;
      if (WORDACCESS_UNSAFE) LOCK (reslock);
      size = res_queue.size;
      if (WORDACCESS_UNSAFE) UNLOCK (reslock);

      if (size)
        return;

      maybe_start_thread ();

      /* sleep until a worker writes to the result pipe (no timeout) */
      FD_ZERO(&rfd);
      FD_SET(respipe [0], &rfd);

      select (respipe [0] + 1, &rfd, 0, 0, 0);
    }
}
443    
/* process finished requests: run their callbacks and free them.
 * stops when the result queue is empty, after max_poll_reqs requests or
 * after max_poll_time ticks (each limit only when non-zero). as long as
 * more than max_outstanding requests are in flight it waits and goes on.
 * returns the number of requests handled; croaks when req_invoke
 * reports failure. */
static int poll_cb ()
{
  dSP; /* NOTE(review): not used below */
  int count = 0;
  int maxreqs = max_poll_reqs;
  int do_croak = 0; /* NOTE(review): not used below */
  struct timeval tv_start, tv_now;
  aio_req req;

  if (max_poll_time)
    gettimeofday (&tv_start, 0);

  for (;;)
    {
      for (;;)
        {
          maybe_start_thread ();

          LOCK (reslock);
          req = reqq_shift (&res_queue);

          if (req)
            {
              --npending;

              if (!res_queue.size)
                {
                  /* read any signals sent by the worker threads */
                  char buf [4];
                  while (read (respipe [0], buf, 4) == 4)
                    ;
                }
            }

          UNLOCK (reslock);

          if (!req)
            break;

          --nreqs;

          if (!req_invoke (req))
            {
              /* release the request first, croak does not return here */
              req_free (req);
              croak (0);
            }

          count++;

          req_free (req);

          if (maxreqs && !--maxreqs)
            break;

          if (max_poll_time)
            {
              gettimeofday (&tv_now, 0);

              if (tvdiff (&tv_start, &tv_now) >= max_poll_time)
                break;
            }
        }

      if (nreqs <= max_outstanding)
        break;

      poll_wait ();

      /* allow one more request in the next round */
      ++maxreqs;
    }

  return count;
}
517    
518     static void create_pipe ()
519     {
520     if (pipe (respipe))
521     croak ("unable to initialize result pipe");
522    
523     if (fcntl (respipe [0], F_SETFL, O_NONBLOCK))
524     croak ("cannot set result pipe to nonblocking mode");
525    
526     if (fcntl (respipe [1], F_SETFL, O_NONBLOCK))
527     croak ("cannot set result pipe to nonblocking mode");
528     }
529    
530     /*****************************************************************************/
531    
/* main function of a worker thread; thr_arg is the thread's worker
 * struct. loops forever: takes a request off req_queue, executes the
 * matching database call, then pushes the request onto res_queue.
 * the thread exits on REQ_QUIT or when it timed out waiting while more
 * than max_idle threads were idle. */
static void *aio_proc (void *thr_arg)
{
  aio_req req;
  struct timespec ts;
  worker *self = (worker *)thr_arg;

  /* try to distribute timeouts somewhat evenly */
  /* NOTE(review): ts.tv_sec is read here before it is first assigned */
  ts.tv_nsec = (((unsigned long)self + (unsigned long)ts.tv_sec) & 1023UL)
               * (1000000000UL / 1024UL);

  for (;;)
    {
      ts.tv_sec = time (0) + IDLE_TIMEOUT;

      LOCK (reqlock);

      /* wait for a request, reqlock is held except while waiting */
      for (;;)
        {
          self->req = req = reqq_shift (&req_queue);

          if (req)
            break;

          ++idle;

          if (pthread_cond_timedwait (&reqwait, &reqlock, &ts)
              == ETIMEDOUT)
            {
              if (idle > max_idle)
                {
                  /* too many idle threads: this one exits */
                  --idle;
                  UNLOCK (reqlock);
                  LOCK (wrklock);
                  --started;
                  UNLOCK (wrklock);
                  goto quit;
                }

              /* we are allowed to idle, so do so without any timeout */
              pthread_cond_wait (&reqwait, &reqlock);
              ts.tv_sec = time (0) + IDLE_TIMEOUT;
            }

          --idle;
        }

      --nready;

      UNLOCK (reqlock);

      /* execute the request without holding any lock */
      switch (req->type)
        {
          case REQ_QUIT:
            /* NOTE(review): the quit request itself is not freed on this path */
            goto quit;

          case REQ_ENV_OPEN:
            req->result = req->env->open (req->env, req->buf1, req->uint1, req->int1);
            break;

          case REQ_ENV_CLOSE:
            req->result = req->env->close (req->env, req->uint1);
            break;

          case REQ_DB_OPEN:
            req->result = req->db->open (req->db, req->txn, req->buf1, req->buf2, req->int1, req->uint1, req->int2);
            break;

          case REQ_DB_CLOSE:
            req->result = req->db->close (req->db, req->uint1);
            break;

          case REQ_DB_COMPACT:
            req->result = req->db->compact (req->db, req->txn, &req->dbt1, &req->dbt2, 0, req->uint1, 0);
            break;

          case REQ_DB_SYNC:
            req->result = req->db->sync (req->db, req->uint1);
            break;

          case REQ_DB_PUT:
            req->result = req->db->put (req->db, req->txn, &req->dbt1, &req->dbt2, req->uint1);
            break;

          default:
            req->result = ENOSYS;
            break;
        }

      LOCK (reslock);

      ++npending;

      /* reqq_push returns the previous size: only signal on empty -> non-empty */
      if (!reqq_push (&res_queue, req))
        /* write a dummy byte to the pipe so fh becomes ready */
        write (respipe [1], &respipe, 1);

      self->req = 0;
      worker_clear (self);

      UNLOCK (reslock);
    }

quit:
  LOCK (wrklock);
  worker_free (self);
  UNLOCK (wrklock);

  return 0;
}
641    
642     /*****************************************************************************/
643    
/* registered as the prepare handler with pthread_atfork: takes all three
 * locks, which atfork_parent releases again in the opposite order */
static void atfork_prepare (void)
{
  LOCK (wrklock);
  LOCK (reqlock);
  LOCK (reslock);
}
650    
/* registered as the parent handler with pthread_atfork (and also called
 * at the end of atfork_child): releases the locks taken by
 * atfork_prepare, in reverse order */
static void atfork_parent (void)
{
  UNLOCK (reslock);
  UNLOCK (reqlock);
  UNLOCK (wrklock);
}
657    
658     static void atfork_child (void)
659     {
660     aio_req prv;
661    
662     while (prv = reqq_shift (&req_queue))
663     req_free (prv);
664    
665     while (prv = reqq_shift (&res_queue))
666     req_free (prv);
667    
668     while (wrk_first.next != &wrk_first)
669     {
670     worker *wrk = wrk_first.next;
671    
672     if (wrk->req)
673     req_free (wrk->req);
674    
675     worker_clear (wrk);
676     worker_free (wrk);
677     }
678    
679     started = 0;
680     idle = 0;
681     nreqs = 0;
682     nready = 0;
683     npending = 0;
684    
685     close (respipe [0]);
686     close (respipe [1]);
687     create_pipe ();
688    
689     atfork_parent ();
690     }
691    
/* declares and allocates the zeroed request `req' of the given type.
 * expects an SV *callback in scope, which must be undef or a reference
 * and is copied into the request. uses the pending priority next_pri
 * for the request and resets next_pri to the default. */
#define dREQ(reqtype)						\
  aio_req req;							\
  int req_pri = next_pri;					\
  next_pri = DEFAULT_PRI + PRI_BIAS;				\
								\
  if (SvOK (callback) && !SvROK (callback))			\
    croak ("callback must be undef or of reference type");	\
								\
  Newz (0, req, 1, aio_cb);					\
  if (!req)							\
    croak ("out of memory during aio_req allocation");		\
								\
  req->callback = newSVsv (callback);				\
  req->type = (reqtype);					\
  req->pri = req_pri
707    
/* submit the request declared by dREQ */
#define REQ_SEND						\
  req_send (req)
710    
/* extract the C pointer stored in a blessed perl reference.
 * var:   lvalue receiving the pointer (0 when arg is undef)
 * arg:   the perl value
 * type:  C pointer type to convert to
 * class: perl class arg must be derived from, otherwise this croaks.
 * uses croak instead of calling Perl_croak directly, as the latter
 * takes the interpreter context as its first argument on perls built
 * with PERL_IMPLICIT_CONTEXT. */
#define SvPTR(var, arg, type, class)				\
  if (!SvOK (arg))						\
    (var) = 0;							\
  else if (sv_derived_from ((arg), # class))			\
    {								\
      IV tmp = SvIV ((SV*) SvRV (arg));				\
      (var) = INT2PTR (type, tmp);				\
    }								\
  else								\
    croak (# var " is not of type " # class)
721    
722     inline void
723     set_dbt (DBT *dbt, SV *sv)
724     {
725     STRLEN len;
726     char *data = SvPVbyte (sv, len);
727    
728     dbt->data = malloc (len);
729     memcpy (dbt->data, data, len);
730     dbt->size = len;
731     }
732 root 1.1
733 root 1.2 MODULE = BDB PACKAGE = BDB
734 root 1.1
735     PROTOTYPES: ENABLE
736    
BOOT:
{
	/* export the DB_* constants as constant subs in package BDB,
	 * without the DB_ prefix */
	HV *stash = gv_stashpv ("BDB", 1);

	static const struct {
	  const char *name;
	  IV iv;
	} *civ, const_iv[] = {
#define const_iv(name) { # name, (IV)DB_ ## name },
          const_iv (RPCCLIENT)
          const_iv (INIT_CDB)
          const_iv (INIT_LOCK)
          const_iv (INIT_LOG)
          const_iv (INIT_MPOOL)
          const_iv (INIT_REP)
          const_iv (INIT_TXN)
          const_iv (RECOVER)
          const_iv (RECOVER_FATAL)
          const_iv (CREATE)
          const_iv (USE_ENVIRON)
          const_iv (USE_ENVIRON_ROOT)
          const_iv (LOCKDOWN)
          const_iv (PRIVATE)
          const_iv (REGISTER)
          const_iv (SYSTEM_MEM)
          const_iv (AUTO_COMMIT)
          const_iv (CDB_ALLDB)
          const_iv (DIRECT_DB)
          const_iv (DIRECT_LOG)
          const_iv (DSYNC_DB)
          const_iv (DSYNC_LOG)
          const_iv (LOG_AUTOREMOVE)
          const_iv (LOG_INMEMORY)
          const_iv (NOLOCKING)
          const_iv (MULTIVERSION)
          const_iv (NOMMAP)
          const_iv (NOPANIC)
          const_iv (OVERWRITE)
          const_iv (PANIC_ENVIRONMENT)
          const_iv (REGION_INIT)
          const_iv (TIME_NOTGRANTED)
          const_iv (TXN_NOSYNC)
          const_iv (TXN_SNAPSHOT)
          const_iv (TXN_WRITE_NOSYNC)
          const_iv (YIELDCPU)
          const_iv (ENCRYPT_AES)
          const_iv (XA_CREATE)
          const_iv (BTREE)
          const_iv (HASH)
          const_iv (QUEUE)
          const_iv (RECNO)
          const_iv (UNKNOWN)
          const_iv (EXCL)
          const_iv (READ_UNCOMMITTED)
          const_iv (TRUNCATE)
          const_iv (NOSYNC)
          const_iv (CHKSUM)
          const_iv (ENCRYPT)
          const_iv (TXN_NOT_DURABLE)
          const_iv (DUP)
          const_iv (DUPSORT)
          const_iv (RECNUM)
          const_iv (RENUMBER)
          const_iv (REVSPLITOFF)
          const_iv (INORDER)
          const_iv (CONSUME)
          const_iv (CONSUME_WAIT)
          const_iv (SNAPSHOT)
          const_iv (JOIN_ITEM)
          const_iv (RMW)

          const_iv (NOTFOUND)
          const_iv (KEYEMPTY)
          const_iv (LOCK_DEADLOCK)
          const_iv (LOCK_NOTGRANTED)
          const_iv (RUNRECOVERY)
          const_iv (OLD_VERSION)
          const_iv (REP_HANDLE_DEAD)
          const_iv (REP_LOCKOUT)

          const_iv (FREE_SPACE)
          const_iv (FREELIST_ONLY)

          const_iv (APPEND)
          const_iv (NODUPDATA)
          const_iv (NOOVERWRITE)

          const_iv (SET_LOCK_TIMEOUT)
          const_iv (SET_TXN_TIMEOUT)
	};

	/* walk the table from the last entry to the first; the pointer is
	 * decremented inside the body so it never moves below the array */
	for (civ = const_iv + sizeof (const_iv) / sizeof (const_iv [0]); civ > const_iv; )
	  {
	    --civ;
	    newCONSTSUB (stash, (char *)civ->name, newSViv (civ->iv));
	  }

        create_pipe ();
        pthread_atfork (atfork_prepare, atfork_parent, atfork_child);
}
835    
# maximum number of requests handled by one poll_cb call, 0 means no limit
void
max_poll_reqs (int nreqs)
	PROTOTYPE: $
	CODE:
        max_poll_reqs = nreqs;

# maximum time one poll_cb call keeps processing, given in seconds and
# stored in AIO_TICKS units; 0 means no limit
void
max_poll_time (double nseconds)
	PROTOTYPE: $
	CODE:
        max_poll_time = nseconds * AIO_TICKS;

# no CODE section here: relies on xsubpp generating a call to the C
# function of the same name
void
min_parallel (int nthreads)
	PROTOTYPE: $

# no CODE section here: relies on xsubpp generating a call to the C
# function of the same name
void
max_parallel (int nthreads)
	PROTOTYPE: $

void
max_idle (int nthreads)
	PROTOTYPE: $
	CODE:
        set_max_idle (nthreads);

# sets a new limit and returns the previous one
int
max_outstanding (int maxreqs)
	PROTOTYPE: $
        CODE:
        RETVAL = max_outstanding;
        max_outstanding = maxreqs;
	OUTPUT:
        RETVAL
870    
# returns the priority pending for the next request (unbiased); when an
# argument is given it becomes the new pending priority, clamped to
# PRI_MIN..PRI_MAX
int
dbreq_pri (int pri = 0)
	PROTOTYPE: ;$
	CODE:
	RETVAL = next_pri - PRI_BIAS;
	if (items > 0)
	  {
	    if (pri < PRI_MIN) pri = PRI_MIN;
	    if (pri > PRI_MAX) pri = PRI_MAX;
	    next_pri = pri + PRI_BIAS;
	  }
	OUTPUT:
	RETVAL
884    
# lowers the priority pending for the next request by nice (a negative
# value raises it); the result is clamped to PRI_MIN..PRI_MAX
void
dbreq_nice (int nice = 0)
	CODE:
	/* next_pri is stored with PRI_BIAS added, so remove the bias before
	 * clamping against the unbiased PRI_MIN..PRI_MAX range */
	nice = next_pri - PRI_BIAS - nice;
	if (nice < PRI_MIN) nice = PRI_MIN;
	if (nice > PRI_MAX) nice = PRI_MAX;
	next_pri = nice + PRI_BIAS;
892    
# waits for and processes results until no requests are outstanding
void
flush ()
	PROTOTYPE:
	CODE:
        while (nreqs)
          {
            poll_wait ();
            poll_cb ();
          }

# waits for at least one result (when requests are outstanding), then
# processes results; returns the value of the C function poll_cb
int
poll ()
	PROTOTYPE:
	CODE:
        poll_wait ();
        RETVAL = poll_cb ();
	OUTPUT:
	RETVAL

# returns the read end of the result pipe
int
poll_fileno ()
	PROTOTYPE:
	CODE:
        RETVAL = respipe [0];
	OUTPUT:
	RETVAL

# processes results without waiting; arguments, when given, are unused
int
poll_cb (...)
	PROTOTYPE:
	CODE:
        RETVAL = poll_cb ();
	OUTPUT:
	RETVAL

void
poll_wait ()
	PROTOTYPE:
	CODE:
        poll_wait ();
933    
# number of requests submitted whose callback processing has not
# happened yet
int
nreqs ()
	PROTOTYPE:
	CODE:
        RETVAL = nreqs;
	OUTPUT:
	RETVAL

# value of the nready counter (requests queued for the workers)
int
nready ()
	PROTOTYPE:
	CODE:
        RETVAL = get_nready ();
	OUTPUT:
	RETVAL

# value of the npending counter (results queued for poll_cb)
int
npending ()
	PROTOTYPE:
	CODE:
        RETVAL = get_npending ();
	OUTPUT:
	RETVAL

# value of the started counter (worker threads)
int
nthreads ()
	PROTOTYPE:
	CODE:
        if (WORDACCESS_UNSAFE) LOCK   (wrklock);
        RETVAL = started;
        if (WORDACCESS_UNSAFE) UNLOCK (wrklock);
	OUTPUT:
	RETVAL
967    
# installs the callback that req_send calls for requests submitted
# without a callback; a copy of cb replaces the previous one
void
set_sync_prepare (SV *cb)
	PROTOTYPE: &
	CODE:
        SvREFCNT_dec (prepare_cb);
        prepare_cb = newSVsv (cb);
974    
# synchronous: creates an environment handle, croaks on failure
DB_ENV *
db_env_create (U32 env_flags = 0)
	CODE:
{
        errno = db_env_create (&RETVAL, env_flags);
        if (errno)
          croak ("db_env_create: %s", db_strerror (errno));
}
	OUTPUT:
	RETVAL

# asynchronous: queues a REQ_ENV_OPEN request; DB_THREAD is always added
# to the open flags and db_home is copied (undef is passed on as null)
void
db_env_open (DB_ENV *env, octetstring db_home, U32 open_flags, int mode, SV *callback = &PL_sv_undef)
	CODE:
{
	dREQ (REQ_ENV_OPEN);
        req->env   = env;
        req->uint1 = open_flags | DB_THREAD;
        req->int1  = mode;
        req->buf1  = strdup_ornull (db_home);
        REQ_SEND;
}

# asynchronous: queues a REQ_ENV_CLOSE request
void
db_env_close (DB_ENV *env, U32 flags = 0, SV *callback = &PL_sv_undef)
	CODE:
{
	dREQ (REQ_ENV_CLOSE);
        req->env   = env;
        req->uint1 = flags;
        REQ_SEND;
}
1007    
# synchronous: creates a database handle, optionally inside env;
# croaks on failure
DB *
db_create (DB_ENV *env = 0, U32 flags = 0)
	CODE:
{
        errno = db_create (&RETVAL, env, flags);
        if (errno)
          croak ("db_create: %s", db_strerror (errno));
}
	OUTPUT:
	RETVAL
1018 root 1.2
# asynchronous: queues a REQ_DB_OPEN request; DB_THREAD is always added
# to the flags, file and database are copied (undef is passed on as null)
void
db_open (DB *db, DB_TXN *txnid, octetstring file, octetstring database, int type, U32 flags, int mode, SV *callback = &PL_sv_undef)
	CODE:
{
	dREQ (REQ_DB_OPEN);
        req->db    = db;
        req->txn   = txnid;
        req->buf1  = strdup_ornull (file);
        req->buf2  = strdup_ornull (database);
        req->int1  = type;
        req->uint1 = flags | DB_THREAD;
        req->int2  = mode;
        REQ_SEND;
}

# asynchronous: queues a REQ_DB_CLOSE request
void
db_close (DB *db, U32 flags = 0, SV *callback = &PL_sv_undef)
	CODE:
{
	dREQ (REQ_DB_CLOSE);
        req->db    = db;
        req->uint1 = flags;
        REQ_SEND;
}
1043    
# asynchronous: queues a REQ_DB_COMPACT request. start and stop are
# optional; unused1 and unused2 are accepted but ignored.
void
db_compact (DB *db, DB_TXN *txn = 0, SV *start = 0, SV *stop = 0, SV *unused1 = 0, U32 flags = DB_FREE_SPACE, SV *unused2 = 0, SV *callback = &PL_sv_undef)
	CODE:
{
	dREQ (REQ_DB_COMPACT);
        req->db    = db;
        req->txn   = txn;
        /* start/stop default to a null SV, which must not be handed to
         * SvPVbyte; the DBTs then stay zeroed from the request allocation */
        if (start) set_dbt (&req->dbt1, start);
        if (stop ) set_dbt (&req->dbt2, stop );
        req->uint1 = flags;
        REQ_SEND;
}
1056    
# asynchronous: queues a REQ_DB_SYNC request
void
db_sync (DB *db, U32 flags = 0, SV *callback = &PL_sv_undef)
	CODE:
{
	dREQ (REQ_DB_SYNC);
        req->db    = db;
        req->uint1 = flags;
        REQ_SEND;
}
1066    
# asynchronous: queues a REQ_DB_PUT request; key and data are copied
# at submission time
void
db_put (DB *db, DB_TXN *txn, SV *key, SV *data, U32 flags = 0, SV *callback = &PL_sv_undef)
	CODE:
{
	dREQ (REQ_DB_PUT);
        req->db    = db;
        /* pass the caller's transaction on instead of dropping it */
        req->txn   = txn;
        set_dbt (&req->dbt1, key);
        set_dbt (&req->dbt2, data);
        req->uint1 = flags;
        REQ_SEND;
}
1079    
1080 root 1.2
MODULE = BDB		PACKAGE = BDB::Env

# synchronous wrappers around DB_ENV methods; each one returns the
# integer result of the underlying call unchanged

int set_cachesize (DB_ENV *env, U32 gbytes, U32 bytes, int ncache = 0)
	CODE:
        RETVAL = env->set_cachesize (env, gbytes, bytes, ncache);
	OUTPUT:
        RETVAL

int set_flags (DB_ENV *env, U32 flags, int onoff)
	CODE:
        RETVAL = env->set_flags (env, flags, onoff);
	OUTPUT:
        RETVAL

int set_encrypt (DB_ENV *env, const char *password, U32 flags = 0)
	CODE:
        RETVAL = env->set_encrypt (env, password, flags);
	OUTPUT:
        RETVAL

# timeout is multiplied by 1000000 before it is passed on
int set_timeout (DB_ENV *env, NV timeout, U32 flags)
	CODE:
        RETVAL = env->set_timeout (env, timeout * 1000000, flags);
	OUTPUT:
        RETVAL
1106 root 1.2
1107    
MODULE = BDB		PACKAGE = BDB::Db

# synchronous wrappers around DB methods; each one returns the integer
# result of the underlying call unchanged

int set_cachesize (DB *db, U32 gbytes, U32 bytes, int ncache = 0)
	CODE:
        RETVAL = db->set_cachesize (db, gbytes, bytes, ncache);
	OUTPUT:
        RETVAL

int set_flags (DB *db, U32 flags);
	CODE:
        RETVAL = db->set_flags (db, flags);
	OUTPUT:
        RETVAL

int set_encrypt (DB *db, const char *password, U32 flags)
	CODE:
        RETVAL = db->set_encrypt (db, password, flags);
	OUTPUT:
        RETVAL

int set_lorder (DB *db, int lorder)
	CODE:
        RETVAL = db->set_lorder (db, lorder);
	OUTPUT:
        RETVAL


int set_bt_minkey (DB *db, U32 minkey)
	CODE:
        RETVAL = db->set_bt_minkey (db, minkey);
	OUTPUT:
        RETVAL

int set_re_delim(DB *db, int delim);
	CODE:
        RETVAL = db->set_re_delim (db, delim);
	OUTPUT:
        RETVAL

int set_re_pad (DB *db, int re_pad)
	CODE:
        RETVAL = db->set_re_pad (db, re_pad);
	OUTPUT:
        RETVAL

int set_re_source (DB *db, char *source)
	CODE:
        RETVAL = db->set_re_source (db, source);
	OUTPUT:
        RETVAL

int set_re_len (DB *db, U32 re_len)
	CODE:
        RETVAL = db->set_re_len (db, re_len);
	OUTPUT:
        RETVAL

int set_h_ffactor (DB *db, U32 h_ffactor)
	CODE:
        RETVAL = db->set_h_ffactor (db, h_ffactor);
	OUTPUT:
        RETVAL

int set_h_nelem (DB *db, U32 h_nelem)
	CODE:
        RETVAL = db->set_h_nelem (db, h_nelem);
	OUTPUT:
        RETVAL

int set_q_extentsize (DB *db, U32 extentsize)
	CODE:
        RETVAL = db->set_q_extentsize (db, extentsize);
	OUTPUT:
        RETVAL
1182 root 1.2
1183 root 1.1