ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/BDB/BDB.xs
(Generate patch)

Comparing BDB/BDB.xs (file contents):
Revision 1.10 by root, Mon Mar 5 19:47:01 2007 UTC vs.
Revision 1.12 by root, Sun Jul 8 09:16:19 2007 UTC

1/* solaris */ 1#include "xthread.h"
2#define _POSIX_PTHREAD_SEMANTICS 1
3
4#if __linux && !defined(_GNU_SOURCE)
5# define _GNU_SOURCE
6#endif
7
8/* just in case */
9#define _REENTRANT 1
10 2
11#include <errno.h> 3#include <errno.h>
12 4
13#include "EXTERN.h" 5#include "EXTERN.h"
14#include "perl.h" 6#include "perl.h"
15#include "XSUB.h" 7#include "XSUB.h"
16
17#include <pthread.h>
18 8
19#include <stddef.h> 9#include <stddef.h>
20#include <stdlib.h> 10#include <stdlib.h>
21#include <errno.h> 11#include <errno.h>
22#include <sys/time.h> 12#include <sys/time.h>
31# error you need Berkeley DB 4.5 or newer installed 21# error you need Berkeley DB 4.5 or newer installed
32#endif 22#endif
33 23
34/* number of seconds after which idle threads exit */ 24/* number of seconds after which idle threads exit */
35#define IDLE_TIMEOUT 10 25#define IDLE_TIMEOUT 10
36
37/* wether word reads are potentially non-atomic.
38 * this is conservatice, likely most arches this runs
39 * on have atomic word read/writes.
40 */
41#ifndef WORDACCESS_UNSAFE
42# if __i386 || __x86_64
43# define WORDACCESS_UNSAFE 0
44# else
45# define WORDACCESS_UNSAFE 1
46# endif
47#endif
48 26
49typedef DB_ENV DB_ENV_ornull; 27typedef DB_ENV DB_ENV_ornull;
50typedef DB_TXN DB_TXN_ornull; 28typedef DB_TXN DB_TXN_ornull;
51typedef DBC DBC_ornull; 29typedef DBC DBC_ornull;
52typedef DB DB_ornull; 30typedef DB DB_ornull;
147 125
148static int next_pri = DEFAULT_PRI + PRI_BIAS; 126static int next_pri = DEFAULT_PRI + PRI_BIAS;
149 127
150static unsigned int started, idle, wanted; 128static unsigned int started, idle, wanted;
151 129
152#if __linux && defined (PTHREAD_ADAPTIVE_MUTEX_INITIALIZER_NP)
153# define AIO_MUTEX_INIT PTHREAD_ADAPTIVE_MUTEX_INITIALIZER_NP
154#else
155# define AIO_MUTEX_INIT PTHREAD_MUTEX_INITIALIZER
156#endif
157
158#define LOCK(mutex) pthread_mutex_lock (&(mutex))
159#define UNLOCK(mutex) pthread_mutex_unlock (&(mutex))
160
161/* worker threads management */ 130/* worker threads management */
162static pthread_mutex_t wrklock = AIO_MUTEX_INIT; 131static mutex_t wrklock = X_MUTEX_INIT;
163 132
164typedef struct worker { 133typedef struct worker {
165 /* locked by wrklock */ 134 /* locked by wrklock */
166 struct worker *prev, *next; 135 struct worker *prev, *next;
167 136
168 pthread_t tid; 137 thread_t tid;
169 138
170 /* locked by reslock, reqlock or wrklock */ 139 /* locked by reslock, reqlock or wrklock */
171 aio_req req; /* currently processed request */ 140 aio_req req; /* currently processed request */
172 void *dbuf; 141 void *dbuf;
173 DIR *dirp; 142 DIR *dirp;
190static volatile unsigned int nreqs, nready, npending; 159static volatile unsigned int nreqs, nready, npending;
191static volatile unsigned int max_idle = 4; 160static volatile unsigned int max_idle = 4;
192static volatile unsigned int max_outstanding = 0xffffffff; 161static volatile unsigned int max_outstanding = 0xffffffff;
193static int respipe [2]; 162static int respipe [2];
194 163
195static pthread_mutex_t reslock = AIO_MUTEX_INIT; 164static mutex_t reslock = X_MUTEX_INIT;
196static pthread_mutex_t reqlock = AIO_MUTEX_INIT; 165static mutex_t reqlock = X_MUTEX_INIT;
197static pthread_cond_t reqwait = PTHREAD_COND_INITIALIZER; 166static cond_t reqwait = X_COND_INIT;
198 167
199#if WORDACCESS_UNSAFE 168#if WORDACCESS_UNSAFE
200 169
201static unsigned int get_nready () 170static unsigned int get_nready ()
202{ 171{
203 unsigned int retval; 172 unsigned int retval;
204 173
205 LOCK (reqlock); 174 X_LOCK (reqlock);
206 retval = nready; 175 retval = nready;
207 UNLOCK (reqlock); 176 X_UNLOCK (reqlock);
208 177
209 return retval; 178 return retval;
210} 179}
211 180
212static unsigned int get_npending () 181static unsigned int get_npending ()
213{ 182{
214 unsigned int retval; 183 unsigned int retval;
215 184
216 LOCK (reslock); 185 X_LOCK (reslock);
217 retval = npending; 186 retval = npending;
218 UNLOCK (reslock); 187 X_UNLOCK (reslock);
219 188
220 return retval; 189 return retval;
221} 190}
222 191
223static unsigned int get_nthreads () 192static unsigned int get_nthreads ()
224{ 193{
225 unsigned int retval; 194 unsigned int retval;
226 195
227 LOCK (wrklock); 196 X_LOCK (wrklock);
228 retval = started; 197 retval = started;
229 UNLOCK (wrklock); 198 X_UNLOCK (wrklock);
230 199
231 return retval; 200 return retval;
232} 201}
233 202
234#else 203#else
373 342
374static void *aio_proc (void *arg); 343static void *aio_proc (void *arg);
375 344
376static void start_thread (void) 345static void start_thread (void)
377{ 346{
378 sigset_t fullsigset, oldsigset;
379 pthread_attr_t attr;
380
381 worker *wrk = calloc (1, sizeof (worker)); 347 worker *wrk = calloc (1, sizeof (worker));
382 348
383 if (!wrk) 349 if (!wrk)
384 croak ("unable to allocate worker thread data"); 350 croak ("unable to allocate worker thread data");
385 351
386 pthread_attr_init (&attr);
387 pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
388#ifdef PTHREAD_SCOPE_PROCESS
389 pthread_attr_setscope (&attr, PTHREAD_SCOPE_PROCESS);
390#endif
391
392 sigfillset (&fullsigset);
393
394 LOCK (wrklock); 352 X_LOCK (wrklock);
395 pthread_sigmask (SIG_SETMASK, &fullsigset, &oldsigset);
396
397 if (pthread_create (&wrk->tid, &attr, aio_proc, (void *)wrk) == 0) 353 if (thread_create (&wrk->tid, aio_proc, (void *)wrk))
398 { 354 {
399 wrk->prev = &wrk_first; 355 wrk->prev = &wrk_first;
400 wrk->next = wrk_first.next; 356 wrk->next = wrk_first.next;
401 wrk_first.next->prev = wrk; 357 wrk_first.next->prev = wrk;
402 wrk_first.next = wrk; 358 wrk_first.next = wrk;
403 ++started; 359 ++started;
404 } 360 }
405 else 361 else
406 free (wrk); 362 free (wrk);
407 363
408 pthread_sigmask (SIG_SETMASK, &oldsigset, 0);
409 UNLOCK (wrklock); 364 X_UNLOCK (wrklock);
410} 365}
411 366
412static void maybe_start_thread () 367static void maybe_start_thread ()
413{ 368{
414 if (get_nthreads () >= wanted) 369 if (get_nthreads () >= wanted)
442 req->callback = SvREFCNT_inc (POPs); 397 req->callback = SvREFCNT_inc (POPs);
443 } 398 }
444 399
445 ++nreqs; 400 ++nreqs;
446 401
447 LOCK (reqlock); 402 X_LOCK (reqlock);
448 ++nready; 403 ++nready;
449 reqq_push (&req_queue, req); 404 reqq_push (&req_queue, req);
450 pthread_cond_signal (&reqwait); 405 X_COND_SIGNAL (reqwait);
451 UNLOCK (reqlock); 406 X_UNLOCK (reqlock);
452 407
453 maybe_start_thread (); 408 maybe_start_thread ();
454 409
455 if (wait_callback) 410 if (wait_callback)
456 { 411 {
469 Newz (0, req, 1, aio_cb); 424 Newz (0, req, 1, aio_cb);
470 425
471 req->type = REQ_QUIT; 426 req->type = REQ_QUIT;
472 req->pri = PRI_MAX + PRI_BIAS; 427 req->pri = PRI_MAX + PRI_BIAS;
473 428
474 LOCK (reqlock); 429 X_LOCK (reqlock);
475 reqq_push (&req_queue, req); 430 reqq_push (&req_queue, req);
476 pthread_cond_signal (&reqwait); 431 X_COND_SIGNAL (reqwait);
477 UNLOCK (reqlock); 432 X_UNLOCK (reqlock);
478 433
479 LOCK (wrklock); 434 X_LOCK (wrklock);
480 --started; 435 --started;
481 UNLOCK (wrklock); 436 X_UNLOCK (wrklock);
482} 437}
483 438
484static void set_max_idle (int nthreads) 439static void set_max_idle (int nthreads)
485{ 440{
486 if (WORDACCESS_UNSAFE) LOCK (reqlock); 441 if (WORDACCESS_UNSAFE) X_LOCK (reqlock);
487 max_idle = nthreads <= 0 ? 1 : nthreads; 442 max_idle = nthreads <= 0 ? 1 : nthreads;
488 if (WORDACCESS_UNSAFE) UNLOCK (reqlock); 443 if (WORDACCESS_UNSAFE) X_UNLOCK (reqlock);
489} 444}
490 445
491static void min_parallel (int nthreads) 446static void min_parallel (int nthreads)
492{ 447{
493 if (wanted < nthreads) 448 if (wanted < nthreads)
508 fd_set rfd; 463 fd_set rfd;
509 464
510 while (nreqs) 465 while (nreqs)
511 { 466 {
512 int size; 467 int size;
513 if (WORDACCESS_UNSAFE) LOCK (reslock); 468 if (WORDACCESS_UNSAFE) X_LOCK (reslock);
514 size = res_queue.size; 469 size = res_queue.size;
515 if (WORDACCESS_UNSAFE) UNLOCK (reslock); 470 if (WORDACCESS_UNSAFE) X_UNLOCK (reslock);
516 471
517 if (size) 472 if (size)
518 return; 473 return;
519 474
520 maybe_start_thread (); 475 maybe_start_thread ();
542 { 497 {
543 for (;;) 498 for (;;)
544 { 499 {
545 maybe_start_thread (); 500 maybe_start_thread ();
546 501
547 LOCK (reslock); 502 X_LOCK (reslock);
548 req = reqq_shift (&res_queue); 503 req = reqq_shift (&res_queue);
549 504
550 if (req) 505 if (req)
551 { 506 {
552 --npending; 507 --npending;
558 while (read (respipe [0], buf, 4) == 4) 513 while (read (respipe [0], buf, 4) == 4)
559 ; 514 ;
560 } 515 }
561 } 516 }
562 517
563 UNLOCK (reslock); 518 X_UNLOCK (reslock);
564 519
565 if (!req) 520 if (!req)
566 break; 521 break;
567 522
568 --nreqs; 523 --nreqs;
598 } 553 }
599 554
600 return count; 555 return count;
601} 556}
602 557
603static void create_pipe ()
604{
605 if (pipe (respipe))
606 croak ("unable to initialize result pipe");
607
608 if (fcntl (respipe [0], F_SETFL, O_NONBLOCK))
609 croak ("cannot set result pipe to nonblocking mode");
610
611 if (fcntl (respipe [1], F_SETFL, O_NONBLOCK))
612 croak ("cannot set result pipe to nonblocking mode");
613}
614
615/*****************************************************************************/ 558/*****************************************************************************/
616 559
617static void *aio_proc (void *thr_arg) 560static void *aio_proc (void *thr_arg)
618{ 561{
619 aio_req req; 562 aio_req req;
626 569
627 for (;;) 570 for (;;)
628 { 571 {
629 ts.tv_sec = time (0) + IDLE_TIMEOUT; 572 ts.tv_sec = time (0) + IDLE_TIMEOUT;
630 573
631 LOCK (reqlock); 574 X_LOCK (reqlock);
632 575
633 for (;;) 576 for (;;)
634 { 577 {
635 self->req = req = reqq_shift (&req_queue); 578 self->req = req = reqq_shift (&req_queue);
636 579
637 if (req) 580 if (req)
638 break; 581 break;
639 582
640 ++idle; 583 ++idle;
641 584
642 if (pthread_cond_timedwait (&reqwait, &reqlock, &ts) 585 if (X_COND_TIMEDWAIT (reqwait, reqlock, ts)
643 == ETIMEDOUT) 586 == ETIMEDOUT)
644 { 587 {
645 if (idle > max_idle) 588 if (idle > max_idle)
646 { 589 {
647 --idle; 590 --idle;
648 UNLOCK (reqlock); 591 X_UNLOCK (reqlock);
649 LOCK (wrklock); 592 X_LOCK (wrklock);
650 --started; 593 --started;
651 UNLOCK (wrklock); 594 X_UNLOCK (wrklock);
652 goto quit; 595 goto quit;
653 } 596 }
654 597
655 /* we are allowed to idle, so do so without any timeout */ 598 /* we are allowed to idle, so do so without any timeout */
656 pthread_cond_wait (&reqwait, &reqlock); 599 X_COND_WAIT (reqwait, reqlock);
657 ts.tv_sec = time (0) + IDLE_TIMEOUT; 600 ts.tv_sec = time (0) + IDLE_TIMEOUT;
658 } 601 }
659 602
660 --idle; 603 --idle;
661 } 604 }
662 605
663 --nready; 606 --nready;
664 607
665 UNLOCK (reqlock); 608 X_UNLOCK (reqlock);
666 609
667 switch (req->type) 610 switch (req->type)
668 { 611 {
669 case REQ_QUIT: 612 case REQ_QUIT:
670 goto quit; 613 goto quit;
784 default: 727 default:
785 req->result = ENOSYS; 728 req->result = ENOSYS;
786 break; 729 break;
787 } 730 }
788 731
789 LOCK (reslock); 732 X_LOCK (reslock);
790 733
791 ++npending; 734 ++npending;
792 735
793 if (!reqq_push (&res_queue, req)) 736 if (!reqq_push (&res_queue, req))
794 /* write a dummy byte to the pipe so fh becomes ready */ 737 /* write a dummy byte to the pipe so fh becomes ready */
795 write (respipe [1], &respipe, 1); 738 write (respipe [1], &respipe, 1);
796 739
797 self->req = 0; 740 self->req = 0;
798 worker_clear (self); 741 worker_clear (self);
799 742
800 UNLOCK (reslock); 743 X_UNLOCK (reslock);
801 } 744 }
802 745
803quit: 746quit:
804 LOCK (wrklock); 747 X_LOCK (wrklock);
805 worker_free (self); 748 worker_free (self);
806 UNLOCK (wrklock); 749 X_UNLOCK (wrklock);
807 750
808 return 0; 751 return 0;
809} 752}
810 753
811/*****************************************************************************/ 754/*****************************************************************************/
812 755
813static void atfork_prepare (void) 756static void atfork_prepare (void)
814{ 757{
815 LOCK (wrklock); 758 X_LOCK (wrklock);
816 LOCK (reqlock); 759 X_LOCK (reqlock);
817 LOCK (reslock); 760 X_LOCK (reslock);
818} 761}
819 762
820static void atfork_parent (void) 763static void atfork_parent (void)
821{ 764{
822 UNLOCK (reslock); 765 X_UNLOCK (reslock);
823 UNLOCK (reqlock); 766 X_UNLOCK (reqlock);
824 UNLOCK (wrklock); 767 X_UNLOCK (wrklock);
825} 768}
826 769
827static void atfork_child (void) 770static void atfork_child (void)
828{ 771{
829 aio_req prv; 772 aio_req prv;
851 nready = 0; 794 nready = 0;
852 npending = 0; 795 npending = 0;
853 796
854 close (respipe [0]); 797 close (respipe [0]);
855 close (respipe [1]); 798 close (respipe [1]);
856 create_pipe (); 799
800 if (!create_pipe (respipe))
801 croak ("unable to initialize result pipe");
857 802
858 atfork_parent (); 803 atfork_parent ();
859} 804}
860 805
861#define dREQ(reqtype) \ 806#define dREQ(reqtype) \
1044 }; 989 };
1045 990
1046 for (civ = const_iv + sizeof (const_iv) / sizeof (const_iv [0]); civ-- > const_iv; ) 991 for (civ = const_iv + sizeof (const_iv) / sizeof (const_iv [0]); civ-- > const_iv; )
1047 newCONSTSUB (stash, (char *)civ->name, newSViv (civ->iv)); 992 newCONSTSUB (stash, (char *)civ->name, newSViv (civ->iv));
1048 993
1049 create_pipe (); 994 if (!create_pipe (respipe))
995 croak ("unable to initialize result pipe");
996
1050 pthread_atfork (atfork_prepare, atfork_parent, atfork_child); 997 X_THREAD_ATFORK (atfork_prepare, atfork_parent, atfork_child);
998#ifdef _WIN32
999 X_MUTEX_CHECK (wrklock);
1000 X_MUTEX_CHECK (reslock);
1001 X_MUTEX_CHECK (reqlock);
1002
1003 X_COND_CHECK (reqwait);
1004#endif
1051} 1005}
1052 1006
1053void 1007void
1054max_poll_reqs (int nreqs) 1008max_poll_reqs (int nreqs)
1055 PROTOTYPE: $ 1009 PROTOTYPE: $
1174 1128
1175int 1129int
1176nthreads () 1130nthreads ()
1177 PROTOTYPE: 1131 PROTOTYPE:
1178 CODE: 1132 CODE:
1179 if (WORDACCESS_UNSAFE) LOCK (wrklock); 1133 if (WORDACCESS_UNSAFE) X_LOCK (wrklock);
1180 RETVAL = started; 1134 RETVAL = started;
1181 if (WORDACCESS_UNSAFE) UNLOCK (wrklock); 1135 if (WORDACCESS_UNSAFE) X_UNLOCK (wrklock);
1182 OUTPUT: 1136 OUTPUT:
1183 RETVAL 1137 RETVAL
1184 1138
1185void 1139void
1186set_sync_prepare (SV *cb) 1140set_sync_prepare (SV *cb)

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines