ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/BDB/BDB.xs
(Generate patch)

Comparing BDB/BDB.xs (file contents):
Revision 1.66 by root, Sun Jan 4 10:48:15 2009 UTC vs.
Revision 1.70 by root, Wed Jul 15 14:53:52 2009 UTC

6 6
7#include "EXTERN.h" 7#include "EXTERN.h"
8#include "perl.h" 8#include "perl.h"
9#include "XSUB.h" 9#include "XSUB.h"
10 10
11#include "schmorp.h"
12
11// perl stupidly defines these as macros, breaking 13// perl stupidly defines these as argument-less macros, breaking
12// lots and lots of code. 14// lots and lots of code.
13#undef open 15#undef open
14#undef close 16#undef close
15#undef abort 17#undef abort
16#undef malloc 18#undef malloc
150enum { 152enum {
151 REQ_QUIT, 153 REQ_QUIT,
152 REQ_ENV_OPEN, REQ_ENV_CLOSE, REQ_ENV_TXN_CHECKPOINT, REQ_ENV_LOCK_DETECT, 154 REQ_ENV_OPEN, REQ_ENV_CLOSE, REQ_ENV_TXN_CHECKPOINT, REQ_ENV_LOCK_DETECT,
153 REQ_ENV_MEMP_SYNC, REQ_ENV_MEMP_TRICKLE, REQ_ENV_DBREMOVE, REQ_ENV_DBRENAME, 155 REQ_ENV_MEMP_SYNC, REQ_ENV_MEMP_TRICKLE, REQ_ENV_DBREMOVE, REQ_ENV_DBRENAME,
154 REQ_ENV_LOG_ARCHIVE, 156 REQ_ENV_LOG_ARCHIVE,
155 REQ_DB_OPEN, REQ_DB_CLOSE, REQ_DB_COMPACT, REQ_DB_SYNC, REQ_DB_UPGRADE, 157 REQ_DB_OPEN, REQ_DB_CLOSE, REQ_DB_COMPACT, REQ_DB_SYNC, REQ_DB_VERIFY, REQ_DB_UPGRADE,
156 REQ_DB_PUT, REQ_DB_EXISTS, REQ_DB_GET, REQ_DB_PGET, REQ_DB_DEL, REQ_DB_KEY_RANGE, 158 REQ_DB_PUT, REQ_DB_EXISTS, REQ_DB_GET, REQ_DB_PGET, REQ_DB_DEL, REQ_DB_KEY_RANGE,
157 REQ_TXN_COMMIT, REQ_TXN_ABORT, REQ_TXN_FINISH, 159 REQ_TXN_COMMIT, REQ_TXN_ABORT, REQ_TXN_FINISH,
158 REQ_C_CLOSE, REQ_C_COUNT, REQ_C_PUT, REQ_C_GET, REQ_C_PGET, REQ_C_DEL, 160 REQ_C_CLOSE, REQ_C_COUNT, REQ_C_PUT, REQ_C_GET, REQ_C_PGET, REQ_C_DEL,
159 REQ_SEQ_OPEN, REQ_SEQ_CLOSE, REQ_SEQ_GET, REQ_SEQ_REMOVE, 161 REQ_SEQ_OPEN, REQ_SEQ_CLOSE, REQ_SEQ_GET, REQ_SEQ_REMOVE,
160}; 162};
245} 247}
246 248
247static volatile unsigned int nreqs, nready, npending; 249static volatile unsigned int nreqs, nready, npending;
248static volatile unsigned int max_idle = 4; 250static volatile unsigned int max_idle = 4;
249static volatile unsigned int max_outstanding = 0xffffffff; 251static volatile unsigned int max_outstanding = 0xffffffff;
250static int respipe_osf [2], respipe [2] = { -1, -1 }; 252static s_epipe respipe;
251 253
252static mutex_t reslock = X_MUTEX_INIT; 254static mutex_t reslock = X_MUTEX_INIT;
253static mutex_t reqlock = X_MUTEX_INIT; 255static mutex_t reqlock = X_MUTEX_INIT;
254static cond_t reqwait = X_COND_INIT; 256static cond_t reqwait = X_COND_INIT;
255 257
456 free (req->buf3); 458 free (req->buf3);
457 459
458 Safefree (req); 460 Safefree (req);
459} 461}
460 462
461#ifdef USE_SOCKETS_AS_HANDLES
462# define TO_SOCKET(x) (win32_get_osfhandle (x))
463#else
464# define EV_SELECT_IS_WINSOCKET 1
465# define TO_SOCKET(x) (x)
466#endif
467
468#ifdef _WIN32
469/* taken verbatim from libev's ev_win32.c */
470/* oh, the humanity! */
471static int
472ev_pipe (int filedes [2])
473{
474 struct sockaddr_in addr = { 0 };
475 int addr_size = sizeof (addr);
476 struct sockaddr_in adr2;
477 int adr2_size;
478 SOCKET listener;
479 SOCKET sock [2] = { -1, -1 };
480
481 if ((listener = socket (AF_INET, SOCK_STREAM, 0)) == INVALID_SOCKET)
482 return -1;
483
484 addr.sin_family = AF_INET;
485 addr.sin_addr.s_addr = htonl (INADDR_LOOPBACK);
486 addr.sin_port = 0;
487
488 if (bind (listener, (struct sockaddr *)&addr, addr_size))
489 goto fail;
490
491 if (getsockname (listener, (struct sockaddr *)&addr, &addr_size))
492 goto fail;
493
494 if (listen (listener, 1))
495 goto fail;
496
497 if ((sock [0] = socket (AF_INET, SOCK_STREAM, 0)) == INVALID_SOCKET)
498 goto fail;
499
500 if (connect (sock [0], (struct sockaddr *)&addr, addr_size))
501 goto fail;
502
503 if ((sock [1] = accept (listener, 0, 0)) < 0)
504 goto fail;
505
506 /* windows vista returns fantasy port numbers for getpeername.
507 * example for two interconnected tcp sockets:
508 *
509 * (Socket::unpack_sockaddr_in getsockname $sock0)[0] == 53364
510 * (Socket::unpack_sockaddr_in getpeername $sock0)[0] == 53363
511 * (Socket::unpack_sockaddr_in getsockname $sock1)[0] == 53363
512 * (Socket::unpack_sockaddr_in getpeername $sock1)[0] == 53365
513 *
514 * wow! tridirectional sockets!
515 *
516 * this way of checking ports seems to work:
517 */
518 if (getpeername (sock [0], (struct sockaddr *)&addr, &addr_size))
519 goto fail;
520
521 if (getsockname (sock [1], (struct sockaddr *)&adr2, &adr2_size))
522 goto fail;
523
524 errno = WSAEINVAL;
525 if (addr_size != adr2_size
526 || addr.sin_addr.s_addr != adr2.sin_addr.s_addr /* just to be sure, I mean, it's windows */
527 || addr.sin_port != adr2.sin_port)
528 goto fail;
529
530 closesocket (listener);
531
532#if EV_SELECT_IS_WINSOCKET
533 filedes [0] = _open_osfhandle (sock [0], 0);
534 filedes [1] = _open_osfhandle (sock [1], 0);
535#else
536 /* when select isn't winsocket, we also expect socket, connect, accept etc.
537 * to work on fds */
538 filedes [0] = sock [0];
539 filedes [1] = sock [1];
540#endif
541
542 return 0;
543
544fail:
545 closesocket (listener);
546
547 if (sock [0] != INVALID_SOCKET) closesocket (sock [0]);
548 if (sock [1] != INVALID_SOCKET) closesocket (sock [1]);
549
550 return -1;
551}
552
553#define pipe(filedes) ev_pipe(filedes)
554#endif
555
556static void 463static void
557create_respipe (void) 464create_respipe (void)
558{ 465{
559#ifdef _WIN32
560 int arg; /* argg */
561#endif
562 int old_readfd = respipe [0];
563
564 if (respipe [1] >= 0)
565 respipe_close (TO_SOCKET (respipe [1]));
566
567 if (pipe (respipe)) 466 if (s_epipe_renew (&respipe))
568 croak ("unable to initialize result pipe"); 467 croak ("BDB: unable to create event pipe");
569
570 if (old_readfd >= 0)
571 {
572 if (dup2 (TO_SOCKET (respipe [0]), TO_SOCKET (old_readfd)) < 0)
573 croak ("unable to initialize result pipe(2)");
574
575 respipe_close (respipe [0]);
576 respipe [0] = old_readfd;
577 }
578
579#ifdef _WIN32
580 arg = 1;
581 if (ioctlsocket (TO_SOCKET (respipe [0]), FIONBIO, &arg)
582 || ioctlsocket (TO_SOCKET (respipe [1]), FIONBIO, &arg))
583#else
584 if (fcntl (respipe [0], F_SETFL, O_NONBLOCK)
585 || fcntl (respipe [1], F_SETFL, O_NONBLOCK))
586#endif
587 croak ("unable to initialize result pipe(3)");
588
589 respipe_osf [0] = TO_SOCKET (respipe [0]);
590 respipe_osf [1] = TO_SOCKET (respipe [1]);
591} 468}
592 469
593static void bdb_request (bdb_req req); 470static void bdb_request (bdb_req req);
594X_THREAD_PROC (bdb_proc); 471X_THREAD_PROC (bdb_proc);
595 472
731 end_thread (); 608 end_thread ();
732} 609}
733 610
734static void poll_wait (void) 611static void poll_wait (void)
735{ 612{
736 fd_set rfd;
737
738 while (nreqs) 613 while (nreqs)
739 { 614 {
740 int size; 615 int size;
741 if (WORDACCESS_UNSAFE) X_LOCK (reslock); 616 if (WORDACCESS_UNSAFE) X_LOCK (reslock);
742 size = res_queue.size; 617 size = res_queue.size;
745 if (size) 620 if (size)
746 return; 621 return;
747 622
748 maybe_start_thread (); 623 maybe_start_thread ();
749 624
750 FD_ZERO (&rfd); 625 s_epipe_wait (&respipe);
751 FD_SET (respipe [0], &rfd);
752
753 PerlSock_select (respipe [0] + 1, &rfd, 0, 0, 0);
754 } 626 }
755} 627}
756 628
757static int poll_cb (void) 629static int poll_cb (void)
758{ 630{
778 if (req) 650 if (req)
779 { 651 {
780 --npending; 652 --npending;
781 653
782 if (!res_queue.size) 654 if (!res_queue.size)
783 {
784 /* read any signals sent by the worker threads */ 655 /* read any signals sent by the worker threads */
785 char buf [4]; 656 s_epipe_drain (&respipe);
786 while (respipe_read (respipe [0], buf, 4) == 4)
787 ;
788 }
789 } 657 }
790 658
791 X_UNLOCK (reslock); 659 X_UNLOCK (reslock);
792 660
793 if (!req) 661 if (!req)
881 break; 749 break;
882#endif 750#endif
883 751
884 case REQ_DB_SYNC: 752 case REQ_DB_SYNC:
885 req->result = req->db->sync (req->db, req->uint1); 753 req->result = req->db->sync (req->db, req->uint1);
754 break;
755
756 case REQ_DB_VERIFY:
757 req->result = req->db->verify (req->db, req->buf1, req->buf2, 0, req->uint1);
886 break; 758 break;
887 759
888 case REQ_DB_UPGRADE: 760 case REQ_DB_UPGRADE:
889 req->result = req->db->upgrade (req->db, req->buf1, req->uint1); 761 req->result = req->db->upgrade (req->db, req->buf1, req->uint1);
890 break; 762 break;
1005 /* try to distribute timeouts somewhat evenly */ 877 /* try to distribute timeouts somewhat evenly */
1006 ts.tv_nsec = ((unsigned long)self & 1023UL) * (1000000000UL / 1024UL); 878 ts.tv_nsec = ((unsigned long)self & 1023UL) * (1000000000UL / 1024UL);
1007 879
1008 for (;;) 880 for (;;)
1009 { 881 {
1010 ts.tv_sec = time (0) + IDLE_TIMEOUT; 882 ts.tv_sec = time (0) + IDLE_TIMEOUT;
1011 883
1012 X_LOCK (reqlock); 884 X_LOCK (reqlock);
1013 885
1014 for (;;) 886 for (;;)
1015 { 887 {
1060 X_LOCK (reslock); 932 X_LOCK (reslock);
1061 933
1062 ++npending; 934 ++npending;
1063 935
1064 if (!reqq_push (&res_queue, req)) 936 if (!reqq_push (&res_queue, req))
1065 /* write a dummy byte to the pipe so fh becomes ready */ 937 s_epipe_signal (&respipe);
1066 respipe_write (respipe_osf [1], (const void *)&respipe_osf, 1);
1067 938
1068 self->req = 0; 939 self->req = 0;
1069 worker_clear (self); 940 worker_clear (self);
1070 941
1071 X_UNLOCK (reslock); 942 X_UNLOCK (reslock);
1434 const_iv (REP_UNAVAIL) 1305 const_iv (REP_UNAVAIL)
1435 const_iv (RUNRECOVERY) 1306 const_iv (RUNRECOVERY)
1436 const_iv (SECONDARY_BAD) 1307 const_iv (SECONDARY_BAD)
1437 const_iv (VERIFY_BAD) 1308 const_iv (VERIFY_BAD)
1438 1309
1310 const_iv (SALVAGE)
1311 const_iv (AGGRESSIVE)
1312 const_iv (PRINTABLE)
1313 const_iv (NOORDERCHK)
1314 const_iv (ORDERCHKONLY)
1315
1439 const_iv (ARCH_ABS) 1316 const_iv (ARCH_ABS)
1440 const_iv (ARCH_DATA) 1317 const_iv (ARCH_DATA)
1441 const_iv (ARCH_LOG) 1318 const_iv (ARCH_LOG)
1442 const_iv (ARCH_REMOVE) 1319 const_iv (ARCH_REMOVE)
1443 1320
1600 1477
1601int 1478int
1602poll_fileno () 1479poll_fileno ()
1603 PROTOTYPE: 1480 PROTOTYPE:
1604 CODE: 1481 CODE:
1605 RETVAL = respipe [0]; 1482 RETVAL = s_epipe_fd (&respipe);
1606 OUTPUT: 1483 OUTPUT:
1607 RETVAL 1484 RETVAL
1608 1485
1609int 1486int
1610poll_cb (...) 1487poll_cb (...)
1888 req->uint1 = flags; 1765 req->uint1 = flags;
1889 REQ_SEND; 1766 REQ_SEND;
1890} 1767}
1891 1768
1892void 1769void
1770db_verify (DB *db, bdb_filename file, bdb_filename database = 0, SV *dummy = 0, U32 flags = 0, SV *callback = 0)
1771 PREINIT:
1772 CALLBACK
1773 CODE:
1774{
1775 dREQ (REQ_DB_VERIFY, 1);
1776 ptr_nuke (ST (0)); /* verify destroys the database handle, hopefully it is freed as well */
1777 req->db = db;
1778 req->buf1 = strdup (file);
1779 req->buf2 = strdup_ornull (database);
1780 req->uint1 = flags;
1781 REQ_SEND;
1782}
1783
1784void
1893db_upgrade (DB *db, bdb_filename file, U32 flags = 0, SV *callback = 0) 1785db_upgrade (DB *db, bdb_filename file, U32 flags = 0, SV *callback = 0)
1894 PREINIT: 1786 PREINIT:
1895 CALLBACK 1787 CALLBACK
1896 CODE: 1788 CODE:
1897{ 1789{
1898 dREQ (REQ_DB_SYNC, 1); 1790 dREQ (REQ_DB_UPGRADE, 1);
1899 req->db = db; 1791 req->db = db;
1900 req->buf1 = strdup (file); 1792 req->buf1 = strdup (file);
1901 req->uint1 = flags; 1793 req->uint1 = flags;
1902 REQ_SEND; 1794 REQ_SEND;
1903} 1795}

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines