ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/BDB/BDB.xs
(Generate patch)

Comparing BDB/BDB.xs (file contents):
Revision 1.8 by root, Sun Feb 11 22:07:23 2007 UTC vs.
Revision 1.12 by root, Sun Jul 8 09:16:19 2007 UTC

1/* solaris */ 1#include "xthread.h"
2#define _POSIX_PTHREAD_SEMANTICS 1
3
4#if __linux && !defined(_GNU_SOURCE)
5# define _GNU_SOURCE
6#endif
7
8/* just in case */
9#define _REENTRANT 1
10 2
11#include <errno.h> 3#include <errno.h>
12 4
13#include "EXTERN.h" 5#include "EXTERN.h"
14#include "perl.h" 6#include "perl.h"
15#include "XSUB.h" 7#include "XSUB.h"
16
17#include <pthread.h>
18 8
19#include <stddef.h> 9#include <stddef.h>
20#include <stdlib.h> 10#include <stdlib.h>
21#include <errno.h> 11#include <errno.h>
22#include <sys/time.h> 12#include <sys/time.h>
25#include <unistd.h> 15#include <unistd.h>
26#include <fcntl.h> 16#include <fcntl.h>
27 17
28#include <db.h> 18#include <db.h>
29 19
20#if DB_VERSION_MAJOR < 4 || (DB_VERSION_MAJOR == 4 && DB_VERSION_MINOR < 5)
21# error you need Berkeley DB 4.5 or newer installed
22#endif
23
30/* number of seconds after which idle threads exit */ 24/* number of seconds after which idle threads exit */
31#define IDLE_TIMEOUT 10 25#define IDLE_TIMEOUT 10
32
33/* wether word reads are potentially non-atomic.
34 * this is conservatice, likely most arches this runs
35 * on have atomic word read/writes.
36 */
37#ifndef WORDACCESS_UNSAFE
38# if __i386 || __x86_64
39# define WORDACCESS_UNSAFE 0
40# else
41# define WORDACCESS_UNSAFE 1
42# endif
43#endif
44 26
45typedef DB_ENV DB_ENV_ornull; 27typedef DB_ENV DB_ENV_ornull;
46typedef DB_TXN DB_TXN_ornull; 28typedef DB_TXN DB_TXN_ornull;
47typedef DBC DBC_ornull; 29typedef DBC DBC_ornull;
48typedef DB DB_ornull; 30typedef DB DB_ornull;
143 125
144static int next_pri = DEFAULT_PRI + PRI_BIAS; 126static int next_pri = DEFAULT_PRI + PRI_BIAS;
145 127
146static unsigned int started, idle, wanted; 128static unsigned int started, idle, wanted;
147 129
148#if __linux && defined (PTHREAD_ADAPTIVE_MUTEX_INITIALIZER_NP)
149# define AIO_MUTEX_INIT PTHREAD_ADAPTIVE_MUTEX_INITIALIZER_NP
150#else
151# define AIO_MUTEX_INIT PTHREAD_MUTEX_INITIALIZER
152#endif
153
154#define LOCK(mutex) pthread_mutex_lock (&(mutex))
155#define UNLOCK(mutex) pthread_mutex_unlock (&(mutex))
156
157/* worker threads management */ 130/* worker threads management */
158static pthread_mutex_t wrklock = AIO_MUTEX_INIT; 131static mutex_t wrklock = X_MUTEX_INIT;
159 132
160typedef struct worker { 133typedef struct worker {
161 /* locked by wrklock */ 134 /* locked by wrklock */
162 struct worker *prev, *next; 135 struct worker *prev, *next;
163 136
164 pthread_t tid; 137 thread_t tid;
165 138
166 /* locked by reslock, reqlock or wrklock */ 139 /* locked by reslock, reqlock or wrklock */
167 aio_req req; /* currently processed request */ 140 aio_req req; /* currently processed request */
168 void *dbuf; 141 void *dbuf;
169 DIR *dirp; 142 DIR *dirp;
186static volatile unsigned int nreqs, nready, npending; 159static volatile unsigned int nreqs, nready, npending;
187static volatile unsigned int max_idle = 4; 160static volatile unsigned int max_idle = 4;
188static volatile unsigned int max_outstanding = 0xffffffff; 161static volatile unsigned int max_outstanding = 0xffffffff;
189static int respipe [2]; 162static int respipe [2];
190 163
191static pthread_mutex_t reslock = AIO_MUTEX_INIT; 164static mutex_t reslock = X_MUTEX_INIT;
192static pthread_mutex_t reqlock = AIO_MUTEX_INIT; 165static mutex_t reqlock = X_MUTEX_INIT;
193static pthread_cond_t reqwait = PTHREAD_COND_INITIALIZER; 166static cond_t reqwait = X_COND_INIT;
194 167
195#if WORDACCESS_UNSAFE 168#if WORDACCESS_UNSAFE
196 169
197static unsigned int get_nready () 170static unsigned int get_nready ()
198{ 171{
199 unsigned int retval; 172 unsigned int retval;
200 173
201 LOCK (reqlock); 174 X_LOCK (reqlock);
202 retval = nready; 175 retval = nready;
203 UNLOCK (reqlock); 176 X_UNLOCK (reqlock);
204 177
205 return retval; 178 return retval;
206} 179}
207 180
208static unsigned int get_npending () 181static unsigned int get_npending ()
209{ 182{
210 unsigned int retval; 183 unsigned int retval;
211 184
212 LOCK (reslock); 185 X_LOCK (reslock);
213 retval = npending; 186 retval = npending;
214 UNLOCK (reslock); 187 X_UNLOCK (reslock);
215 188
216 return retval; 189 return retval;
217} 190}
218 191
219static unsigned int get_nthreads () 192static unsigned int get_nthreads ()
220{ 193{
221 unsigned int retval; 194 unsigned int retval;
222 195
223 LOCK (wrklock); 196 X_LOCK (wrklock);
224 retval = started; 197 retval = started;
225 UNLOCK (wrklock); 198 X_UNLOCK (wrklock);
226 199
227 return retval; 200 return retval;
228} 201}
229 202
230#else 203#else
369 342
370static void *aio_proc (void *arg); 343static void *aio_proc (void *arg);
371 344
372static void start_thread (void) 345static void start_thread (void)
373{ 346{
374 sigset_t fullsigset, oldsigset;
375 pthread_attr_t attr;
376
377 worker *wrk = calloc (1, sizeof (worker)); 347 worker *wrk = calloc (1, sizeof (worker));
378 348
379 if (!wrk) 349 if (!wrk)
380 croak ("unable to allocate worker thread data"); 350 croak ("unable to allocate worker thread data");
381 351
382 pthread_attr_init (&attr);
383 pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
384#ifdef PTHREAD_SCOPE_PROCESS
385 pthread_attr_setscope (&attr, PTHREAD_SCOPE_PROCESS);
386#endif
387
388 sigfillset (&fullsigset);
389
390 LOCK (wrklock); 352 X_LOCK (wrklock);
391 pthread_sigmask (SIG_SETMASK, &fullsigset, &oldsigset);
392
393 if (pthread_create (&wrk->tid, &attr, aio_proc, (void *)wrk) == 0) 353 if (thread_create (&wrk->tid, aio_proc, (void *)wrk))
394 { 354 {
395 wrk->prev = &wrk_first; 355 wrk->prev = &wrk_first;
396 wrk->next = wrk_first.next; 356 wrk->next = wrk_first.next;
397 wrk_first.next->prev = wrk; 357 wrk_first.next->prev = wrk;
398 wrk_first.next = wrk; 358 wrk_first.next = wrk;
399 ++started; 359 ++started;
400 } 360 }
401 else 361 else
402 free (wrk); 362 free (wrk);
403 363
404 pthread_sigmask (SIG_SETMASK, &oldsigset, 0);
405 UNLOCK (wrklock); 364 X_UNLOCK (wrklock);
406} 365}
407 366
408static void maybe_start_thread () 367static void maybe_start_thread ()
409{ 368{
410 if (get_nthreads () >= wanted) 369 if (get_nthreads () >= wanted)
438 req->callback = SvREFCNT_inc (POPs); 397 req->callback = SvREFCNT_inc (POPs);
439 } 398 }
440 399
441 ++nreqs; 400 ++nreqs;
442 401
443 LOCK (reqlock); 402 X_LOCK (reqlock);
444 ++nready; 403 ++nready;
445 reqq_push (&req_queue, req); 404 reqq_push (&req_queue, req);
446 pthread_cond_signal (&reqwait); 405 X_COND_SIGNAL (reqwait);
447 UNLOCK (reqlock); 406 X_UNLOCK (reqlock);
448 407
449 maybe_start_thread (); 408 maybe_start_thread ();
450 409
451 if (wait_callback) 410 if (wait_callback)
452 { 411 {
465 Newz (0, req, 1, aio_cb); 424 Newz (0, req, 1, aio_cb);
466 425
467 req->type = REQ_QUIT; 426 req->type = REQ_QUIT;
468 req->pri = PRI_MAX + PRI_BIAS; 427 req->pri = PRI_MAX + PRI_BIAS;
469 428
470 LOCK (reqlock); 429 X_LOCK (reqlock);
471 reqq_push (&req_queue, req); 430 reqq_push (&req_queue, req);
472 pthread_cond_signal (&reqwait); 431 X_COND_SIGNAL (reqwait);
473 UNLOCK (reqlock); 432 X_UNLOCK (reqlock);
474 433
475 LOCK (wrklock); 434 X_LOCK (wrklock);
476 --started; 435 --started;
477 UNLOCK (wrklock); 436 X_UNLOCK (wrklock);
478} 437}
479 438
480static void set_max_idle (int nthreads) 439static void set_max_idle (int nthreads)
481{ 440{
482 if (WORDACCESS_UNSAFE) LOCK (reqlock); 441 if (WORDACCESS_UNSAFE) X_LOCK (reqlock);
483 max_idle = nthreads <= 0 ? 1 : nthreads; 442 max_idle = nthreads <= 0 ? 1 : nthreads;
484 if (WORDACCESS_UNSAFE) UNLOCK (reqlock); 443 if (WORDACCESS_UNSAFE) X_UNLOCK (reqlock);
485} 444}
486 445
487static void min_parallel (int nthreads) 446static void min_parallel (int nthreads)
488{ 447{
489 if (wanted < nthreads) 448 if (wanted < nthreads)
504 fd_set rfd; 463 fd_set rfd;
505 464
506 while (nreqs) 465 while (nreqs)
507 { 466 {
508 int size; 467 int size;
509 if (WORDACCESS_UNSAFE) LOCK (reslock); 468 if (WORDACCESS_UNSAFE) X_LOCK (reslock);
510 size = res_queue.size; 469 size = res_queue.size;
511 if (WORDACCESS_UNSAFE) UNLOCK (reslock); 470 if (WORDACCESS_UNSAFE) X_UNLOCK (reslock);
512 471
513 if (size) 472 if (size)
514 return; 473 return;
515 474
516 maybe_start_thread (); 475 maybe_start_thread ();
538 { 497 {
539 for (;;) 498 for (;;)
540 { 499 {
541 maybe_start_thread (); 500 maybe_start_thread ();
542 501
543 LOCK (reslock); 502 X_LOCK (reslock);
544 req = reqq_shift (&res_queue); 503 req = reqq_shift (&res_queue);
545 504
546 if (req) 505 if (req)
547 { 506 {
548 --npending; 507 --npending;
554 while (read (respipe [0], buf, 4) == 4) 513 while (read (respipe [0], buf, 4) == 4)
555 ; 514 ;
556 } 515 }
557 } 516 }
558 517
559 UNLOCK (reslock); 518 X_UNLOCK (reslock);
560 519
561 if (!req) 520 if (!req)
562 break; 521 break;
563 522
564 --nreqs; 523 --nreqs;
594 } 553 }
595 554
596 return count; 555 return count;
597} 556}
598 557
599static void create_pipe ()
600{
601 if (pipe (respipe))
602 croak ("unable to initialize result pipe");
603
604 if (fcntl (respipe [0], F_SETFL, O_NONBLOCK))
605 croak ("cannot set result pipe to nonblocking mode");
606
607 if (fcntl (respipe [1], F_SETFL, O_NONBLOCK))
608 croak ("cannot set result pipe to nonblocking mode");
609}
610
611/*****************************************************************************/ 558/*****************************************************************************/
612 559
613static void *aio_proc (void *thr_arg) 560static void *aio_proc (void *thr_arg)
614{ 561{
615 aio_req req; 562 aio_req req;
622 569
623 for (;;) 570 for (;;)
624 { 571 {
625 ts.tv_sec = time (0) + IDLE_TIMEOUT; 572 ts.tv_sec = time (0) + IDLE_TIMEOUT;
626 573
627 LOCK (reqlock); 574 X_LOCK (reqlock);
628 575
629 for (;;) 576 for (;;)
630 { 577 {
631 self->req = req = reqq_shift (&req_queue); 578 self->req = req = reqq_shift (&req_queue);
632 579
633 if (req) 580 if (req)
634 break; 581 break;
635 582
636 ++idle; 583 ++idle;
637 584
638 if (pthread_cond_timedwait (&reqwait, &reqlock, &ts) 585 if (X_COND_TIMEDWAIT (reqwait, reqlock, ts)
639 == ETIMEDOUT) 586 == ETIMEDOUT)
640 { 587 {
641 if (idle > max_idle) 588 if (idle > max_idle)
642 { 589 {
643 --idle; 590 --idle;
644 UNLOCK (reqlock); 591 X_UNLOCK (reqlock);
645 LOCK (wrklock); 592 X_LOCK (wrklock);
646 --started; 593 --started;
647 UNLOCK (wrklock); 594 X_UNLOCK (wrklock);
648 goto quit; 595 goto quit;
649 } 596 }
650 597
651 /* we are allowed to idle, so do so without any timeout */ 598 /* we are allowed to idle, so do so without any timeout */
652 pthread_cond_wait (&reqwait, &reqlock); 599 X_COND_WAIT (reqwait, reqlock);
653 ts.tv_sec = time (0) + IDLE_TIMEOUT; 600 ts.tv_sec = time (0) + IDLE_TIMEOUT;
654 } 601 }
655 602
656 --idle; 603 --idle;
657 } 604 }
658 605
659 --nready; 606 --nready;
660 607
661 UNLOCK (reqlock); 608 X_UNLOCK (reqlock);
662 609
663 switch (req->type) 610 switch (req->type)
664 { 611 {
665 case REQ_QUIT: 612 case REQ_QUIT:
666 goto quit; 613 goto quit;
780 default: 727 default:
781 req->result = ENOSYS; 728 req->result = ENOSYS;
782 break; 729 break;
783 } 730 }
784 731
785 LOCK (reslock); 732 X_LOCK (reslock);
786 733
787 ++npending; 734 ++npending;
788 735
789 if (!reqq_push (&res_queue, req)) 736 if (!reqq_push (&res_queue, req))
790 /* write a dummy byte to the pipe so fh becomes ready */ 737 /* write a dummy byte to the pipe so fh becomes ready */
791 write (respipe [1], &respipe, 1); 738 write (respipe [1], &respipe, 1);
792 739
793 self->req = 0; 740 self->req = 0;
794 worker_clear (self); 741 worker_clear (self);
795 742
796 UNLOCK (reslock); 743 X_UNLOCK (reslock);
797 } 744 }
798 745
799quit: 746quit:
800 LOCK (wrklock); 747 X_LOCK (wrklock);
801 worker_free (self); 748 worker_free (self);
802 UNLOCK (wrklock); 749 X_UNLOCK (wrklock);
803 750
804 return 0; 751 return 0;
805} 752}
806 753
807/*****************************************************************************/ 754/*****************************************************************************/
808 755
809static void atfork_prepare (void) 756static void atfork_prepare (void)
810{ 757{
811 LOCK (wrklock); 758 X_LOCK (wrklock);
812 LOCK (reqlock); 759 X_LOCK (reqlock);
813 LOCK (reslock); 760 X_LOCK (reslock);
814} 761}
815 762
816static void atfork_parent (void) 763static void atfork_parent (void)
817{ 764{
818 UNLOCK (reslock); 765 X_UNLOCK (reslock);
819 UNLOCK (reqlock); 766 X_UNLOCK (reqlock);
820 UNLOCK (wrklock); 767 X_UNLOCK (wrklock);
821} 768}
822 769
823static void atfork_child (void) 770static void atfork_child (void)
824{ 771{
825 aio_req prv; 772 aio_req prv;
847 nready = 0; 794 nready = 0;
848 npending = 0; 795 npending = 0;
849 796
850 close (respipe [0]); 797 close (respipe [0]);
851 close (respipe [1]); 798 close (respipe [1]);
852 create_pipe (); 799
800 if (!create_pipe (respipe))
801 croak ("unable to initialize result pipe");
853 802
854 atfork_parent (); 803 atfork_parent ();
855} 804}
856 805
857#define dREQ(reqtype) \ 806#define dREQ(reqtype) \
1040 }; 989 };
1041 990
1042 for (civ = const_iv + sizeof (const_iv) / sizeof (const_iv [0]); civ-- > const_iv; ) 991 for (civ = const_iv + sizeof (const_iv) / sizeof (const_iv [0]); civ-- > const_iv; )
1043 newCONSTSUB (stash, (char *)civ->name, newSViv (civ->iv)); 992 newCONSTSUB (stash, (char *)civ->name, newSViv (civ->iv));
1044 993
1045 create_pipe (); 994 if (!create_pipe (respipe))
995 croak ("unable to initialize result pipe");
996
1046 pthread_atfork (atfork_prepare, atfork_parent, atfork_child); 997 X_THREAD_ATFORK (atfork_prepare, atfork_parent, atfork_child);
998#ifdef _WIN32
999 X_MUTEX_CHECK (wrklock);
1000 X_MUTEX_CHECK (reslock);
1001 X_MUTEX_CHECK (reqlock);
1002
1003 X_COND_CHECK (reqwait);
1004#endif
1047} 1005}
1048 1006
1049void 1007void
1050max_poll_reqs (int nreqs) 1008max_poll_reqs (int nreqs)
1051 PROTOTYPE: $ 1009 PROTOTYPE: $
1170 1128
1171int 1129int
1172nthreads () 1130nthreads ()
1173 PROTOTYPE: 1131 PROTOTYPE:
1174 CODE: 1132 CODE:
1175 if (WORDACCESS_UNSAFE) LOCK (wrklock); 1133 if (WORDACCESS_UNSAFE) X_LOCK (wrklock);
1176 RETVAL = started; 1134 RETVAL = started;
1177 if (WORDACCESS_UNSAFE) UNLOCK (wrklock); 1135 if (WORDACCESS_UNSAFE) X_UNLOCK (wrklock);
1178 OUTPUT: 1136 OUTPUT:
1179 RETVAL 1137 RETVAL
1180 1138
1181void 1139void
1182set_sync_prepare (SV *cb) 1140set_sync_prepare (SV *cb)
1633 CODE: 1591 CODE:
1634 RETVAL = env->set_mp_mmapsize (env, ((size_t)mmapsize_mb) << 20); 1592 RETVAL = env->set_mp_mmapsize (env, ((size_t)mmapsize_mb) << 20);
1635 OUTPUT: 1593 OUTPUT:
1636 RETVAL 1594 RETVAL
1637 1595
1638int set_lk_detect (DB_ENV *env, U32 detect) 1596int set_lk_detect (DB_ENV *env, U32 detect = DB_LOCK_DEFAULT)
1639 CODE: 1597 CODE:
1640 RETVAL = env->set_lk_detect (env, detect); 1598 RETVAL = env->set_lk_detect (env, detect);
1641 OUTPUT: 1599 OUTPUT:
1642 RETVAL 1600 RETVAL
1643 1601

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines