ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/BDB/BDB.xs
(Generate patch)

Comparing BDB/BDB.xs (file contents):
Revision 1.68 by root, Tue Jul 14 19:29:23 2009 UTC vs.
Revision 1.70 by root, Wed Jul 15 14:53:52 2009 UTC

8#include "perl.h" 8#include "perl.h"
9#include "XSUB.h" 9#include "XSUB.h"
10 10
11#include "schmorp.h" 11#include "schmorp.h"
12 12
13// perl stupidly defines these as macros, breaking 13// perl stupidly defines these as argument-less macros, breaking
14// lots and lots of code. 14// lots and lots of code.
15#undef open 15#undef open
16#undef close 16#undef close
17#undef abort 17#undef abort
18#undef malloc 18#undef malloc
247} 247}
248 248
249static volatile unsigned int nreqs, nready, npending; 249static volatile unsigned int nreqs, nready, npending;
250static volatile unsigned int max_idle = 4; 250static volatile unsigned int max_idle = 4;
251static volatile unsigned int max_outstanding = 0xffffffff; 251static volatile unsigned int max_outstanding = 0xffffffff;
252static int respipe_osf [2], respipe [2] = { -1, -1 }; 252static s_epipe respipe;
253 253
254static mutex_t reslock = X_MUTEX_INIT; 254static mutex_t reslock = X_MUTEX_INIT;
255static mutex_t reqlock = X_MUTEX_INIT; 255static mutex_t reqlock = X_MUTEX_INIT;
256static cond_t reqwait = X_COND_INIT; 256static cond_t reqwait = X_COND_INIT;
257 257
461} 461}
462 462
463static void 463static void
464create_respipe (void) 464create_respipe (void)
465{ 465{
466#ifdef _WIN32
467 int arg; /* argg */
468#endif
469 int old_readfd = respipe [0];
470
471 if (respipe [1] >= 0)
472 respipe_close (S_TO_SOCKET (respipe [1]));
473
474 if (s_pipe (respipe)) 466 if (s_epipe_renew (&respipe))
475 croak ("unable to initialize result pipe"); 467 croak ("BDB: unable to create event pipe");
476
477 if (old_readfd >= 0)
478 {
479 if (dup2 (S_TO_SOCKET (respipe [0]), S_TO_SOCKET (old_readfd)) < 0)
480 croak ("unable to initialize result pipe(2)");
481
482 respipe_close (respipe [0]);
483 respipe [0] = old_readfd;
484 }
485
486#ifdef _WIN32
487 arg = 1;
488 if (ioctlsocket (S_TO_SOCKET (respipe [0]), FIONBIO, &arg)
489 || ioctlsocket (S_TO_SOCKET (respipe [1]), FIONBIO, &arg))
490#else
491 if (fcntl (respipe [0], F_SETFL, O_NONBLOCK)
492 || fcntl (respipe [1], F_SETFL, O_NONBLOCK))
493#endif
494 croak ("unable to initialize result pipe(3)");
495
496 respipe_osf [0] = S_TO_SOCKET (respipe [0]);
497 respipe_osf [1] = S_TO_SOCKET (respipe [1]);
498} 468}
499 469
500static void bdb_request (bdb_req req); 470static void bdb_request (bdb_req req);
501X_THREAD_PROC (bdb_proc); 471X_THREAD_PROC (bdb_proc);
502 472
638 end_thread (); 608 end_thread ();
639} 609}
640 610
641static void poll_wait (void) 611static void poll_wait (void)
642{ 612{
643 fd_set rfd;
644
645 while (nreqs) 613 while (nreqs)
646 { 614 {
647 int size; 615 int size;
648 if (WORDACCESS_UNSAFE) X_LOCK (reslock); 616 if (WORDACCESS_UNSAFE) X_LOCK (reslock);
649 size = res_queue.size; 617 size = res_queue.size;
652 if (size) 620 if (size)
653 return; 621 return;
654 622
655 maybe_start_thread (); 623 maybe_start_thread ();
656 624
657 FD_ZERO (&rfd); 625 s_epipe_wait (&respipe);
658 FD_SET (respipe [0], &rfd);
659
660 PerlSock_select (respipe [0] + 1, &rfd, 0, 0, 0);
661 } 626 }
662} 627}
663 628
664static int poll_cb (void) 629static int poll_cb (void)
665{ 630{
685 if (req) 650 if (req)
686 { 651 {
687 --npending; 652 --npending;
688 653
689 if (!res_queue.size) 654 if (!res_queue.size)
690 {
691 /* read any signals sent by the worker threads */ 655 /* read any signals sent by the worker threads */
692 char buf [4]; 656 s_epipe_drain (&respipe);
693 while (respipe_read (respipe [0], buf, 4) == 4)
694 ;
695 }
696 } 657 }
697 658
698 X_UNLOCK (reslock); 659 X_UNLOCK (reslock);
699 660
700 if (!req) 661 if (!req)
916 /* try to distribute timeouts somewhat evenly */ 877 /* try to distribute timeouts somewhat evenly */
917 ts.tv_nsec = ((unsigned long)self & 1023UL) * (1000000000UL / 1024UL); 878 ts.tv_nsec = ((unsigned long)self & 1023UL) * (1000000000UL / 1024UL);
918 879
919 for (;;) 880 for (;;)
920 { 881 {
921 ts.tv_sec = time (0) + IDLE_TIMEOUT; 882 ts.tv_sec = time (0) + IDLE_TIMEOUT;
922 883
923 X_LOCK (reqlock); 884 X_LOCK (reqlock);
924 885
925 for (;;) 886 for (;;)
926 { 887 {
971 X_LOCK (reslock); 932 X_LOCK (reslock);
972 933
973 ++npending; 934 ++npending;
974 935
975 if (!reqq_push (&res_queue, req)) 936 if (!reqq_push (&res_queue, req))
976 /* write a dummy byte to the pipe so fh becomes ready */ 937 s_epipe_signal (&respipe);
977 respipe_write (respipe_osf [1], (const void *)&respipe_osf, 1);
978 938
979 self->req = 0; 939 self->req = 0;
980 worker_clear (self); 940 worker_clear (self);
981 941
982 X_UNLOCK (reslock); 942 X_UNLOCK (reslock);
1517 1477
1518int 1478int
1519poll_fileno () 1479poll_fileno ()
1520 PROTOTYPE: 1480 PROTOTYPE:
1521 CODE: 1481 CODE:
1522 RETVAL = respipe [0]; 1482 RETVAL = s_epipe_fd (&respipe);
1523 OUTPUT: 1483 OUTPUT:
1524 RETVAL 1484 RETVAL
1525 1485
1526int 1486int
1527poll_cb (...) 1487poll_cb (...)

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines