ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/BDB/BDB.xs
Revision: 1.3
Committed: Mon Feb 5 22:19:07 2007 UTC (17 years, 3 months ago) by root
Branch: MAIN
Changes since 1.2: +241 -39 lines
Log Message:
*** empty log message ***

File Contents

# Content
1 /* solaris */
2 #define _POSIX_PTHREAD_SEMANTICS 1
3
4 #if __linux && !defined(_GNU_SOURCE)
5 # define _GNU_SOURCE
6 #endif
7
8 /* just in case */
9 #define _REENTRANT 1
10
11 #include <errno.h>
12
13 #include "EXTERN.h"
14 #include "perl.h"
15 #include "XSUB.h"
16
17 #include <pthread.h>
18
19 #include <stddef.h>
20 #include <stdlib.h>
21 #include <errno.h>
22 #include <sys/time.h>
23 #include <sys/types.h>
24 #include <limits.h>
25 #include <unistd.h>
26 #include <fcntl.h>
27
28 #include <db.h>
29
30 /* number of seconds after which idle threads exit */
31 #define IDLE_TIMEOUT 10
32
/* whether word reads are potentially non-atomic.
 * this is conservative, likely most arches this runs
 * on have atomic word read/writes.
 */
37 #ifndef WORDACCESS_UNSAFE
38 # if __i386 || __x86_64
39 # define WORDACCESS_UNSAFE 0
40 # else
41 # define WORDACCESS_UNSAFE 1
42 # endif
43 #endif
44
45 typedef SV SV8; /* byte-sv, used for argument-checking */
46 typedef char *octetstring;
47
48 static SV *prepare_cb;
49
/* Duplicate a C string with strdup; a null input is passed through as null. */
static inline char *
strdup_ornull (const char *s)
{
  if (!s)
    return 0;

  return strdup (s);
}
55
/* Request type codes stored in aio_cb.type; aio_proc switches on them. */
enum {
  REQ_QUIT,       /* makes the worker that dequeues it leave its loop */
  REQ_ENV_OPEN,
  REQ_ENV_CLOSE,
  REQ_DB_OPEN,
  REQ_DB_CLOSE,
  REQ_DB_COMPACT,
  REQ_DB_SYNC,
  REQ_DB_PUT,
};
61
62 typedef struct aio_cb
63 {
64 struct aio_cb *volatile next;
65 SV *callback;
66 int type, pri, result;
67
68 DB_ENV *env;
69 DB *db;
70 DB_TXN *txn;
71 DBC *cursor;
72 int int1, int2;
73 U32 uint1, uint2;
74 char *buf1, *buf2;
75
76 DBT dbt1, dbt2, dbt3;
77 } aio_cb;
78
79 typedef aio_cb *aio_req;
80
/* Request priority range plus the constants derived from it. */
enum {
  PRI_MIN = -4,                     /* served last by reqq_shift */
  PRI_MAX = 4,                      /* served first by reqq_shift */

  DEFAULT_PRI = 0,
  PRI_BIAS = -PRI_MIN,              /* added so priorities index from 0 */
  NUM_PRI = PRI_MAX + PRI_BIAS + 1, /* number of distinct priority levels */
};
89
/* ticks per second used for poll time limits: 1000000 / 1024, rounded up */
#define AIO_TICKS ((1000000 + 1023) >> 10)

/* limits applied by poll_cb; 0 means "no limit" */
static unsigned int max_poll_time = 0;
static unsigned int max_poll_reqs = 0;

/* calculate the time from *tv1 to *tv2 in ~1/AIO_TICKS of a second */
static int tvdiff (struct timeval *tv1, struct timeval *tv2)
{
  time_t d_sec       = tv2->tv_sec  - tv1->tv_sec;
  suseconds_t d_usec = tv2->tv_usec - tv1->tv_usec;

  /* microseconds are scaled with a shift (divide by 1024), not by 1000 */
  return d_sec * AIO_TICKS + (d_usec >> 10);
}
101
102 static int next_pri = DEFAULT_PRI + PRI_BIAS;
103
104 static unsigned int started, idle, wanted;
105
106 #if __linux && defined (PTHREAD_ADAPTIVE_MUTEX_INITIALIZER_NP)
107 # define AIO_MUTEX_INIT PTHREAD_ADAPTIVE_MUTEX_INITIALIZER_NP
108 #else
109 # define AIO_MUTEX_INIT PTHREAD_MUTEX_INITIALIZER
110 #endif
111
112 #define LOCK(mutex) pthread_mutex_lock (&(mutex))
113 #define UNLOCK(mutex) pthread_mutex_unlock (&(mutex))
114
115 /* worker threads management */
116 static pthread_mutex_t wrklock = AIO_MUTEX_INIT;
117
118 typedef struct worker {
119 /* locked by wrklock */
120 struct worker *prev, *next;
121
122 pthread_t tid;
123
124 /* locked by reslock, reqlock or wrklock */
125 aio_req req; /* currently processed request */
126 void *dbuf;
127 DIR *dirp;
128 } worker;
129
130 static worker wrk_first = { &wrk_first, &wrk_first, 0 };
131
132 static void worker_clear (worker *wrk)
133 {
134 }
135
136 static void worker_free (worker *wrk)
137 {
138 wrk->next->prev = wrk->prev;
139 wrk->prev->next = wrk->next;
140
141 free (wrk);
142 }
143
144 static volatile unsigned int nreqs, nready, npending;
145 static volatile unsigned int max_idle = 4;
146 static volatile unsigned int max_outstanding = 0xffffffff;
147 static int respipe [2];
148
149 static pthread_mutex_t reslock = AIO_MUTEX_INIT;
150 static pthread_mutex_t reqlock = AIO_MUTEX_INIT;
151 static pthread_cond_t reqwait = PTHREAD_COND_INITIALIZER;
152
#if WORDACCESS_UNSAFE

/* word reads may be non-atomic here: sample each counter under a mutex */

/* nready, read under reqlock */
static unsigned int get_nready (void)
{
  unsigned int snapshot;

  LOCK (reqlock);
  snapshot = nready;
  UNLOCK (reqlock);

  return snapshot;
}

/* npending, read under reslock */
static unsigned int get_npending (void)
{
  unsigned int snapshot;

  LOCK (reslock);
  snapshot = npending;
  UNLOCK (reslock);

  return snapshot;
}

/* started (number of worker threads), read under wrklock */
static unsigned int get_nthreads (void)
{
  unsigned int snapshot;

  LOCK (wrklock);
  snapshot = started;
  UNLOCK (wrklock);

  return snapshot;
}

#else

/* word reads are treated as atomic: read the counters directly */
# define get_nready()   nready
# define get_npending() npending
# define get_nthreads() started

#endif
195
196 /*
197 * a somewhat faster data structure might be nice, but
198 * with 8 priorities this actually needs <20 insns
199 * per shift, the most expensive operation.
200 */
201 typedef struct {
202 aio_req qs[NUM_PRI], qe[NUM_PRI]; /* qstart, qend */
203 int size;
204 } reqq;
205
206 static reqq req_queue;
207 static reqq res_queue;
208
209 int reqq_push (reqq *q, aio_req req)
210 {
211 int pri = req->pri;
212 req->next = 0;
213
214 if (q->qe[pri])
215 {
216 q->qe[pri]->next = req;
217 q->qe[pri] = req;
218 }
219 else
220 q->qe[pri] = q->qs[pri] = req;
221
222 return q->size++;
223 }
224
225 aio_req reqq_shift (reqq *q)
226 {
227 int pri;
228
229 if (!q->size)
230 return 0;
231
232 --q->size;
233
234 for (pri = NUM_PRI; pri--; )
235 {
236 aio_req req = q->qs[pri];
237
238 if (req)
239 {
240 if (!(q->qs[pri] = req->next))
241 q->qe[pri] = 0;
242
243 return req;
244 }
245 }
246
247 abort ();
248 }
249
250 static int poll_cb ();
251 static void req_free (aio_req req);
252 static void req_cancel (aio_req req);
253
254 static int req_invoke (aio_req req)
255 {
256 dSP;
257
258 if (SvOK (req->callback))
259 {
260 ENTER;
261 SAVETMPS;
262 PUSHMARK (SP);
263 EXTEND (SP, 1);
264
265 switch (req->type)
266 {
267 }
268
269 errno = req->result;
270
271 PUTBACK;
272 call_sv (req->callback, G_VOID | G_EVAL);
273 SPAGAIN;
274
275 FREETMPS;
276 LEAVE;
277 }
278
279 return !SvTRUE (ERRSV);
280 }
281
282 static void req_free (aio_req req)
283 {
284 free (req->buf1);
285 free (req->buf2);
286 Safefree (req);
287 }
288
289 static void *aio_proc (void *arg);
290
291 static void start_thread (void)
292 {
293 sigset_t fullsigset, oldsigset;
294 pthread_attr_t attr;
295
296 worker *wrk = calloc (1, sizeof (worker));
297
298 if (!wrk)
299 croak ("unable to allocate worker thread data");
300
301 pthread_attr_init (&attr);
302 pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
303 #ifdef PTHREAD_SCOPE_PROCESS
304 pthread_attr_setscope (&attr, PTHREAD_SCOPE_PROCESS);
305 #endif
306
307 sigfillset (&fullsigset);
308
309 LOCK (wrklock);
310 pthread_sigmask (SIG_SETMASK, &fullsigset, &oldsigset);
311
312 if (pthread_create (&wrk->tid, &attr, aio_proc, (void *)wrk) == 0)
313 {
314 wrk->prev = &wrk_first;
315 wrk->next = wrk_first.next;
316 wrk_first.next->prev = wrk;
317 wrk_first.next = wrk;
318 ++started;
319 }
320 else
321 free (wrk);
322
323 pthread_sigmask (SIG_SETMASK, &oldsigset, 0);
324 UNLOCK (wrklock);
325 }
326
327 static void maybe_start_thread ()
328 {
329 if (get_nthreads () >= wanted)
330 return;
331
332 /* todo: maybe use idle here, but might be less exact */
333 if (0 <= (int)get_nthreads () + (int)get_npending () - (int)nreqs)
334 return;
335
336 start_thread ();
337 }
338
339 static void req_send (aio_req req)
340 {
341 SV *wait_callback = 0;
342
343 // synthesize callback if none given
344 if (!SvOK (req->callback))
345 {
346 dSP;
347 PUSHMARK (SP);
348 PUTBACK;
349 int count = call_sv (prepare_cb, G_ARRAY);
350 SPAGAIN;
351
352 if (count != 2)
353 croak ("prepare callback must return exactly two values\n");
354
355 wait_callback = SvREFCNT_inc (POPs);
356 SvREFCNT_dec (req->callback);
357 req->callback = SvREFCNT_inc (POPs);
358 }
359
360 ++nreqs;
361
362 LOCK (reqlock);
363 ++nready;
364 reqq_push (&req_queue, req);
365 pthread_cond_signal (&reqwait);
366 UNLOCK (reqlock);
367
368 maybe_start_thread ();
369
370 if (wait_callback)
371 {
372 dSP;
373 PUSHMARK (SP);
374 PUTBACK;
375 call_sv (wait_callback, G_DISCARD);
376 SvREFCNT_dec (wait_callback);
377 }
378 }
379
380 static void end_thread (void)
381 {
382 aio_req req;
383
384 Newz (0, req, 1, aio_cb);
385
386 req->type = REQ_QUIT;
387 req->pri = PRI_MAX + PRI_BIAS;
388
389 LOCK (reqlock);
390 reqq_push (&req_queue, req);
391 pthread_cond_signal (&reqwait);
392 UNLOCK (reqlock);
393
394 LOCK (wrklock);
395 --started;
396 UNLOCK (wrklock);
397 }
398
399 static void set_max_idle (int nthreads)
400 {
401 if (WORDACCESS_UNSAFE) LOCK (reqlock);
402 max_idle = nthreads <= 0 ? 1 : nthreads;
403 if (WORDACCESS_UNSAFE) UNLOCK (reqlock);
404 }
405
406 static void min_parallel (int nthreads)
407 {
408 if (wanted < nthreads)
409 wanted = nthreads;
410 }
411
412 static void max_parallel (int nthreads)
413 {
414 if (wanted > nthreads)
415 wanted = nthreads;
416
417 while (started > wanted)
418 end_thread ();
419 }
420
421 static void poll_wait ()
422 {
423 fd_set rfd;
424
425 while (nreqs)
426 {
427 int size;
428 if (WORDACCESS_UNSAFE) LOCK (reslock);
429 size = res_queue.size;
430 if (WORDACCESS_UNSAFE) UNLOCK (reslock);
431
432 if (size)
433 return;
434
435 maybe_start_thread ();
436
437 FD_ZERO(&rfd);
438 FD_SET(respipe [0], &rfd);
439
440 select (respipe [0] + 1, &rfd, 0, 0, 0);
441 }
442 }
443
444 static int poll_cb ()
445 {
446 dSP;
447 int count = 0;
448 int maxreqs = max_poll_reqs;
449 int do_croak = 0;
450 struct timeval tv_start, tv_now;
451 aio_req req;
452
453 if (max_poll_time)
454 gettimeofday (&tv_start, 0);
455
456 for (;;)
457 {
458 for (;;)
459 {
460 maybe_start_thread ();
461
462 LOCK (reslock);
463 req = reqq_shift (&res_queue);
464
465 if (req)
466 {
467 --npending;
468
469 if (!res_queue.size)
470 {
471 /* read any signals sent by the worker threads */
472 char buf [4];
473 while (read (respipe [0], buf, 4) == 4)
474 ;
475 }
476 }
477
478 UNLOCK (reslock);
479
480 if (!req)
481 break;
482
483 --nreqs;
484
485 if (!req_invoke (req))
486 {
487 req_free (req);
488 croak (0);
489 }
490
491 count++;
492
493 req_free (req);
494
495 if (maxreqs && !--maxreqs)
496 break;
497
498 if (max_poll_time)
499 {
500 gettimeofday (&tv_now, 0);
501
502 if (tvdiff (&tv_start, &tv_now) >= max_poll_time)
503 break;
504 }
505 }
506
507 if (nreqs <= max_outstanding)
508 break;
509
510 poll_wait ();
511
512 ++maxreqs;
513 }
514
515 return count;
516 }
517
518 static void create_pipe ()
519 {
520 if (pipe (respipe))
521 croak ("unable to initialize result pipe");
522
523 if (fcntl (respipe [0], F_SETFL, O_NONBLOCK))
524 croak ("cannot set result pipe to nonblocking mode");
525
526 if (fcntl (respipe [1], F_SETFL, O_NONBLOCK))
527 croak ("cannot set result pipe to nonblocking mode");
528 }
529
530 /*****************************************************************************/
531
532 static void *aio_proc (void *thr_arg)
533 {
534 aio_req req;
535 struct timespec ts;
536 worker *self = (worker *)thr_arg;
537
538 /* try to distribute timeouts somewhat evenly */
539 ts.tv_nsec = (((unsigned long)self + (unsigned long)ts.tv_sec) & 1023UL)
540 * (1000000000UL / 1024UL);
541
542 for (;;)
543 {
544 ts.tv_sec = time (0) + IDLE_TIMEOUT;
545
546 LOCK (reqlock);
547
548 for (;;)
549 {
550 self->req = req = reqq_shift (&req_queue);
551
552 if (req)
553 break;
554
555 ++idle;
556
557 if (pthread_cond_timedwait (&reqwait, &reqlock, &ts)
558 == ETIMEDOUT)
559 {
560 if (idle > max_idle)
561 {
562 --idle;
563 UNLOCK (reqlock);
564 LOCK (wrklock);
565 --started;
566 UNLOCK (wrklock);
567 goto quit;
568 }
569
570 /* we are allowed to idle, so do so without any timeout */
571 pthread_cond_wait (&reqwait, &reqlock);
572 ts.tv_sec = time (0) + IDLE_TIMEOUT;
573 }
574
575 --idle;
576 }
577
578 --nready;
579
580 UNLOCK (reqlock);
581
582 switch (req->type)
583 {
584 case REQ_QUIT:
585 goto quit;
586
587 case REQ_ENV_OPEN:
588 req->result = req->env->open (req->env, req->buf1, req->uint1, req->int1);
589 break;
590
591 case REQ_ENV_CLOSE:
592 req->result = req->env->close (req->env, req->uint1);
593 break;
594
595 case REQ_DB_OPEN:
596 req->result = req->db->open (req->db, req->txn, req->buf1, req->buf2, req->int1, req->uint1, req->int2);
597 break;
598
599 case REQ_DB_CLOSE:
600 req->result = req->db->close (req->db, req->uint1);
601 break;
602
603 case REQ_DB_COMPACT:
604 req->result = req->db->compact (req->db, req->txn, &req->dbt1, &req->dbt2, 0, req->uint1, 0);
605 break;
606
607 case REQ_DB_SYNC:
608 req->result = req->db->sync (req->db, req->uint1);
609 break;
610
611 case REQ_DB_PUT:
612 req->result = req->db->put (req->db, req->txn, &req->dbt1, &req->dbt2, req->uint1);
613 break;
614
615 default:
616 req->result = ENOSYS;
617 break;
618 }
619
620 LOCK (reslock);
621
622 ++npending;
623
624 if (!reqq_push (&res_queue, req))
625 /* write a dummy byte to the pipe so fh becomes ready */
626 write (respipe [1], &respipe, 1);
627
628 self->req = 0;
629 worker_clear (self);
630
631 UNLOCK (reslock);
632 }
633
634 quit:
635 LOCK (wrklock);
636 worker_free (self);
637 UNLOCK (wrklock);
638
639 return 0;
640 }
641
642 /*****************************************************************************/
643
644 static void atfork_prepare (void)
645 {
646 LOCK (wrklock);
647 LOCK (reqlock);
648 LOCK (reslock);
649 }
650
651 static void atfork_parent (void)
652 {
653 UNLOCK (reslock);
654 UNLOCK (reqlock);
655 UNLOCK (wrklock);
656 }
657
658 static void atfork_child (void)
659 {
660 aio_req prv;
661
662 while (prv = reqq_shift (&req_queue))
663 req_free (prv);
664
665 while (prv = reqq_shift (&res_queue))
666 req_free (prv);
667
668 while (wrk_first.next != &wrk_first)
669 {
670 worker *wrk = wrk_first.next;
671
672 if (wrk->req)
673 req_free (wrk->req);
674
675 worker_clear (wrk);
676 worker_free (wrk);
677 }
678
679 started = 0;
680 idle = 0;
681 nreqs = 0;
682 nready = 0;
683 npending = 0;
684
685 close (respipe [0]);
686 close (respipe [1]);
687 create_pipe ();
688
689 atfork_parent ();
690 }
691
/*
 * dREQ (reqtype): declare and allocate a zero-filled request "req" inside
 * an XSUB that has an SV *callback argument. The pending priority
 * (next_pri) is consumed and reset to the default, the callback is
 * validated (undef or a reference) and copied into the request.
 */
#define dREQ(reqtype)                                                   \
  aio_req req;                                                          \
  int req_pri = next_pri;                                               \
  next_pri = DEFAULT_PRI + PRI_BIAS;                                    \
                                                                        \
  if (SvOK (callback) && !SvROK (callback))                             \
    croak ("callback must be undef or of reference type");              \
                                                                        \
  Newz (0, req, 1, aio_cb);                                             \
  if (!req)                                                             \
    croak ("out of memory during aio_req allocation");                  \
                                                                        \
  req->callback = newSVsv (callback);                                   \
  req->type = (reqtype);                                                \
  req->pri = req_pri

/* REQ_SEND: hand the request declared by dREQ to req_send */
#define REQ_SEND                                                        \
  req_send (req)
710
/*
 * SvPTR (var, arg, type, class): extract a C pointer from a perl object.
 *  - undef yields a null pointer,
 *  - an object derived from class yields the pointer stored as IV in the
 *    referenced scalar,
 *  - anything else croaks.
 *
 * croak is used instead of calling Perl_croak directly: Perl_croak takes
 * the interpreter context as its first argument on threaded perls, and
 * the rest of this file uses croak as well.
 */
#define SvPTR(var, arg, type, class)                                    \
  if (!SvOK (arg))                                                      \
    (var) = 0;                                                          \
  else if (sv_derived_from ((arg), # class))                            \
    {                                                                   \
      IV tmp = SvIV ((SV*) SvRV (arg));                                 \
      (var) = INT2PTR (type, tmp);                                      \
    }                                                                   \
  else                                                                  \
    croak (# var " is not of type " # class)
721
722 inline void
723 set_dbt (DBT *dbt, SV *sv)
724 {
725 STRLEN len;
726 char *data = SvPVbyte (sv, len);
727
728 dbt->data = malloc (len);
729 memcpy (dbt->data, data, len);
730 dbt->size = len;
731 }
732
733 MODULE = BDB PACKAGE = BDB
734
735 PROTOTYPES: ENABLE
736
BOOT:
{
        /* Module start-up: install every DB_* constant listed below as a
           constant sub BDB::<name>, create the result pipe and register
           the fork handlers. Each constant is listed exactly once
           (INIT_TXN used to appear twice, installing its sub twice). */
        HV *stash = gv_stashpv ("BDB", 1);

        static const struct {
          const char *name;
          IV iv;
        } *civ, const_iv[] = {
#define const_iv(name) { # name, (IV)DB_ ## name },
          const_iv (RPCCLIENT)
          const_iv (INIT_CDB)
          const_iv (INIT_LOCK)
          const_iv (INIT_LOG)
          const_iv (INIT_MPOOL)
          const_iv (INIT_REP)
          const_iv (INIT_TXN)
          const_iv (RECOVER)
          const_iv (RECOVER_FATAL)
          const_iv (CREATE)
          const_iv (USE_ENVIRON)
          const_iv (USE_ENVIRON_ROOT)
          const_iv (LOCKDOWN)
          const_iv (PRIVATE)
          const_iv (REGISTER)
          const_iv (SYSTEM_MEM)
          const_iv (AUTO_COMMIT)
          const_iv (CDB_ALLDB)
          const_iv (DIRECT_DB)
          const_iv (DIRECT_LOG)
          const_iv (DSYNC_DB)
          const_iv (DSYNC_LOG)
          const_iv (LOG_AUTOREMOVE)
          const_iv (LOG_INMEMORY)
          const_iv (NOLOCKING)
          const_iv (MULTIVERSION)
          const_iv (NOMMAP)
          const_iv (NOPANIC)
          const_iv (OVERWRITE)
          const_iv (PANIC_ENVIRONMENT)
          const_iv (REGION_INIT)
          const_iv (TIME_NOTGRANTED)
          const_iv (TXN_NOSYNC)
          const_iv (TXN_SNAPSHOT)
          const_iv (TXN_WRITE_NOSYNC)
          const_iv (YIELDCPU)
          const_iv (ENCRYPT_AES)
          const_iv (XA_CREATE)
          const_iv (BTREE)
          const_iv (HASH)
          const_iv (QUEUE)
          const_iv (RECNO)
          const_iv (UNKNOWN)
          const_iv (EXCL)
          const_iv (READ_UNCOMMITTED)
          const_iv (TRUNCATE)
          const_iv (NOSYNC)
          const_iv (CHKSUM)
          const_iv (ENCRYPT)
          const_iv (TXN_NOT_DURABLE)
          const_iv (DUP)
          const_iv (DUPSORT)
          const_iv (RECNUM)
          const_iv (RENUMBER)
          const_iv (REVSPLITOFF)
          const_iv (INORDER)
          const_iv (CONSUME)
          const_iv (CONSUME_WAIT)
          const_iv (SNAPSHOT)
          const_iv (JOIN_ITEM)
          const_iv (RMW)

          const_iv (NOTFOUND)
          const_iv (KEYEMPTY)
          const_iv (LOCK_DEADLOCK)
          const_iv (LOCK_NOTGRANTED)
          const_iv (RUNRECOVERY)
          const_iv (OLD_VERSION)
          const_iv (REP_HANDLE_DEAD)
          const_iv (REP_LOCKOUT)

          const_iv (FREE_SPACE)
          const_iv (FREELIST_ONLY)

          const_iv (APPEND)
          const_iv (NODUPDATA)
          const_iv (NOOVERWRITE)

          const_iv (SET_LOCK_TIMEOUT)
          const_iv (SET_TXN_TIMEOUT)
        };

        /* walk the table from its last entry down to its first */
        for (civ = const_iv + sizeof (const_iv) / sizeof (const_iv [0]); civ-- > const_iv; )
          newCONSTSUB (stash, (char *)civ->name, newSViv (civ->iv));

        create_pipe ();
        pthread_atfork (atfork_prepare, atfork_parent, atfork_child);
}
835
# Limit on the number of requests one poll_cb call handles (0 = no limit).
void
max_poll_reqs (int nreqs)
	PROTOTYPE: $
	CODE:
        max_poll_reqs = nreqs;

# Time limit for one poll_cb call, given in seconds (0 = no limit).
void
max_poll_time (double nseconds)
	PROTOTYPE: $
	CODE:
        max_poll_time = nseconds * AIO_TICKS;

# Calls the C function min_parallel directly.
void
min_parallel (int nthreads)
	PROTOTYPE: $

# Calls the C function max_parallel directly.
void
max_parallel (int nthreads)
	PROTOTYPE: $

void
max_idle (int nthreads)
	PROTOTYPE: $
	CODE:
        set_max_idle (nthreads);

# Sets max_outstanding and returns the previous value.
int
max_outstanding (int maxreqs)
	PROTOTYPE: $
	CODE:
        RETVAL = max_outstanding;
        max_outstanding = maxreqs;
	OUTPUT:
        RETVAL

# Returns the priority for the next request; when an argument is given,
# it is clamped to PRI_MIN .. PRI_MAX and becomes the new priority.
int
dbreq_pri (int pri = 0)
	PROTOTYPE: ;$
	CODE:
        RETVAL = next_pri - PRI_BIAS;
        if (items > 0)
          {
            pri = pri < PRI_MIN ? PRI_MIN
                : pri > PRI_MAX ? PRI_MAX
                :                 pri;
            next_pri = pri + PRI_BIAS;
          }
	OUTPUT:
        RETVAL
884
# Lowers the priority of the next request by nice, clamped to
# PRI_MIN .. PRI_MAX.
void
dbreq_nice (int nice = 0)
	CODE:
        /* next_pri is stored biased (see dbreq_pri), so the bias has to be
           removed before clamping; otherwise dbreq_nice (0) would raise
           the default priority to PRI_MAX */
        nice = next_pri - PRI_BIAS - nice;
        if (nice < PRI_MIN) nice = PRI_MIN;
        if (nice > PRI_MAX) nice = PRI_MAX;
        next_pri = nice + PRI_BIAS;
892
# Waits for and processes results until no requests are outstanding.
void
flush ()
	PROTOTYPE:
	CODE:
        while (nreqs)
          {
            poll_wait ();
            poll_cb ();
          }

# Waits for a result, then processes results; returns the poll_cb count.
int
poll ()
	PROTOTYPE:
	CODE:
        poll_wait ();
        RETVAL = poll_cb ();
	OUTPUT:
        RETVAL

# Returns the read end of the result pipe.
int
poll_fileno ()
	PROTOTYPE:
	CODE:
        RETVAL = respipe [0];
	OUTPUT:
        RETVAL

int
poll_cb (...)
	PROTOTYPE:
	CODE:
        RETVAL = poll_cb ();
	OUTPUT:
        RETVAL

void
poll_wait ()
	PROTOTYPE:
	CODE:
        poll_wait ();

# Counter accessors: nreqs, nready, npending and nthreads.
int
nreqs ()
	PROTOTYPE:
	CODE:
        RETVAL = nreqs;
	OUTPUT:
        RETVAL

int
nready ()
	PROTOTYPE:
	CODE:
        RETVAL = get_nready ();
	OUTPUT:
        RETVAL

int
npending ()
	PROTOTYPE:
	CODE:
        RETVAL = get_npending ();
	OUTPUT:
        RETVAL

int
nthreads ()
	PROTOTYPE:
	CODE:
        if (WORDACCESS_UNSAFE) LOCK (wrklock);
        RETVAL = started;
        if (WORDACCESS_UNSAFE) UNLOCK (wrklock);
	OUTPUT:
        RETVAL

# Replaces prepare_cb, which req_send calls for requests without a callback.
void
set_sync_prepare (SV *cb)
	PROTOTYPE: &
	CODE:
        SvREFCNT_dec (prepare_cb);
        prepare_cb = newSVsv (cb);
974
# Creates an environment handle synchronously; croaks on failure.
DB_ENV *
db_env_create (U32 env_flags = 0)
	CODE:
{
        errno = db_env_create (&RETVAL, env_flags);
        if (errno)
          croak ("db_env_create: %s", db_strerror (errno));
}
	OUTPUT:
        RETVAL

# Queues DB_ENV->open; DB_THREAD is always added to the flags.
void
db_env_open (DB_ENV *env, octetstring db_home, U32 open_flags, int mode, SV *callback = &PL_sv_undef)
	CODE:
{
        dREQ (REQ_ENV_OPEN);

        req->env   = env;
        req->buf1  = strdup_ornull (db_home);
        req->uint1 = open_flags | DB_THREAD;
        req->int1  = mode;

        REQ_SEND;
}

# Queues DB_ENV->close.
void
db_env_close (DB_ENV *env, U32 flags = 0, SV *callback = &PL_sv_undef)
	CODE:
{
        dREQ (REQ_ENV_CLOSE);

        req->env   = env;
        req->uint1 = flags;

        REQ_SEND;
}
1007
# Creates a database handle synchronously; croaks on failure.
DB *
db_create (DB_ENV *env = 0, U32 flags = 0)
	CODE:
{
        errno = db_create (&RETVAL, env, flags);
        if (errno)
          /* report the function that actually failed (the message used to
             name db_env_create) */
          croak ("db_create: %s", db_strerror (errno));
}
	OUTPUT:
        RETVAL
1018
# Queues DB->open; DB_THREAD is always added to the flags.
void
db_open (DB *db, DB_TXN *txnid, octetstring file, octetstring database, int type, U32 flags, int mode, SV *callback = &PL_sv_undef)
	CODE:
{
        dREQ (REQ_DB_OPEN);

        req->db    = db;
        req->txn   = txnid;
        req->buf1  = strdup_ornull (file);
        req->buf2  = strdup_ornull (database);
        req->int1  = type;
        req->uint1 = flags | DB_THREAD;
        req->int2  = mode;

        REQ_SEND;
}

# Queues DB->close.
void
db_close (DB *db, U32 flags = 0, SV *callback = &PL_sv_undef)
	CODE:
{
        dREQ (REQ_DB_CLOSE);

        req->db    = db;
        req->uint1 = flags;

        REQ_SEND;
}
1043
# Queues DB->compact. The arguments unused1 and unused2 are accepted but
# ignored. start and stop default to a null SV pointer; in that case the
# matching DBT is left zero-filled instead of handing the null pointer
# to set_dbt.
void
db_compact (DB *db, DB_TXN *txn = 0, SV *start = 0, SV *stop = 0, SV *unused1 = 0, U32 flags = DB_FREE_SPACE, SV *unused2 = 0, SV *callback = &PL_sv_undef)
	CODE:
{
        dREQ (REQ_DB_COMPACT);
        req->db = db;
        req->txn = txn;
        if (start)
          set_dbt (&req->dbt1, start);
        if (stop)
          set_dbt (&req->dbt2, stop);
        req->uint1 = flags;
        REQ_SEND;
}
1056
# Queues DB->sync.
void
db_sync (DB *db, U32 flags = 0, SV *callback = &PL_sv_undef)
	CODE:
{
        dREQ (REQ_DB_SYNC);

        req->db    = db;
        req->uint1 = flags;

        REQ_SEND;
}
1066
# Queues DB->put with copies of key and data.
void
db_put (DB *db, DB_TXN *txn, SV *key, SV *data, U32 flags = 0, SV *callback = &PL_sv_undef)
	CODE:
{
        dREQ (REQ_DB_PUT);
        req->db = db;
        /* pass the caller's transaction on; it used to be replaced by 0,
           so the worker called DB->put without it */
        req->txn = txn;
        set_dbt (&req->dbt1, key);
        set_dbt (&req->dbt2, data);
        req->uint1 = flags;
        REQ_SEND;
}
1079
1080
MODULE = BDB PACKAGE = BDB::Env

# Synchronous wrappers: each calls the DB_ENV method of the same name
# and returns its result code.

int
set_cachesize (DB_ENV *env, U32 gbytes, U32 bytes, int ncache = 0)
	CODE:
        RETVAL = env->set_cachesize (env, gbytes, bytes, ncache);
	OUTPUT:
        RETVAL

int
set_flags (DB_ENV *env, U32 flags, int onoff)
	CODE:
        RETVAL = env->set_flags (env, flags, onoff);
	OUTPUT:
        RETVAL

int
set_encrypt (DB_ENV *env, const char *password, U32 flags = 0)
	CODE:
        RETVAL = env->set_encrypt (env, password, flags);
	OUTPUT:
        RETVAL

# The timeout is multiplied by 1000000 before it is passed on.
int
set_timeout (DB_ENV *env, NV timeout, U32 flags)
	CODE:
        RETVAL = env->set_timeout (env, timeout * 1000000, flags);
	OUTPUT:
        RETVAL
1106
1107
MODULE = BDB PACKAGE = BDB::Db

# Synchronous wrappers: each calls the DB method of the same name and
# returns its result code.

int
set_cachesize (DB *db, U32 gbytes, U32 bytes, int ncache = 0)
	CODE:
        RETVAL = db->set_cachesize (db, gbytes, bytes, ncache);
	OUTPUT:
        RETVAL

int
set_flags (DB *db, U32 flags)
	CODE:
        RETVAL = db->set_flags (db, flags);
	OUTPUT:
        RETVAL

int
set_encrypt (DB *db, const char *password, U32 flags)
	CODE:
        RETVAL = db->set_encrypt (db, password, flags);
	OUTPUT:
        RETVAL

int
set_lorder (DB *db, int lorder)
	CODE:
        RETVAL = db->set_lorder (db, lorder);
	OUTPUT:
        RETVAL

int
set_bt_minkey (DB *db, U32 minkey)
	CODE:
        RETVAL = db->set_bt_minkey (db, minkey);
	OUTPUT:
        RETVAL

int
set_re_delim (DB *db, int delim)
	CODE:
        RETVAL = db->set_re_delim (db, delim);
	OUTPUT:
        RETVAL

int
set_re_pad (DB *db, int re_pad)
	CODE:
        RETVAL = db->set_re_pad (db, re_pad);
	OUTPUT:
        RETVAL

int
set_re_source (DB *db, char *source)
	CODE:
        RETVAL = db->set_re_source (db, source);
	OUTPUT:
        RETVAL

int
set_re_len (DB *db, U32 re_len)
	CODE:
        RETVAL = db->set_re_len (db, re_len);
	OUTPUT:
        RETVAL

int
set_h_ffactor (DB *db, U32 h_ffactor)
	CODE:
        RETVAL = db->set_h_ffactor (db, h_ffactor);
	OUTPUT:
        RETVAL

int
set_h_nelem (DB *db, U32 h_nelem)
	CODE:
        RETVAL = db->set_h_nelem (db, h_nelem);
	OUTPUT:
        RETVAL

int
set_q_extentsize (DB *db, U32 extentsize)
	CODE:
        RETVAL = db->set_q_extentsize (db, extentsize);
	OUTPUT:
        RETVAL
1182
1183