ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/BDB/BDB.xs
Revision: 1.4
Committed: Mon Feb 5 22:31:05 2007 UTC (17 years, 3 months ago) by root
Branch: MAIN
Changes since 1.3: +5 -0 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.1 /* solaris */
2     #define _POSIX_PTHREAD_SEMANTICS 1
3    
4     #if __linux && !defined(_GNU_SOURCE)
5     # define _GNU_SOURCE
6     #endif
7    
8     /* just in case */
9     #define _REENTRANT 1
10    
11     #include <errno.h>
12    
13     #include "EXTERN.h"
14     #include "perl.h"
15     #include "XSUB.h"
16    
17     #include <pthread.h>
18    
19     #include <stddef.h>
20     #include <stdlib.h>
21     #include <errno.h>
22     #include <sys/time.h>
23     #include <sys/types.h>
24     #include <limits.h>
25     #include <unistd.h>
26     #include <fcntl.h>
27 root 1.2
28     #include <db.h>
29 root 1.1
30     /* number of seconds after which idle threads exit */
31     #define IDLE_TIMEOUT 10
32    
33     /* whether word reads are potentially non-atomic.
34     * this is conservative; most arches this runs on
35     * likely have atomic word reads/writes.
36     */
37     #ifndef WORDACCESS_UNSAFE
38     # if __i386 || __x86_64
39     # define WORDACCESS_UNSAFE 0
40     # else
41     # define WORDACCESS_UNSAFE 1
42     # endif
43     #endif
44    
45     typedef SV SV8; /* byte-sv, used for argument-checking */
46 root 1.3 typedef char *octetstring;
47 root 1.1
48 root 1.3 static SV *prepare_cb;
49    
/* Duplicate s with strdup, passing a null pointer through unchanged.
 * A non-null result is heap-allocated and must be released with free. */
static inline char *
strdup_ornull (const char *s)
{
  if (!s)
    return 0;

  return strdup (s);
}
55    
/* request types; aio_proc switches on these */
enum {
  REQ_QUIT,        /* makes the receiving worker thread exit */
  REQ_ENV_OPEN,    /* DB_ENV->open */
  REQ_ENV_CLOSE,   /* DB_ENV->close */
  REQ_DB_OPEN,     /* DB->open */
  REQ_DB_CLOSE,    /* DB->close */
  REQ_DB_COMPACT,  /* DB->compact */
  REQ_DB_SYNC,     /* DB->sync */
  REQ_DB_PUT,      /* DB->put */
};
61    
62     typedef struct aio_cb
63     {
64 root 1.2 struct aio_cb *volatile next;
65     SV *callback;
66 root 1.3 int type, pri, result;
67 root 1.2
68     DB_ENV *env;
69     DB *db;
70     DB_TXN *txn;
71     DBC *cursor;
72     int int1, int2;
73     U32 uint1, uint2;
74     char *buf1, *buf2;
75 root 1.3
76     DBT dbt1, dbt2, dbt3;
77 root 1.1 } aio_cb;
78    
79     typedef aio_cb *aio_req;
80    
/* request priorities: user-visible range plus the bias used for indexing */
enum {
  PRI_MIN     = -4,                      /* lowest user-visible priority */
  PRI_MAX     =  4,                      /* highest user-visible priority */

  DEFAULT_PRI =  0,
  PRI_BIAS    = -PRI_MIN,                /* added so that indices start at 0 */
  NUM_PRI     = PRI_MAX + PRI_BIAS + 1,  /* number of per-priority queues */
};
89    
/* number of 1024-microsecond ticks in one second, rounded up */
#define AIO_TICKS ((1000000 + 1023) >> 10)

/* poll_cb limits; a value of zero leaves the respective check disabled */
static unsigned int max_poll_time; /* in AIO_TICKS units */
static unsigned int max_poll_reqs; /* in requests */

/* time elapsed from *tv1 to *tv2, in units of about 1/AIO_TICKS seconds */
static int tvdiff (struct timeval *tv1, struct timeval *tv2)
{
  time_t      secs  = tv2->tv_sec  - tv1->tv_sec;
  suseconds_t usecs = tv2->tv_usec - tv1->tv_usec;

  return secs * AIO_TICKS + (usecs >> 10);
}
101    
102     static int next_pri = DEFAULT_PRI + PRI_BIAS;
103    
104     static unsigned int started, idle, wanted;
105    
106     #if __linux && defined (PTHREAD_ADAPTIVE_MUTEX_INITIALIZER_NP)
107     # define AIO_MUTEX_INIT PTHREAD_ADAPTIVE_MUTEX_INITIALIZER_NP
108     #else
109     # define AIO_MUTEX_INIT PTHREAD_MUTEX_INITIALIZER
110     #endif
111    
112     #define LOCK(mutex) pthread_mutex_lock (&(mutex))
113     #define UNLOCK(mutex) pthread_mutex_unlock (&(mutex))
114    
115     /* worker threads management */
116     static pthread_mutex_t wrklock = AIO_MUTEX_INIT;
117    
118     typedef struct worker {
119     /* locked by wrklock */
120     struct worker *prev, *next;
121    
122     pthread_t tid;
123    
124     /* locked by reslock, reqlock or wrklock */
125     aio_req req; /* currently processed request */
126     void *dbuf;
127     DIR *dirp;
128     } worker;
129    
130     static worker wrk_first = { &wrk_first, &wrk_first, 0 };
131    
132     static void worker_clear (worker *wrk)
133     {
134     }
135    
136     static void worker_free (worker *wrk)
137     {
138     wrk->next->prev = wrk->prev;
139     wrk->prev->next = wrk->next;
140    
141     free (wrk);
142     }
143    
144     static volatile unsigned int nreqs, nready, npending;
145     static volatile unsigned int max_idle = 4;
146     static volatile unsigned int max_outstanding = 0xffffffff;
147     static int respipe [2];
148    
149     static pthread_mutex_t reslock = AIO_MUTEX_INIT;
150     static pthread_mutex_t reqlock = AIO_MUTEX_INIT;
151     static pthread_cond_t reqwait = PTHREAD_COND_INITIALIZER;
152    
#if WORDACCESS_UNSAFE

/* word reads may tear on this platform: sample each counter under its lock */

static unsigned int get_nready ()
{
  unsigned int snapshot;

  LOCK (reqlock);
  snapshot = nready;
  UNLOCK (reqlock);

  return snapshot;
}

static unsigned int get_npending ()
{
  unsigned int snapshot;

  LOCK (reslock);
  snapshot = npending;
  UNLOCK (reslock);

  return snapshot;
}

static unsigned int get_nthreads ()
{
  unsigned int snapshot;

  LOCK (wrklock);
  snapshot = started;
  UNLOCK (wrklock);

  return snapshot;
}

#else

/* word reads are treated as atomic here: read the counters directly */
# define get_nready()   nready
# define get_npending() npending
# define get_nthreads() started

#endif
195    
196     /*
197     * a somewhat faster data structure might be nice, but
198     * with 8 priorities this actually needs <20 insns
199     * per shift, the most expensive operation.
200     */
201     typedef struct {
202     aio_req qs[NUM_PRI], qe[NUM_PRI]; /* qstart, qend */
203     int size;
204     } reqq;
205    
206     static reqq req_queue;
207     static reqq res_queue;
208    
209     int reqq_push (reqq *q, aio_req req)
210     {
211     int pri = req->pri;
212     req->next = 0;
213    
214     if (q->qe[pri])
215     {
216     q->qe[pri]->next = req;
217     q->qe[pri] = req;
218     }
219     else
220     q->qe[pri] = q->qs[pri] = req;
221    
222     return q->size++;
223     }
224    
225     aio_req reqq_shift (reqq *q)
226     {
227     int pri;
228    
229     if (!q->size)
230     return 0;
231    
232     --q->size;
233    
234     for (pri = NUM_PRI; pri--; )
235     {
236     aio_req req = q->qs[pri];
237    
238     if (req)
239     {
240     if (!(q->qs[pri] = req->next))
241     q->qe[pri] = 0;
242    
243     return req;
244     }
245     }
246    
247     abort ();
248     }
249    
250     static int poll_cb ();
251     static void req_free (aio_req req);
252     static void req_cancel (aio_req req);
253    
/*
 * Invoke the Perl callback of a finished request, if it has a defined one.
 * errno is set to req->result just before the call; no arguments are pushed.
 * Returns true when ERRSV ($@) is false afterwards, false otherwise.
 */
254     static int req_invoke (aio_req req)
255     {
256     dSP;
257    
258     if (SvOK (req->callback))
259     {
260     ENTER;
261     SAVETMPS;
262     PUSHMARK (SP);
263     EXTEND (SP, 1);
264    
        /* no request type pushes result values onto the stack yet */
265     switch (req->type)
266     {
267     }
268    
269 root 1.3 errno = req->result;
270    
271 root 1.1 PUTBACK;
        /* G_EVAL: an exception in the callback ends up in ERRSV, checked below */
272     call_sv (req->callback, G_VOID | G_EVAL);
273     SPAGAIN;
274    
275     FREETMPS;
276     LEAVE;
277     }
278    
        /* NOTE(review): ERRSV is also tested when no callback was run */
279     return !SvTRUE (ERRSV);
280     }
281    
282     static void req_free (aio_req req)
283     {
284 root 1.2 free (req->buf1);
285     free (req->buf2);
286 root 1.1 Safefree (req);
287     }
288    
289 root 1.3 static void *aio_proc (void *arg);
290 root 1.1
291     static void start_thread (void)
292     {
293     sigset_t fullsigset, oldsigset;
294     pthread_attr_t attr;
295    
296     worker *wrk = calloc (1, sizeof (worker));
297    
298     if (!wrk)
299     croak ("unable to allocate worker thread data");
300    
301     pthread_attr_init (&attr);
302     pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
303     #ifdef PTHREAD_SCOPE_PROCESS
304     pthread_attr_setscope (&attr, PTHREAD_SCOPE_PROCESS);
305     #endif
306    
307     sigfillset (&fullsigset);
308    
309     LOCK (wrklock);
310     pthread_sigmask (SIG_SETMASK, &fullsigset, &oldsigset);
311    
312     if (pthread_create (&wrk->tid, &attr, aio_proc, (void *)wrk) == 0)
313     {
314     wrk->prev = &wrk_first;
315     wrk->next = wrk_first.next;
316     wrk_first.next->prev = wrk;
317     wrk_first.next = wrk;
318     ++started;
319     }
320     else
321     free (wrk);
322    
323     pthread_sigmask (SIG_SETMASK, &oldsigset, 0);
324     UNLOCK (wrklock);
325     }
326    
327     static void maybe_start_thread ()
328     {
329     if (get_nthreads () >= wanted)
330     return;
331    
332     /* todo: maybe use idle here, but might be less exact */
333     if (0 <= (int)get_nthreads () + (int)get_npending () - (int)nreqs)
334     return;
335    
336     start_thread ();
337     }
338    
/*
 * Queue req for the worker threads and make sure a thread can pick it up.
 * If the request has no defined callback, prepare_cb is called in list
 * context and must return exactly two values: the value popped first is kept
 * as wait_callback, the second becomes the request callback. wait_callback
 * is then called once the request has been queued.
 */
339     static void req_send (aio_req req)
340     {
341 root 1.3 SV *wait_callback = 0;
342    
343     // synthesize callback if none given
344     if (!SvOK (req->callback))
345     {
346     dSP;
347     PUSHMARK (SP);
348     PUTBACK;
        /* NOTE(review): prepare_cb is used without a null check - presumably
           set_sync_prepare is always called first; confirm against the .pm */
349     int count = call_sv (prepare_cb, G_ARRAY);
350     SPAGAIN;
351    
352     if (count != 2)
353     croak ("prepare callback must return exactly two values\n");
354    
        /* take a reference to each returned value; the previous callback SV
           is released before it is replaced */
355     wait_callback = SvREFCNT_inc (POPs);
356     SvREFCNT_dec (req->callback);
357     req->callback = SvREFCNT_inc (POPs);
358     }
359    
360 root 1.1 ++nreqs;
361    
        /* hand the request to the workers and wake one of them */
362     LOCK (reqlock);
363     ++nready;
364     reqq_push (&req_queue, req);
365     pthread_cond_signal (&reqwait);
366     UNLOCK (reqlock);
367    
368     maybe_start_thread ();
369 root 1.3
370     if (wait_callback)
371     {
372     dSP;
373     PUSHMARK (SP);
374     PUTBACK;
375     call_sv (wait_callback, G_DISCARD);
376     SvREFCNT_dec (wait_callback);
377     }
378 root 1.1 }
379    
380     static void end_thread (void)
381     {
382     aio_req req;
383    
384     Newz (0, req, 1, aio_cb);
385    
386     req->type = REQ_QUIT;
387     req->pri = PRI_MAX + PRI_BIAS;
388    
389     LOCK (reqlock);
390     reqq_push (&req_queue, req);
391     pthread_cond_signal (&reqwait);
392     UNLOCK (reqlock);
393    
394     LOCK (wrklock);
395     --started;
396     UNLOCK (wrklock);
397     }
398    
399     static void set_max_idle (int nthreads)
400     {
401     if (WORDACCESS_UNSAFE) LOCK (reqlock);
402     max_idle = nthreads <= 0 ? 1 : nthreads;
403     if (WORDACCESS_UNSAFE) UNLOCK (reqlock);
404     }
405    
406     static void min_parallel (int nthreads)
407     {
408     if (wanted < nthreads)
409     wanted = nthreads;
410     }
411    
412     static void max_parallel (int nthreads)
413     {
414     if (wanted > nthreads)
415     wanted = nthreads;
416    
417     while (started > wanted)
418     end_thread ();
419     }
420    
421     static void poll_wait ()
422     {
423     fd_set rfd;
424    
425     while (nreqs)
426     {
427     int size;
428     if (WORDACCESS_UNSAFE) LOCK (reslock);
429     size = res_queue.size;
430     if (WORDACCESS_UNSAFE) UNLOCK (reslock);
431    
432     if (size)
433     return;
434    
435     maybe_start_thread ();
436    
437     FD_ZERO(&rfd);
438     FD_SET(respipe [0], &rfd);
439    
440     select (respipe [0] + 1, &rfd, 0, 0, 0);
441     }
442     }
443    
/*
 * Process finished requests: take them off res_queue one by one, run their
 * callbacks and free them. The inner loop ends when the result queue is
 * empty, when max_poll_reqs requests were handled, or when max_poll_time
 * ticks have passed (each limit only applies when non-zero). While more than
 * max_outstanding requests remain, the outer loop waits for further results
 * and continues. If a callback fails, the request is freed and croak (0) is
 * called. Returns the number of requests whose callback completed.
 */
444     static int poll_cb ()
445     {
446     dSP;
447     int count = 0;
448     int maxreqs = max_poll_reqs;
449     int do_croak = 0;
450     struct timeval tv_start, tv_now;
451     aio_req req;
452    
        /* the start time is only needed when a time limit is configured */
453     if (max_poll_time)
454     gettimeofday (&tv_start, 0);
455    
456     for (;;)
457     {
458     for (;;)
459     {
460     maybe_start_thread ();
461    
462     LOCK (reslock);
463     req = reqq_shift (&res_queue);
464    
465     if (req)
466     {
467     --npending;
468    
        /* last queued result taken: empty the pipe so the fd is no longer readable */
469     if (!res_queue.size)
470     {
471     /* read any signals sent by the worker threads */
472     char buf [4];
473     while (read (respipe [0], buf, 4) == 4)
474     ;
475     }
476     }
477    
478     UNLOCK (reslock);
479    
480     if (!req)
481     break;
482    
483     --nreqs;
484    
        /* req_invoke returns false when ERRSV is set after the callback */
485     if (!req_invoke (req))
486     {
487     req_free (req);
488     croak (0);
489     }
490    
491     count++;
492    
493     req_free (req);
494    
        /* request-count limit, skipped entirely while maxreqs is zero */
495     if (maxreqs && !--maxreqs)
496     break;
497    
498     if (max_poll_time)
499     {
500     gettimeofday (&tv_now, 0);
501    
502     if (tvdiff (&tv_start, &tv_now) >= max_poll_time)
503     break;
504     }
505     }
506    
        /* few enough requests outstanding: done */
507     if (nreqs <= max_outstanding)
508     break;
509    
510     poll_wait ();
511    
        /* raise the budget by one for the next pass of the inner loop */
512     ++maxreqs;
513     }
514    
515     return count;
516     }
517    
518     static void create_pipe ()
519     {
520     if (pipe (respipe))
521     croak ("unable to initialize result pipe");
522    
523     if (fcntl (respipe [0], F_SETFL, O_NONBLOCK))
524     croak ("cannot set result pipe to nonblocking mode");
525    
526     if (fcntl (respipe [1], F_SETFL, O_NONBLOCK))
527     croak ("cannot set result pipe to nonblocking mode");
528     }
529    
530     /*****************************************************************************/
531    
532     static void *aio_proc (void *thr_arg)
533     {
534     aio_req req;
535     struct timespec ts;
536     worker *self = (worker *)thr_arg;
537    
538     /* try to distribute timeouts somewhat evenly */
539     ts.tv_nsec = (((unsigned long)self + (unsigned long)ts.tv_sec) & 1023UL)
540     * (1000000000UL / 1024UL);
541    
542     for (;;)
543     {
544     ts.tv_sec = time (0) + IDLE_TIMEOUT;
545    
546     LOCK (reqlock);
547    
548     for (;;)
549     {
550     self->req = req = reqq_shift (&req_queue);
551    
552     if (req)
553     break;
554    
555     ++idle;
556    
557     if (pthread_cond_timedwait (&reqwait, &reqlock, &ts)
558     == ETIMEDOUT)
559     {
560     if (idle > max_idle)
561     {
562     --idle;
563     UNLOCK (reqlock);
564     LOCK (wrklock);
565     --started;
566     UNLOCK (wrklock);
567     goto quit;
568     }
569    
570     /* we are allowed to idle, so do so without any timeout */
571     pthread_cond_wait (&reqwait, &reqlock);
572     ts.tv_sec = time (0) + IDLE_TIMEOUT;
573     }
574    
575     --idle;
576     }
577    
578     --nready;
579    
580     UNLOCK (reqlock);
581    
582     switch (req->type)
583     {
584     case REQ_QUIT:
585     goto quit;
586    
587 root 1.3 case REQ_ENV_OPEN:
588     req->result = req->env->open (req->env, req->buf1, req->uint1, req->int1);
589     break;
590    
591     case REQ_ENV_CLOSE:
592     req->result = req->env->close (req->env, req->uint1);
593     break;
594    
595     case REQ_DB_OPEN:
596     req->result = req->db->open (req->db, req->txn, req->buf1, req->buf2, req->int1, req->uint1, req->int2);
597     break;
598    
599     case REQ_DB_CLOSE:
600     req->result = req->db->close (req->db, req->uint1);
601     break;
602    
603     case REQ_DB_COMPACT:
604     req->result = req->db->compact (req->db, req->txn, &req->dbt1, &req->dbt2, 0, req->uint1, 0);
605     break;
606    
607     case REQ_DB_SYNC:
608     req->result = req->db->sync (req->db, req->uint1);
609     break;
610    
611     case REQ_DB_PUT:
612     req->result = req->db->put (req->db, req->txn, &req->dbt1, &req->dbt2, req->uint1);
613     break;
614    
615 root 1.1 default:
616 root 1.3 req->result = ENOSYS;
617 root 1.1 break;
618     }
619    
620     LOCK (reslock);
621    
622     ++npending;
623    
624     if (!reqq_push (&res_queue, req))
625     /* write a dummy byte to the pipe so fh becomes ready */
626     write (respipe [1], &respipe, 1);
627    
628     self->req = 0;
629     worker_clear (self);
630    
631     UNLOCK (reslock);
632     }
633    
634     quit:
635     LOCK (wrklock);
636     worker_free (self);
637     UNLOCK (wrklock);
638    
639     return 0;
640     }
641    
642     /*****************************************************************************/
643    
644     static void atfork_prepare (void)
645     {
646     LOCK (wrklock);
647     LOCK (reqlock);
648     LOCK (reslock);
649     }
650    
651     static void atfork_parent (void)
652     {
653     UNLOCK (reslock);
654     UNLOCK (reqlock);
655     UNLOCK (wrklock);
656     }
657    
658     static void atfork_child (void)
659     {
660     aio_req prv;
661    
662     while (prv = reqq_shift (&req_queue))
663     req_free (prv);
664    
665     while (prv = reqq_shift (&res_queue))
666     req_free (prv);
667    
668     while (wrk_first.next != &wrk_first)
669     {
670     worker *wrk = wrk_first.next;
671    
672     if (wrk->req)
673     req_free (wrk->req);
674    
675     worker_clear (wrk);
676     worker_free (wrk);
677     }
678    
679     started = 0;
680     idle = 0;
681     nreqs = 0;
682     nready = 0;
683     npending = 0;
684    
685     close (respipe [0]);
686     close (respipe [1]);
687     create_pipe ();
688    
689     atfork_parent ();
690     }
691    
/*
 * dREQ (reqtype): declares "aio_req req" in the current scope and fills it.
 * Expects an SV named "callback" in scope. Takes the priority from next_pri
 * and resets next_pri to the default, croaks unless callback is undef or a
 * reference, allocates a zero-filled request and stores a copy of the
 * callback, the request type and the priority in it.
 * Expands to declarations followed by statements and ends without a
 * semicolon, so it has to be used as "dREQ (TYPE);".
 *
 * REQ_SEND: submits the request declared by dREQ via req_send.
 */
692 root 1.2 #define dREQ(reqtype) \
693 root 1.1 aio_req req; \
694     int req_pri = next_pri; \
695     next_pri = DEFAULT_PRI + PRI_BIAS; \
696     \
697     if (SvOK (callback) && !SvROK (callback)) \
698     croak ("callback must be undef or of reference type"); \
699     \
700 root 1.2 Newz (0, req, 1, aio_cb); \
701 root 1.1 if (!req) \
702     croak ("out of memory during aio_req allocation"); \
703     \
704     req->callback = newSVsv (callback); \
705 root 1.2 req->type = (reqtype); \
706 root 1.1 req->pri = req_pri
707    
708     #define REQ_SEND \
709 root 1.2 req_send (req)
710    
/*
 * SvPTR (var, arg, type, class): extract a C pointer from a Perl object.
 * An undefined arg yields a null pointer. An arg derived from the given
 * class yields the integer stored in the referenced SV, converted to type.
 * Anything else croaks.
 *
 * Uses croak rather than Perl_croak: Perl_croak takes an interpreter
 * context as first argument on threaded perls, and the rest of this file
 * reports errors through croak as well.
 *
 * Expands to an if/else chain without trailing semicolon.
 */
#define SvPTR(var, arg, type, class) \
  if (!SvOK (arg)) \
    (var) = 0; \
  else if (sv_derived_from ((arg), # class)) \
    { \
      IV tmp = SvIV ((SV*) SvRV (arg)); \
      (var) = INT2PTR (type, tmp); \
    } \
  else \
    croak (# var " is not of type " # class)
721    
722     inline void
723     set_dbt (DBT *dbt, SV *sv)
724     {
725     STRLEN len;
726     char *data = SvPVbyte (sv, len);
727    
728     dbt->data = malloc (len);
729     memcpy (dbt->data, data, len);
730     dbt->size = len;
731     }
732 root 1.1
733 root 1.2 MODULE = BDB PACKAGE = BDB
734 root 1.1
735     PROTOTYPES: ENABLE
736    
BOOT:
{
        /*
         * Export the Berkeley DB constants listed below as constant subs
         * BDB::NAME (the DB_ prefix is stripped), then create the result
         * pipe and register the fork handlers.
         * Each constant is listed exactly once, so no constant sub is
         * defined twice.
         */
        HV *stash = gv_stashpv ("BDB", 1);

        static const struct {
          const char *name;
          IV iv;
        } *civ, const_iv[] = {
#define const_iv(name) { # name, (IV)DB_ ## name },
          const_iv (RPCCLIENT)
          const_iv (INIT_CDB)
          const_iv (INIT_LOCK)
          const_iv (INIT_LOG)
          const_iv (INIT_MPOOL)
          const_iv (INIT_REP)
          const_iv (INIT_TXN)
          const_iv (RECOVER)
          const_iv (RECOVER_FATAL)
          const_iv (CREATE)
          const_iv (USE_ENVIRON)
          const_iv (USE_ENVIRON_ROOT)
          const_iv (LOCKDOWN)
          const_iv (PRIVATE)
          const_iv (REGISTER)
          const_iv (SYSTEM_MEM)
          const_iv (AUTO_COMMIT)
          const_iv (CDB_ALLDB)
          const_iv (DIRECT_DB)
          const_iv (DIRECT_LOG)
          const_iv (DSYNC_DB)
          const_iv (DSYNC_LOG)
          const_iv (LOG_AUTOREMOVE)
          const_iv (LOG_INMEMORY)
          const_iv (NOLOCKING)
          const_iv (MULTIVERSION)
          const_iv (NOMMAP)
          const_iv (NOPANIC)
          const_iv (OVERWRITE)
          const_iv (PANIC_ENVIRONMENT)
          const_iv (REGION_INIT)
          const_iv (TIME_NOTGRANTED)
          const_iv (TXN_NOSYNC)
          const_iv (TXN_SNAPSHOT)
          const_iv (TXN_WRITE_NOSYNC)
          const_iv (YIELDCPU)
          const_iv (ENCRYPT_AES)
          const_iv (XA_CREATE)
          const_iv (BTREE)
          const_iv (HASH)
          const_iv (QUEUE)
          const_iv (RECNO)
          const_iv (UNKNOWN)
          const_iv (EXCL)
          const_iv (READ_COMMITTED)
          const_iv (READ_UNCOMMITTED)
          const_iv (TRUNCATE)
          const_iv (NOSYNC)
          const_iv (CHKSUM)
          const_iv (ENCRYPT)
          const_iv (TXN_NOT_DURABLE)
          const_iv (DUP)
          const_iv (DUPSORT)
          const_iv (RECNUM)
          const_iv (RENUMBER)
          const_iv (REVSPLITOFF)
          const_iv (INORDER)
          const_iv (CONSUME)
          const_iv (CONSUME_WAIT)
          const_iv (SNAPSHOT)
          const_iv (JOIN_ITEM)
          const_iv (RMW)
          const_iv (NOTFOUND)
          const_iv (KEYEMPTY)
          const_iv (LOCK_DEADLOCK)
          const_iv (LOCK_NOTGRANTED)
          const_iv (RUNRECOVERY)
          const_iv (OLD_VERSION)
          const_iv (REP_HANDLE_DEAD)
          const_iv (REP_LOCKOUT)
          const_iv (FREE_SPACE)
          const_iv (FREELIST_ONLY)
          const_iv (APPEND)
          const_iv (NODUPDATA)
          const_iv (NOOVERWRITE)
          const_iv (TXN_NOWAIT)
          const_iv (TXN_SYNC)
          const_iv (SET_LOCK_TIMEOUT)
          const_iv (SET_TXN_TIMEOUT)
        };
        /* walk the table from its last entry down to the first */
        for (civ = const_iv + sizeof (const_iv) / sizeof (const_iv [0]); civ-- > const_iv; )
          newCONSTSUB (stash, (char *)civ->name, newSViv (civ->iv));
        create_pipe ();
        pthread_atfork (atfork_prepare, atfork_parent, atfork_child);
}
840    
void
max_poll_reqs (int nreqs)
        PROTOTYPE: $
        CODE:
        {
          /* request budget of one poll_cb call; poll_cb skips the check for 0 */
          max_poll_reqs = nreqs;
        }

void
max_poll_time (double nseconds)
        PROTOTYPE: $
        CODE:
        {
          /* stored in AIO_TICKS units, the unit tvdiff reports */
          max_poll_time = nseconds * AIO_TICKS;
        }

void
min_parallel (int nthreads)
        PROTOTYPE: $

void
max_parallel (int nthreads)
        PROTOTYPE: $

void
max_idle (int nthreads)
        PROTOTYPE: $
        CODE:
        {
          set_max_idle (nthreads);
        }

int
max_outstanding (int maxreqs)
        PROTOTYPE: $
        CODE:
        {
          /* report the previous limit, then install the new one */
          RETVAL = max_outstanding;
          max_outstanding = maxreqs;
        }
        OUTPUT:
        RETVAL
875    
int
dbreq_pri (int pri = 0)
        PROTOTYPE: ;$
        CODE:
        {
          /* always report the priority that was in effect before this call */
          RETVAL = next_pri - PRI_BIAS;
          /* with an argument, also set the priority of the next request */
          if (items > 0)
            {
              if (pri < PRI_MIN)
                pri = PRI_MIN;
              else if (pri > PRI_MAX)
                pri = PRI_MAX;
              next_pri = pri + PRI_BIAS;
            }
        }
        OUTPUT:
        RETVAL
889    
void
dbreq_nice (int nice = 0)
        CODE:
        {
          /*
           * Lower the priority of the next request by "nice" levels.
           * next_pri is stored biased (offset by PRI_BIAS), so the bias is
           * removed before the adjustment and the clamp to PRI_MIN..PRI_MAX,
           * and added back afterwards. Without that, a nice value of 0
           * would be clamped to the maximum priority.
           */
          nice = next_pri - PRI_BIAS - nice;
          if (nice < PRI_MIN) nice = PRI_MIN;
          if (nice > PRI_MAX) nice = PRI_MAX;
          next_pri = nice + PRI_BIAS;
        }
897    
void
flush ()
        PROTOTYPE:
        CODE:
        {
          /* keep waiting and processing until no request is outstanding */
          for (; nreqs; )
            {
              poll_wait ();
              poll_cb ();
            }
        }

int
poll ()
        PROTOTYPE:
        CODE:
        {
          /* wait for a result, then process what is available */
          poll_wait ();
          RETVAL = poll_cb ();
        }
        OUTPUT:
        RETVAL

int
poll_fileno ()
        PROTOTYPE:
        CODE:
        {
          /* read end of the result pipe */
          RETVAL = respipe [0];
        }
        OUTPUT:
        RETVAL

int
poll_cb (...)
        PROTOTYPE:
        CODE:
        {
          /* arguments, if any, are ignored */
          RETVAL = poll_cb ();
        }
        OUTPUT:
        RETVAL

void
poll_wait ()
        PROTOTYPE:
        CODE:
        {
          poll_wait ();
        }
938    
int
nreqs ()
        PROTOTYPE:
        CODE:
        {
          /* requests submitted whose callback has not been handled yet */
          RETVAL = nreqs;
        }
        OUTPUT:
        RETVAL

int
nready ()
        PROTOTYPE:
        CODE:
        {
          RETVAL = get_nready ();
        }
        OUTPUT:
        RETVAL

int
npending ()
        PROTOTYPE:
        CODE:
        {
          RETVAL = get_npending ();
        }
        OUTPUT:
        RETVAL

int
nthreads ()
        PROTOTYPE:
        CODE:
        {
          /* read "started" under wrklock where word reads may not be atomic */
          if (WORDACCESS_UNSAFE)
            LOCK (wrklock);
          RETVAL = started;
          if (WORDACCESS_UNSAFE)
            UNLOCK (wrklock);
        }
        OUTPUT:
        RETVAL
972    
void
set_sync_prepare (SV *cb)
        PROTOTYPE: &
        CODE:
        {
          /* drop the previously installed callback, then keep a copy of cb;
             req_send calls it for requests submitted without a callback */
          SvREFCNT_dec (prepare_cb);
          prepare_cb = newSVsv (cb);
        }
979    
DB_ENV *
db_env_create (U32 env_flags = 0)
        CODE:
        {
          /* synchronous call; a non-zero return value is reported via croak */
          errno = db_env_create (&RETVAL, env_flags);
          if (errno != 0)
            croak ("db_env_create: %s", db_strerror (errno));
        }
        OUTPUT:
        RETVAL
990 root 1.2
void
db_env_open (DB_ENV *env, octetstring db_home, U32 open_flags, int mode, SV *callback = &PL_sv_undef)
        CODE:
        {
          /* asynchronous DB_ENV->open; DB_THREAD is always added to the flags */
          dREQ (REQ_ENV_OPEN);
          req->env   = env;
          req->buf1  = strdup_ornull (db_home);
          req->uint1 = open_flags | DB_THREAD;
          req->int1  = mode;
          REQ_SEND;
        }

void
db_env_close (DB_ENV *env, U32 flags = 0, SV *callback = &PL_sv_undef)
        CODE:
        {
          /* asynchronous DB_ENV->close */
          dREQ (REQ_ENV_CLOSE);
          req->env   = env;
          req->uint1 = flags;
          REQ_SEND;
        }
1012    
DB *
db_create (DB_ENV *env = 0, U32 flags = 0)
        CODE:
        {
          /* synchronous call; a non-zero return value is reported via croak,
             naming db_create (not db_env_create) as the failing function */
          errno = db_create (&RETVAL, env, flags);
          if (errno)
            croak ("db_create: %s", db_strerror (errno));
        }
        OUTPUT:
        RETVAL
1023 root 1.2
void
db_open (DB *db, DB_TXN *txnid, octetstring file, octetstring database, int type, U32 flags, int mode, SV *callback = &PL_sv_undef)
        CODE:
        {
          /* asynchronous DB->open; DB_THREAD is always added to the flags */
          dREQ (REQ_DB_OPEN);
          req->db    = db;
          req->txn   = txnid;
          req->buf1  = strdup_ornull (file);
          req->buf2  = strdup_ornull (database);
          req->int1  = type;
          req->int2  = mode;
          req->uint1 = flags | DB_THREAD;
          REQ_SEND;
        }

void
db_close (DB *db, U32 flags = 0, SV *callback = &PL_sv_undef)
        CODE:
        {
          /* asynchronous DB->close */
          dREQ (REQ_DB_CLOSE);
          req->db    = db;
          req->uint1 = flags;
          REQ_SEND;
        }
1048    
void
db_compact (DB *db, DB_TXN *txn = 0, SV *start = 0, SV *stop = 0, SV *unused1 = 0, U32 flags = DB_FREE_SPACE, SV *unused2 = 0, SV *callback = &PL_sv_undef)
        CODE:
        {
          /*
           * Asynchronous DB->compact. start and stop are optional: an
           * omitted argument arrives as a null SV pointer and must not be
           * handed to set_dbt, which would dereference it. Omitted or undef
           * bounds leave the corresponding DBT zeroed, as allocated by dREQ.
           */
          dREQ (REQ_DB_COMPACT);
          req->db = db;
          req->txn = txn;
          if (start && SvOK (start))
            set_dbt (&req->dbt1, start);
          if (stop && SvOK (stop))
            set_dbt (&req->dbt2, stop);
          req->uint1 = flags;
          REQ_SEND;
        }
1061    
void
db_sync (DB *db, U32 flags = 0, SV *callback = &PL_sv_undef)
        CODE:
        {
          /* asynchronous DB->sync */
          dREQ (REQ_DB_SYNC);
          req->uint1 = flags;
          req->db    = db;
          REQ_SEND;
        }
1071    
void
db_put (DB *db, DB_TXN *txn, SV *key, SV *data, U32 flags = 0, SV *callback = &PL_sv_undef)
        CODE:
        {
          /*
           * Asynchronous DB->put. Key and data are copied into the request,
           * so the Perl values may change while the request is in flight.
           * The caller's transaction handle is passed on to DB->put instead
           * of being replaced by a null pointer.
           */
          dREQ (REQ_DB_PUT);
          req->db = db;
          req->txn = txn;
          set_dbt (&req->dbt1, key);
          set_dbt (&req->dbt2, data);
          req->uint1 = flags;
          REQ_SEND;
        }
1084    
1085 root 1.2
1086     MODULE = BDB PACKAGE = BDB::Env
1087    
int
set_cachesize (DB_ENV *env, U32 gbytes, U32 bytes, int ncache = 0)
        CODE:
        {
          /* synchronous wrapper; returns the method's return value */
          RETVAL = env->set_cachesize (env, gbytes, bytes, ncache);
        }
        OUTPUT:
        RETVAL

int
set_flags (DB_ENV *env, U32 flags, int onoff)
        CODE:
        {
          RETVAL = env->set_flags (env, flags, onoff);
        }
        OUTPUT:
        RETVAL

int
set_encrypt (DB_ENV *env, const char *password, U32 flags = 0)
        CODE:
        {
          RETVAL = env->set_encrypt (env, password, flags);
        }
        OUTPUT:
        RETVAL

int
set_timeout (DB_ENV *env, NV timeout, U32 flags)
        CODE:
        {
          /* the timeout argument is multiplied by one million before the call */
          RETVAL = env->set_timeout (env, timeout * 1000000, flags);
        }
        OUTPUT:
        RETVAL
1111 root 1.2
1112    
1113     MODULE = BDB PACKAGE = BDB::Db
1114    
int
set_cachesize (DB *db, U32 gbytes, U32 bytes, int ncache = 0)
        CODE:
        {
          /* every setter below is a synchronous wrapper returning the
             return value of the DB method of the same name */
          RETVAL = db->set_cachesize (db, gbytes, bytes, ncache);
        }
        OUTPUT:
        RETVAL

int
set_flags (DB *db, U32 flags)
        CODE:
        {
          RETVAL = db->set_flags (db, flags);
        }
        OUTPUT:
        RETVAL

int
set_encrypt (DB *db, const char *password, U32 flags)
        CODE:
        {
          RETVAL = db->set_encrypt (db, password, flags);
        }
        OUTPUT:
        RETVAL

int
set_lorder (DB *db, int lorder)
        CODE:
        {
          RETVAL = db->set_lorder (db, lorder);
        }
        OUTPUT:
        RETVAL

int
set_bt_minkey (DB *db, U32 minkey)
        CODE:
        {
          RETVAL = db->set_bt_minkey (db, minkey);
        }
        OUTPUT:
        RETVAL

int
set_re_delim (DB *db, int delim)
        CODE:
        {
          RETVAL = db->set_re_delim (db, delim);
        }
        OUTPUT:
        RETVAL

int
set_re_pad (DB *db, int re_pad)
        CODE:
        {
          RETVAL = db->set_re_pad (db, re_pad);
        }
        OUTPUT:
        RETVAL

int
set_re_source (DB *db, char *source)
        CODE:
        {
          RETVAL = db->set_re_source (db, source);
        }
        OUTPUT:
        RETVAL

int
set_re_len (DB *db, U32 re_len)
        CODE:
        {
          RETVAL = db->set_re_len (db, re_len);
        }
        OUTPUT:
        RETVAL

int
set_h_ffactor (DB *db, U32 h_ffactor)
        CODE:
        {
          RETVAL = db->set_h_ffactor (db, h_ffactor);
        }
        OUTPUT:
        RETVAL

int
set_h_nelem (DB *db, U32 h_nelem)
        CODE:
        {
          RETVAL = db->set_h_nelem (db, h_nelem);
        }
        OUTPUT:
        RETVAL

int
set_q_extentsize (DB *db, U32 extentsize)
        CODE:
        {
          RETVAL = db->set_q_extentsize (db, extentsize);
        }
        OUTPUT:
        RETVAL
1187 root 1.2
1188 root 1.1