ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/BDB/BDB.xs
(Generate patch)

Comparing BDB/BDB.xs (file contents):
Revision 1.62 by root, Mon Oct 20 04:21:53 2008 UTC vs.
Revision 1.70 by root, Wed Jul 15 14:53:52 2009 UTC

6 6
7#include "EXTERN.h" 7#include "EXTERN.h"
8#include "perl.h" 8#include "perl.h"
9#include "XSUB.h" 9#include "XSUB.h"
10 10
11#include "schmorp.h"
12
11// perl stupidly defines these as macros, breaking 13// perl stupidly defines these as argument-less macros, breaking
12// lots and lots of code. 14// lots and lots of code.
13#undef open 15#undef open
14#undef close 16#undef close
15#undef abort 17#undef abort
16#undef malloc 18#undef malloc
149 151
150enum { 152enum {
151 REQ_QUIT, 153 REQ_QUIT,
152 REQ_ENV_OPEN, REQ_ENV_CLOSE, REQ_ENV_TXN_CHECKPOINT, REQ_ENV_LOCK_DETECT, 154 REQ_ENV_OPEN, REQ_ENV_CLOSE, REQ_ENV_TXN_CHECKPOINT, REQ_ENV_LOCK_DETECT,
153 REQ_ENV_MEMP_SYNC, REQ_ENV_MEMP_TRICKLE, REQ_ENV_DBREMOVE, REQ_ENV_DBRENAME, 155 REQ_ENV_MEMP_SYNC, REQ_ENV_MEMP_TRICKLE, REQ_ENV_DBREMOVE, REQ_ENV_DBRENAME,
156 REQ_ENV_LOG_ARCHIVE,
154 REQ_DB_OPEN, REQ_DB_CLOSE, REQ_DB_COMPACT, REQ_DB_SYNC, REQ_DB_UPGRADE, 157 REQ_DB_OPEN, REQ_DB_CLOSE, REQ_DB_COMPACT, REQ_DB_SYNC, REQ_DB_VERIFY, REQ_DB_UPGRADE,
155 REQ_DB_PUT, REQ_DB_EXISTS, REQ_DB_GET, REQ_DB_PGET, REQ_DB_DEL, REQ_DB_KEY_RANGE, 158 REQ_DB_PUT, REQ_DB_EXISTS, REQ_DB_GET, REQ_DB_PGET, REQ_DB_DEL, REQ_DB_KEY_RANGE,
156 REQ_TXN_COMMIT, REQ_TXN_ABORT, REQ_TXN_FINISH, 159 REQ_TXN_COMMIT, REQ_TXN_ABORT, REQ_TXN_FINISH,
157 REQ_C_CLOSE, REQ_C_COUNT, REQ_C_PUT, REQ_C_GET, REQ_C_PGET, REQ_C_DEL, 160 REQ_C_CLOSE, REQ_C_COUNT, REQ_C_PUT, REQ_C_GET, REQ_C_PGET, REQ_C_DEL,
158 REQ_SEQ_OPEN, REQ_SEQ_CLOSE, REQ_SEQ_GET, REQ_SEQ_REMOVE, 161 REQ_SEQ_OPEN, REQ_SEQ_CLOSE, REQ_SEQ_GET, REQ_SEQ_REMOVE,
159}; 162};
244} 247}
245 248
246static volatile unsigned int nreqs, nready, npending; 249static volatile unsigned int nreqs, nready, npending;
247static volatile unsigned int max_idle = 4; 250static volatile unsigned int max_idle = 4;
248static volatile unsigned int max_outstanding = 0xffffffff; 251static volatile unsigned int max_outstanding = 0xffffffff;
249static int respipe_osf [2], respipe [2] = { -1, -1 }; 252static s_epipe respipe;
250 253
251static mutex_t reslock = X_MUTEX_INIT; 254static mutex_t reslock = X_MUTEX_INIT;
252static mutex_t reqlock = X_MUTEX_INIT; 255static mutex_t reqlock = X_MUTEX_INIT;
253static cond_t reqwait = X_COND_INIT; 256static cond_t reqwait = X_COND_INIT;
254 257
377 380
378 av_push (av, newSVnv (req->key_range.less)); 381 av_push (av, newSVnv (req->key_range.less));
379 av_push (av, newSVnv (req->key_range.equal)); 382 av_push (av, newSVnv (req->key_range.equal));
380 av_push (av, newSVnv (req->key_range.greater)); 383 av_push (av, newSVnv (req->key_range.greater));
381 384
385 av = (AV *)newRV_noinc ((SV *)av);
386
382 SvREADONLY_off (req->sv1); 387 SvREADONLY_off (req->sv1);
383 sv_setsv_mg (req->sv1, newRV_noinc ((SV *)av)); 388 sv_setsv_mg (req->sv1, newRV_noinc ((SV *)av));
389 SvREFCNT_dec (av);
384 SvREFCNT_dec (req->sv1); 390 SvREFCNT_dec (req->sv1);
385 } 391 }
386 break; 392 break;
387 393
388#if DB_VERSION_MINOR >= 3 394#if DB_VERSION_MINOR >= 3
395 sv_setnv_mg (req->sv1, (NV)req->seq_t); 401 sv_setnv_mg (req->sv1, (NV)req->seq_t);
396 402
397 SvREFCNT_dec (req->sv1); 403 SvREFCNT_dec (req->sv1);
398 break; 404 break;
399#endif 405#endif
406
407 case REQ_ENV_LOG_ARCHIVE:
408 {
409 AV *av = newAV ();
410 char **listp = (char **)req->buf1;
411
412 if (listp)
413 while (*listp)
414 av_push (av, newSVpv (*listp, 0)), ++listp;
415
416 av = (AV *)newRV_noinc ((SV *)av);
417
418 SvREADONLY_off (req->sv1);
419 sv_setsv_mg (req->sv1, (SV *)av);
420 SvREFCNT_dec (av);
421 SvREFCNT_dec (req->sv1);
422 }
423 break;
400 } 424 }
401 425
402 errno = req->result; 426 errno = req->result;
403 427
404 if (req->callback) 428 if (req->callback)
434 free (req->buf3); 458 free (req->buf3);
435 459
436 Safefree (req); 460 Safefree (req);
437} 461}
438 462
439#ifdef USE_SOCKETS_AS_HANDLES
440# define TO_SOCKET(x) (win32_get_osfhandle (x))
441#else
442# define TO_SOCKET(x) (x)
443#endif
444
445static void 463static void
446create_respipe (void) 464create_respipe (void)
447{ 465{
448#ifdef _WIN32
449 int arg; /* argg */
450#endif
451 int old_readfd = respipe [0];
452
453 if (respipe [1] >= 0)
454 respipe_close (TO_SOCKET (respipe [1]));
455
456#ifdef _WIN32
457 if (PerlSock_socketpair (AF_UNIX, SOCK_STREAM, 0, respipe))
458#else
459 if (pipe (respipe)) 466 if (s_epipe_renew (&respipe))
460#endif 467 croak ("BDB: unable to create event pipe");
461 croak ("unable to initialize result pipe");
462
463 if (old_readfd >= 0)
464 {
465 if (dup2 (TO_SOCKET (respipe [0]), TO_SOCKET (old_readfd)) < 0)
466 croak ("unable to initialize result pipe(2)");
467
468 respipe_close (respipe [0]);
469 respipe [0] = old_readfd;
470 }
471
472#ifdef _WIN32
473 arg = 1;
474 if (ioctlsocket (TO_SOCKET (respipe [0]), FIONBIO, &arg)
475 || ioctlsocket (TO_SOCKET (respipe [1]), FIONBIO, &arg))
476#else
477 if (fcntl (respipe [0], F_SETFL, O_NONBLOCK)
478 || fcntl (respipe [1], F_SETFL, O_NONBLOCK))
479#endif
480 croak ("unable to initialize result pipe(3)");
481
482 respipe_osf [0] = TO_SOCKET (respipe [0]);
483 respipe_osf [1] = TO_SOCKET (respipe [1]);
484} 468}
485 469
486static void bdb_request (bdb_req req); 470static void bdb_request (bdb_req req);
487X_THREAD_PROC (bdb_proc); 471X_THREAD_PROC (bdb_proc);
488 472
624 end_thread (); 608 end_thread ();
625} 609}
626 610
627static void poll_wait (void) 611static void poll_wait (void)
628{ 612{
629 fd_set rfd;
630
631 while (nreqs) 613 while (nreqs)
632 { 614 {
633 int size; 615 int size;
634 if (WORDACCESS_UNSAFE) X_LOCK (reslock); 616 if (WORDACCESS_UNSAFE) X_LOCK (reslock);
635 size = res_queue.size; 617 size = res_queue.size;
638 if (size) 620 if (size)
639 return; 621 return;
640 622
641 maybe_start_thread (); 623 maybe_start_thread ();
642 624
643 FD_ZERO (&rfd); 625 s_epipe_wait (&respipe);
644 FD_SET (respipe [0], &rfd);
645
646 PerlSock_select (respipe [0] + 1, &rfd, 0, 0, 0);
647 } 626 }
648} 627}
649 628
650static int poll_cb (void) 629static int poll_cb (void)
651{ 630{
671 if (req) 650 if (req)
672 { 651 {
673 --npending; 652 --npending;
674 653
675 if (!res_queue.size) 654 if (!res_queue.size)
676 {
677 /* read any signals sent by the worker threads */ 655 /* read any signals sent by the worker threads */
678 char buf [4]; 656 s_epipe_drain (&respipe);
679 while (respipe_read (respipe [0], buf, 4) == 4)
680 ;
681 }
682 } 657 }
683 658
684 X_UNLOCK (reslock); 659 X_UNLOCK (reslock);
685 660
686 if (!req) 661 if (!req)
774 break; 749 break;
775#endif 750#endif
776 751
777 case REQ_DB_SYNC: 752 case REQ_DB_SYNC:
778 req->result = req->db->sync (req->db, req->uint1); 753 req->result = req->db->sync (req->db, req->uint1);
754 break;
755
756 case REQ_DB_VERIFY:
757 req->result = req->db->verify (req->db, req->buf1, req->buf2, 0, req->uint1);
779 break; 758 break;
780 759
781 case REQ_DB_UPGRADE: 760 case REQ_DB_UPGRADE:
782 req->result = req->db->upgrade (req->db, req->buf1, req->uint1); 761 req->result = req->db->upgrade (req->db, req->buf1, req->uint1);
783 break; 762 break;
870 case REQ_SEQ_REMOVE: 849 case REQ_SEQ_REMOVE:
871 req->result = req->seq->remove (req->seq, req->txn, req->uint1); 850 req->result = req->seq->remove (req->seq, req->txn, req->uint1);
872 break; 851 break;
873#endif 852#endif
874 853
854 case REQ_ENV_LOG_ARCHIVE:
855 {
856 char **listp = 0; /* DB_ARCH_REMOVE does not touch listp, contrary to docs */
857 req->result = req->env->log_archive (req->env, &listp, req->uint1);
858 req->buf1 = (char *)listp;
859 }
860 break;
861
875 default: 862 default:
876 req->result = ENOSYS; 863 req->result = ENOSYS;
877 break; 864 break;
878 } 865 }
879 866
890 /* try to distribute timeouts somewhat evenly */ 877 /* try to distribute timeouts somewhat evenly */
891 ts.tv_nsec = ((unsigned long)self & 1023UL) * (1000000000UL / 1024UL); 878 ts.tv_nsec = ((unsigned long)self & 1023UL) * (1000000000UL / 1024UL);
892 879
893 for (;;) 880 for (;;)
894 { 881 {
895 ts.tv_sec = time (0) + IDLE_TIMEOUT; 882 ts.tv_sec = time (0) + IDLE_TIMEOUT;
896 883
897 X_LOCK (reqlock); 884 X_LOCK (reqlock);
898 885
899 for (;;) 886 for (;;)
900 { 887 {
945 X_LOCK (reslock); 932 X_LOCK (reslock);
946 933
947 ++npending; 934 ++npending;
948 935
949 if (!reqq_push (&res_queue, req)) 936 if (!reqq_push (&res_queue, req))
950 /* write a dummy byte to the pipe so fh becomes ready */ 937 s_epipe_signal (&respipe);
951 respipe_write (respipe_osf [1], (const void *)&respipe_osf, 1);
952 938
953 self->req = 0; 939 self->req = 0;
954 worker_clear (self); 940 worker_clear (self);
955 941
956 X_UNLOCK (reslock); 942 X_UNLOCK (reslock);
1132 } 1118 }
1133 } 1119 }
1134 1120
1135 return 0; 1121 return 0;
1136} 1122}
1123
1124/*****************************************************************************/
1125
1126#if 0
1127static int
1128bt_pfxc_compare (DB *db, const DBT *dbt1, const DBT *dbt2)
1129{
1130 ssize_t size1 = dbt1->size;
1131 ssize_t size2 = dbt2->size;
1132 int res = memcmp ((void *)dbt1->data, (void *)dbt2->data,
1133 size1 <= size2 ? size1 : size2);
1134
1135 if (res)
1136 return res;
1137 else if (size1 - size2)
1138 return size1 - size2;
1139 else
1140 return 0;
1141}
1142
1143static size_t
1144bt_pfxc_prefix_x (DB *db, const DBT *dbt1, const DBT *dbt2)
1145{
1146 ssize_t size1 = dbt1->size;
1147 ssize_t size2 = dbt2->size;
1148 u_int8_t *p1 = (u_int8_t *)dbt1->data;
1149 u_int8_t *p2 = (u_int8_t *)dbt2->data;
1150 u_int8_t *pe = p1 + (size1 <= size2 ? size1 : size2);
1151
1152 while (p1 < pe)
1153 if (*p1++ != *p2++)
1154 return p1 - (u_int8_t *)dbt1->data - 1;
1155
1156 if (size1 < size2) return size1 + 1;
1157 if (size1 > size2) return size2 + 1;
1158
1159 return size1;
1160}
1161#endif
1162
1163/*****************************************************************************/
1137 1164
1138/* stupid windows defines CALLBACK as well */ 1165/* stupid windows defines CALLBACK as well */
1139#undef CALLBACK 1166#undef CALLBACK
1140#define CALLBACK SV *cb = pop_callback (&items, ST (items - 1)); 1167#define CALLBACK SV *cb = pop_callback (&items, ST (items - 1));
1141 1168
1278 const_iv (REP_UNAVAIL) 1305 const_iv (REP_UNAVAIL)
1279 const_iv (RUNRECOVERY) 1306 const_iv (RUNRECOVERY)
1280 const_iv (SECONDARY_BAD) 1307 const_iv (SECONDARY_BAD)
1281 const_iv (VERIFY_BAD) 1308 const_iv (VERIFY_BAD)
1282 1309
1310 const_iv (SALVAGE)
1311 const_iv (AGGRESSIVE)
1312 const_iv (PRINTABLE)
1313 const_iv (NOORDERCHK)
1314 const_iv (ORDERCHKONLY)
1315
1316 const_iv (ARCH_ABS)
1317 const_iv (ARCH_DATA)
1318 const_iv (ARCH_LOG)
1319 const_iv (ARCH_REMOVE)
1320
1283 const_iv (VERB_DEADLOCK) 1321 const_iv (VERB_DEADLOCK)
1284 const_iv (VERB_RECOVERY) 1322 const_iv (VERB_RECOVERY)
1285 const_iv (VERB_REPLICATION) 1323 const_iv (VERB_REPLICATION)
1286 const_iv (VERB_WAITSFOR) 1324 const_iv (VERB_WAITSFOR)
1287 1325
1320 const_iv (PRIORITY_VERY_LOW) 1358 const_iv (PRIORITY_VERY_LOW)
1321 const_iv (PRIORITY_LOW) 1359 const_iv (PRIORITY_LOW)
1322 const_iv (PRIORITY_DEFAULT) 1360 const_iv (PRIORITY_DEFAULT)
1323 const_iv (PRIORITY_HIGH) 1361 const_iv (PRIORITY_HIGH)
1324 const_iv (PRIORITY_VERY_HIGH) 1362 const_iv (PRIORITY_VERY_HIGH)
1363 const_iv (IGNORE_LEASE)
1325#endif 1364#endif
1326#if DB_VERSION_MINOR >= 7 1365#if DB_VERSION_MINOR >= 7
1327 const_iv (IGNORE_LEASE)
1328 //const_iv (MULTIPLE_KEY) 1366 //const_iv (MULTIPLE_KEY)
1329 const_iv (LOG_DIRECT) 1367 const_iv (LOG_DIRECT)
1330 const_iv (LOG_DSYNC) 1368 const_iv (LOG_DSYNC)
1331 const_iv (LOG_AUTO_REMOVE) 1369 const_iv (LOG_AUTO_REMOVE)
1332 const_iv (LOG_IN_MEMORY) 1370 const_iv (LOG_IN_MEMORY)
1439 1477
1440int 1478int
1441poll_fileno () 1479poll_fileno ()
1442 PROTOTYPE: 1480 PROTOTYPE:
1443 CODE: 1481 CODE:
1444 RETVAL = respipe [0]; 1482 RETVAL = s_epipe_fd (&respipe);
1445 OUTPUT: 1483 OUTPUT:
1446 RETVAL 1484 RETVAL
1447 1485
1448int 1486int
1449poll_cb (...) 1487poll_cb (...)
1633 dREQ (REQ_ENV_DBRENAME, 2); 1671 dREQ (REQ_ENV_DBRENAME, 2);
1634 req->env = env; 1672 req->env = env;
1635 req->buf1 = strdup_ornull (file); 1673 req->buf1 = strdup_ornull (file);
1636 req->buf2 = strdup_ornull (database); 1674 req->buf2 = strdup_ornull (database);
1637 req->buf3 = strdup_ornull (newname); 1675 req->buf3 = strdup_ornull (newname);
1676 req->uint1 = flags;
1677 REQ_SEND;
1678}
1679
1680void
1681db_env_log_archive (DB_ENV *env, SV_mutable *listp, U32 flags = 0, SV *callback = 0)
1682 PREINIT:
1683 CALLBACK
1684 CODE:
1685{
1686 dREQ (REQ_ENV_LOG_ARCHIVE, 1);
1687 req->sv1 = SvREFCNT_inc (listp);
1688 req->env = env;
1638 req->uint1 = flags; 1689 req->uint1 = flags;
1639 REQ_SEND; 1690 REQ_SEND;
1640} 1691}
1641 1692
1642DB * 1693DB *
1714 req->uint1 = flags; 1765 req->uint1 = flags;
1715 REQ_SEND; 1766 REQ_SEND;
1716} 1767}
1717 1768
1718void 1769void
1770db_verify (DB *db, bdb_filename file, bdb_filename database = 0, SV *dummy = 0, U32 flags = 0, SV *callback = 0)
1771 PREINIT:
1772 CALLBACK
1773 CODE:
1774{
1775 dREQ (REQ_DB_VERIFY, 1);
1776 ptr_nuke (ST (0)); /* verify destroys the database handle, hopefully it is freed as well */
1777 req->db = db;
1778 req->buf1 = strdup (file);
1779 req->buf2 = strdup_ornull (database);
1780 req->uint1 = flags;
1781 REQ_SEND;
1782}
1783
1784void
1719db_upgrade (DB *db, bdb_filename file, U32 flags = 0, SV *callback = 0) 1785db_upgrade (DB *db, bdb_filename file, U32 flags = 0, SV *callback = 0)
1720 PREINIT: 1786 PREINIT:
1721 CALLBACK 1787 CALLBACK
1722 CODE: 1788 CODE:
1723{ 1789{
1724 dREQ (REQ_DB_SYNC, 1); 1790 dREQ (REQ_DB_UPGRADE, 1);
1725 req->db = db; 1791 req->db = db;
1726 req->buf1 = strdup (file); 1792 req->buf1 = strdup (file);
1727 req->uint1 = flags; 1793 req->uint1 = flags;
1728 REQ_SEND; 1794 REQ_SEND;
1729} 1795}

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines