ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/BDB/BDB.xs
(Generate patch)

Comparing BDB/BDB.xs (file contents):
Revision 1.11 by root, Wed May 9 06:42:24 2007 UTC vs.
Revision 1.12 by root, Sun Jul 8 09:16:19 2007 UTC

126static int next_pri = DEFAULT_PRI + PRI_BIAS; 126static int next_pri = DEFAULT_PRI + PRI_BIAS;
127 127
128static unsigned int started, idle, wanted; 128static unsigned int started, idle, wanted;
129 129
130/* worker threads management */ 130/* worker threads management */
131static mutex_t wrklock = MUTEX_INIT; 131static mutex_t wrklock = X_MUTEX_INIT;
132 132
133typedef struct worker { 133typedef struct worker {
134 /* locked by wrklock */ 134 /* locked by wrklock */
135 struct worker *prev, *next; 135 struct worker *prev, *next;
136 136
159static volatile unsigned int nreqs, nready, npending; 159static volatile unsigned int nreqs, nready, npending;
160static volatile unsigned int max_idle = 4; 160static volatile unsigned int max_idle = 4;
161static volatile unsigned int max_outstanding = 0xffffffff; 161static volatile unsigned int max_outstanding = 0xffffffff;
162static int respipe [2]; 162static int respipe [2];
163 163
164static mutex_t reslock = MUTEX_INIT; 164static mutex_t reslock = X_MUTEX_INIT;
165static mutex_t reqlock = MUTEX_INIT; 165static mutex_t reqlock = X_MUTEX_INIT;
166static cond_t reqwait = COND_INIT; 166static cond_t reqwait = X_COND_INIT;
167 167
168#if WORDACCESS_UNSAFE 168#if WORDACCESS_UNSAFE
169 169
170static unsigned int get_nready () 170static unsigned int get_nready ()
171{ 171{
172 unsigned int retval; 172 unsigned int retval;
173 173
174 LOCK (reqlock); 174 X_LOCK (reqlock);
175 retval = nready; 175 retval = nready;
176 UNLOCK (reqlock); 176 X_UNLOCK (reqlock);
177 177
178 return retval; 178 return retval;
179} 179}
180 180
181static unsigned int get_npending () 181static unsigned int get_npending ()
182{ 182{
183 unsigned int retval; 183 unsigned int retval;
184 184
185 LOCK (reslock); 185 X_LOCK (reslock);
186 retval = npending; 186 retval = npending;
187 UNLOCK (reslock); 187 X_UNLOCK (reslock);
188 188
189 return retval; 189 return retval;
190} 190}
191 191
192static unsigned int get_nthreads () 192static unsigned int get_nthreads ()
193{ 193{
194 unsigned int retval; 194 unsigned int retval;
195 195
196 LOCK (wrklock); 196 X_LOCK (wrklock);
197 retval = started; 197 retval = started;
198 UNLOCK (wrklock); 198 X_UNLOCK (wrklock);
199 199
200 return retval; 200 return retval;
201} 201}
202 202
203#else 203#else
347 worker *wrk = calloc (1, sizeof (worker)); 347 worker *wrk = calloc (1, sizeof (worker));
348 348
349 if (!wrk) 349 if (!wrk)
350 croak ("unable to allocate worker thread data"); 350 croak ("unable to allocate worker thread data");
351 351
352 LOCK (wrklock); 352 X_LOCK (wrklock);
353 if (thread_create (&wrk->tid, aio_proc, (void *)wrk)) 353 if (thread_create (&wrk->tid, aio_proc, (void *)wrk))
354 { 354 {
355 wrk->prev = &wrk_first; 355 wrk->prev = &wrk_first;
356 wrk->next = wrk_first.next; 356 wrk->next = wrk_first.next;
357 wrk_first.next->prev = wrk; 357 wrk_first.next->prev = wrk;
359 ++started; 359 ++started;
360 } 360 }
361 else 361 else
362 free (wrk); 362 free (wrk);
363 363
364 UNLOCK (wrklock); 364 X_UNLOCK (wrklock);
365} 365}
366 366
367static void maybe_start_thread () 367static void maybe_start_thread ()
368{ 368{
369 if (get_nthreads () >= wanted) 369 if (get_nthreads () >= wanted)
397 req->callback = SvREFCNT_inc (POPs); 397 req->callback = SvREFCNT_inc (POPs);
398 } 398 }
399 399
400 ++nreqs; 400 ++nreqs;
401 401
402 LOCK (reqlock); 402 X_LOCK (reqlock);
403 ++nready; 403 ++nready;
404 reqq_push (&req_queue, req); 404 reqq_push (&req_queue, req);
405 COND_SIGNAL (reqwait); 405 X_COND_SIGNAL (reqwait);
406 UNLOCK (reqlock); 406 X_UNLOCK (reqlock);
407 407
408 maybe_start_thread (); 408 maybe_start_thread ();
409 409
410 if (wait_callback) 410 if (wait_callback)
411 { 411 {
424 Newz (0, req, 1, aio_cb); 424 Newz (0, req, 1, aio_cb);
425 425
426 req->type = REQ_QUIT; 426 req->type = REQ_QUIT;
427 req->pri = PRI_MAX + PRI_BIAS; 427 req->pri = PRI_MAX + PRI_BIAS;
428 428
429 LOCK (reqlock); 429 X_LOCK (reqlock);
430 reqq_push (&req_queue, req); 430 reqq_push (&req_queue, req);
431 COND_SIGNAL (reqwait); 431 X_COND_SIGNAL (reqwait);
432 UNLOCK (reqlock); 432 X_UNLOCK (reqlock);
433 433
434 LOCK (wrklock); 434 X_LOCK (wrklock);
435 --started; 435 --started;
436 UNLOCK (wrklock); 436 X_UNLOCK (wrklock);
437} 437}
438 438
439static void set_max_idle (int nthreads) 439static void set_max_idle (int nthreads)
440{ 440{
441 if (WORDACCESS_UNSAFE) LOCK (reqlock); 441 if (WORDACCESS_UNSAFE) X_LOCK (reqlock);
442 max_idle = nthreads <= 0 ? 1 : nthreads; 442 max_idle = nthreads <= 0 ? 1 : nthreads;
443 if (WORDACCESS_UNSAFE) UNLOCK (reqlock); 443 if (WORDACCESS_UNSAFE) X_UNLOCK (reqlock);
444} 444}
445 445
446static void min_parallel (int nthreads) 446static void min_parallel (int nthreads)
447{ 447{
448 if (wanted < nthreads) 448 if (wanted < nthreads)
463 fd_set rfd; 463 fd_set rfd;
464 464
465 while (nreqs) 465 while (nreqs)
466 { 466 {
467 int size; 467 int size;
468 if (WORDACCESS_UNSAFE) LOCK (reslock); 468 if (WORDACCESS_UNSAFE) X_LOCK (reslock);
469 size = res_queue.size; 469 size = res_queue.size;
470 if (WORDACCESS_UNSAFE) UNLOCK (reslock); 470 if (WORDACCESS_UNSAFE) X_UNLOCK (reslock);
471 471
472 if (size) 472 if (size)
473 return; 473 return;
474 474
475 maybe_start_thread (); 475 maybe_start_thread ();
497 { 497 {
498 for (;;) 498 for (;;)
499 { 499 {
500 maybe_start_thread (); 500 maybe_start_thread ();
501 501
502 LOCK (reslock); 502 X_LOCK (reslock);
503 req = reqq_shift (&res_queue); 503 req = reqq_shift (&res_queue);
504 504
505 if (req) 505 if (req)
506 { 506 {
507 --npending; 507 --npending;
513 while (read (respipe [0], buf, 4) == 4) 513 while (read (respipe [0], buf, 4) == 4)
514 ; 514 ;
515 } 515 }
516 } 516 }
517 517
518 UNLOCK (reslock); 518 X_UNLOCK (reslock);
519 519
520 if (!req) 520 if (!req)
521 break; 521 break;
522 522
523 --nreqs; 523 --nreqs;
553 } 553 }
554 554
555 return count; 555 return count;
556} 556}
557 557
558static void create_pipe ()
559{
560 if (pipe (respipe))
561 croak ("unable to initialize result pipe");
562
563 if (fcntl (respipe [0], F_SETFL, O_NONBLOCK))
564 croak ("cannot set result pipe to nonblocking mode");
565
566 if (fcntl (respipe [1], F_SETFL, O_NONBLOCK))
567 croak ("cannot set result pipe to nonblocking mode");
568}
569
570/*****************************************************************************/ 558/*****************************************************************************/
571 559
572static void *aio_proc (void *thr_arg) 560static void *aio_proc (void *thr_arg)
573{ 561{
574 aio_req req; 562 aio_req req;
581 569
582 for (;;) 570 for (;;)
583 { 571 {
584 ts.tv_sec = time (0) + IDLE_TIMEOUT; 572 ts.tv_sec = time (0) + IDLE_TIMEOUT;
585 573
586 LOCK (reqlock); 574 X_LOCK (reqlock);
587 575
588 for (;;) 576 for (;;)
589 { 577 {
590 self->req = req = reqq_shift (&req_queue); 578 self->req = req = reqq_shift (&req_queue);
591 579
592 if (req) 580 if (req)
593 break; 581 break;
594 582
595 ++idle; 583 ++idle;
596 584
597 if (COND_TIMEDWAIT (reqwait, reqlock, ts) 585 if (X_COND_TIMEDWAIT (reqwait, reqlock, ts)
598 == ETIMEDOUT) 586 == ETIMEDOUT)
599 { 587 {
600 if (idle > max_idle) 588 if (idle > max_idle)
601 { 589 {
602 --idle; 590 --idle;
603 UNLOCK (reqlock); 591 X_UNLOCK (reqlock);
604 LOCK (wrklock); 592 X_LOCK (wrklock);
605 --started; 593 --started;
606 UNLOCK (wrklock); 594 X_UNLOCK (wrklock);
607 goto quit; 595 goto quit;
608 } 596 }
609 597
610 /* we are allowed to idle, so do so without any timeout */ 598 /* we are allowed to idle, so do so without any timeout */
611 COND_WAIT (reqwait, reqlock); 599 X_COND_WAIT (reqwait, reqlock);
612 ts.tv_sec = time (0) + IDLE_TIMEOUT; 600 ts.tv_sec = time (0) + IDLE_TIMEOUT;
613 } 601 }
614 602
615 --idle; 603 --idle;
616 } 604 }
617 605
618 --nready; 606 --nready;
619 607
620 UNLOCK (reqlock); 608 X_UNLOCK (reqlock);
621 609
622 switch (req->type) 610 switch (req->type)
623 { 611 {
624 case REQ_QUIT: 612 case REQ_QUIT:
625 goto quit; 613 goto quit;
739 default: 727 default:
740 req->result = ENOSYS; 728 req->result = ENOSYS;
741 break; 729 break;
742 } 730 }
743 731
744 LOCK (reslock); 732 X_LOCK (reslock);
745 733
746 ++npending; 734 ++npending;
747 735
748 if (!reqq_push (&res_queue, req)) 736 if (!reqq_push (&res_queue, req))
749 /* write a dummy byte to the pipe so fh becomes ready */ 737 /* write a dummy byte to the pipe so fh becomes ready */
750 write (respipe [1], &respipe, 1); 738 write (respipe [1], &respipe, 1);
751 739
752 self->req = 0; 740 self->req = 0;
753 worker_clear (self); 741 worker_clear (self);
754 742
755 UNLOCK (reslock); 743 X_UNLOCK (reslock);
756 } 744 }
757 745
758quit: 746quit:
759 LOCK (wrklock); 747 X_LOCK (wrklock);
760 worker_free (self); 748 worker_free (self);
761 UNLOCK (wrklock); 749 X_UNLOCK (wrklock);
762 750
763 return 0; 751 return 0;
764} 752}
765 753
766/*****************************************************************************/ 754/*****************************************************************************/
767 755
768static void atfork_prepare (void) 756static void atfork_prepare (void)
769{ 757{
770 LOCK (wrklock); 758 X_LOCK (wrklock);
771 LOCK (reqlock); 759 X_LOCK (reqlock);
772 LOCK (reslock); 760 X_LOCK (reslock);
773} 761}
774 762
775static void atfork_parent (void) 763static void atfork_parent (void)
776{ 764{
777 UNLOCK (reslock); 765 X_UNLOCK (reslock);
778 UNLOCK (reqlock); 766 X_UNLOCK (reqlock);
779 UNLOCK (wrklock); 767 X_UNLOCK (wrklock);
780} 768}
781 769
782static void atfork_child (void) 770static void atfork_child (void)
783{ 771{
784 aio_req prv; 772 aio_req prv;
806 nready = 0; 794 nready = 0;
807 npending = 0; 795 npending = 0;
808 796
809 close (respipe [0]); 797 close (respipe [0]);
810 close (respipe [1]); 798 close (respipe [1]);
811 create_pipe (); 799
800 if (!create_pipe (respipe))
801 croak ("unable to initialize result pipe");
812 802
813 atfork_parent (); 803 atfork_parent ();
814} 804}
815 805
816#define dREQ(reqtype) \ 806#define dREQ(reqtype) \
999 }; 989 };
1000 990
1001 for (civ = const_iv + sizeof (const_iv) / sizeof (const_iv [0]); civ-- > const_iv; ) 991 for (civ = const_iv + sizeof (const_iv) / sizeof (const_iv [0]); civ-- > const_iv; )
1002 newCONSTSUB (stash, (char *)civ->name, newSViv (civ->iv)); 992 newCONSTSUB (stash, (char *)civ->name, newSViv (civ->iv));
1003 993
1004 create_pipe (); 994 if (!create_pipe (respipe))
995 croak ("unable to initialize result pipe");
996
1005 ATFORK (atfork_prepare, atfork_parent, atfork_child); 997 X_THREAD_ATFORK (atfork_prepare, atfork_parent, atfork_child);
998#ifdef _WIN32
999 X_MUTEX_CHECK (wrklock);
1000 X_MUTEX_CHECK (reslock);
1001 X_MUTEX_CHECK (reqlock);
1002
1003 X_COND_CHECK (reqwait);
1004#endif
1006} 1005}
1007 1006
1008void 1007void
1009max_poll_reqs (int nreqs) 1008max_poll_reqs (int nreqs)
1010 PROTOTYPE: $ 1009 PROTOTYPE: $
1129 1128
1130int 1129int
1131nthreads () 1130nthreads ()
1132 PROTOTYPE: 1131 PROTOTYPE:
1133 CODE: 1132 CODE:
1134 if (WORDACCESS_UNSAFE) LOCK (wrklock); 1133 if (WORDACCESS_UNSAFE) X_LOCK (wrklock);
1135 RETVAL = started; 1134 RETVAL = started;
1136 if (WORDACCESS_UNSAFE) UNLOCK (wrklock); 1135 if (WORDACCESS_UNSAFE) X_UNLOCK (wrklock);
1137 OUTPUT: 1136 OUTPUT:
1138 RETVAL 1137 RETVAL
1139 1138
1140void 1139void
1141set_sync_prepare (SV *cb) 1140set_sync_prepare (SV *cb)

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines