ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/BDB/BDB.xs
(Generate patch)

Comparing BDB/BDB.xs (file contents):
Revision 1.68 by root, Tue Jul 14 19:29:23 2009 UTC vs.
Revision 1.72 by root, Fri Dec 4 15:04:06 2009 UTC

8#include "perl.h" 8#include "perl.h"
9#include "XSUB.h" 9#include "XSUB.h"
10 10
11#include "schmorp.h" 11#include "schmorp.h"
12 12
13// perl stupidly defines these as macros, breaking 13// perl stupidly defines these as argument-less macros, breaking
14// lots and lots of code. 14// lots and lots of code.
15#undef open 15#undef open
16#undef close 16#undef close
17#undef abort 17#undef abort
18#undef malloc 18#undef malloc
58#endif 58#endif
59 59
60typedef char *bdb_filename; 60typedef char *bdb_filename;
61 61
62static SV *prepare_cb; 62static SV *prepare_cb;
63
64static HV
65 *bdb_stash,
66 *bdb_env_stash,
67 *bdb_txn_stash,
68 *bdb_cursor_stash,
69 *bdb_db_stash,
70 *bdb_sequence_stash;
63 71
64#if DB_VERSION_MINOR >= 6 72#if DB_VERSION_MINOR >= 6
65# define c_close close 73# define c_close close
66# define c_count count 74# define c_count count
67# define c_del del 75# define c_del del
247} 255}
248 256
249static volatile unsigned int nreqs, nready, npending; 257static volatile unsigned int nreqs, nready, npending;
250static volatile unsigned int max_idle = 4; 258static volatile unsigned int max_idle = 4;
251static volatile unsigned int max_outstanding = 0xffffffff; 259static volatile unsigned int max_outstanding = 0xffffffff;
252static int respipe_osf [2], respipe [2] = { -1, -1 }; 260static s_epipe respipe;
253 261
254static mutex_t reslock = X_MUTEX_INIT; 262static mutex_t reslock = X_MUTEX_INIT;
255static mutex_t reqlock = X_MUTEX_INIT; 263static mutex_t reqlock = X_MUTEX_INIT;
256static cond_t reqwait = X_COND_INIT; 264static cond_t reqwait = X_COND_INIT;
257 265
461} 469}
462 470
463static void 471static void
464create_respipe (void) 472create_respipe (void)
465{ 473{
466#ifdef _WIN32
467 int arg; /* argg */
468#endif
469 int old_readfd = respipe [0];
470
471 if (respipe [1] >= 0)
472 respipe_close (S_TO_SOCKET (respipe [1]));
473
474 if (s_pipe (respipe)) 474 if (s_epipe_renew (&respipe))
475 croak ("unable to initialize result pipe"); 475 croak ("BDB: unable to create event pipe");
476
477 if (old_readfd >= 0)
478 {
479 if (dup2 (S_TO_SOCKET (respipe [0]), S_TO_SOCKET (old_readfd)) < 0)
480 croak ("unable to initialize result pipe(2)");
481
482 respipe_close (respipe [0]);
483 respipe [0] = old_readfd;
484 }
485
486#ifdef _WIN32
487 arg = 1;
488 if (ioctlsocket (S_TO_SOCKET (respipe [0]), FIONBIO, &arg)
489 || ioctlsocket (S_TO_SOCKET (respipe [1]), FIONBIO, &arg))
490#else
491 if (fcntl (respipe [0], F_SETFL, O_NONBLOCK)
492 || fcntl (respipe [1], F_SETFL, O_NONBLOCK))
493#endif
494 croak ("unable to initialize result pipe(3)");
495
496 respipe_osf [0] = S_TO_SOCKET (respipe [0]);
497 respipe_osf [1] = S_TO_SOCKET (respipe [1]);
498} 476}
499 477
500static void bdb_request (bdb_req req); 478static void bdb_request (bdb_req req);
501X_THREAD_PROC (bdb_proc); 479X_THREAD_PROC (bdb_proc);
502 480
638 end_thread (); 616 end_thread ();
639} 617}
640 618
641static void poll_wait (void) 619static void poll_wait (void)
642{ 620{
643 fd_set rfd;
644
645 while (nreqs) 621 while (nreqs)
646 { 622 {
647 int size; 623 int size;
648 if (WORDACCESS_UNSAFE) X_LOCK (reslock); 624 if (WORDACCESS_UNSAFE) X_LOCK (reslock);
649 size = res_queue.size; 625 size = res_queue.size;
652 if (size) 628 if (size)
653 return; 629 return;
654 630
655 maybe_start_thread (); 631 maybe_start_thread ();
656 632
657 FD_ZERO (&rfd); 633 s_epipe_wait (&respipe);
658 FD_SET (respipe [0], &rfd);
659
660 PerlSock_select (respipe [0] + 1, &rfd, 0, 0, 0);
661 } 634 }
662} 635}
663 636
664static int poll_cb (void) 637static int poll_cb (void)
665{ 638{
685 if (req) 658 if (req)
686 { 659 {
687 --npending; 660 --npending;
688 661
689 if (!res_queue.size) 662 if (!res_queue.size)
690 {
691 /* read any signals sent by the worker threads */ 663 /* read any signals sent by the worker threads */
692 char buf [4]; 664 s_epipe_drain (&respipe);
693 while (respipe_read (respipe [0], buf, 4) == 4)
694 ;
695 }
696 } 665 }
697 666
698 X_UNLOCK (reslock); 667 X_UNLOCK (reslock);
699 668
700 if (!req) 669 if (!req)
916 /* try to distribute timeouts somewhat evenly */ 885 /* try to distribute timeouts somewhat evenly */
917 ts.tv_nsec = ((unsigned long)self & 1023UL) * (1000000000UL / 1024UL); 886 ts.tv_nsec = ((unsigned long)self & 1023UL) * (1000000000UL / 1024UL);
918 887
919 for (;;) 888 for (;;)
920 { 889 {
921 ts.tv_sec = time (0) + IDLE_TIMEOUT; 890 ts.tv_sec = time (0) + IDLE_TIMEOUT;
922 891
923 X_LOCK (reqlock); 892 X_LOCK (reqlock);
924 893
925 for (;;) 894 for (;;)
926 { 895 {
971 X_LOCK (reslock); 940 X_LOCK (reslock);
972 941
973 ++npending; 942 ++npending;
974 943
975 if (!reqq_push (&res_queue, req)) 944 if (!reqq_push (&res_queue, req))
976 /* write a dummy byte to the pipe so fh becomes ready */ 945 s_epipe_signal (&respipe);
977 respipe_write (respipe_osf [1], (const void *)&respipe_osf, 1);
978 946
979 self->req = 0; 947 self->req = 0;
980 worker_clear (self); 948 worker_clear (self);
981 949
982 X_UNLOCK (reslock); 950 X_UNLOCK (reslock);
1058 (void)0; 1026 (void)0;
1059 1027
1060#define REQ_SEND \ 1028#define REQ_SEND \
1061 req_send (req) 1029 req_send (req)
1062 1030
1063#define SvPTR(var, arg, type, class, nullok) \ 1031#define SvPTR(var, arg, type, stash, class, nullok) \
1064 if (!SvOK (arg)) \ 1032 if (!SvOK (arg)) \
1065 { \ 1033 { \
1066 if (nullok != 1) \ 1034 if (nullok != 1) \
1067 croak (# var " must be a " # class " object, not undef"); \ 1035 croak (# var " must be a " # class " object, not undef"); \
1068 \ 1036 \
1069 (var) = 0; \ 1037 (var) = 0; \
1070 } \ 1038 } \
1071 else if (sv_derived_from ((arg), # class)) \ 1039 else if (SvSTASH (SvRV (arg)) == stash || sv_derived_from ((arg), # class)) \
1072 { \ 1040 { \
1073 IV tmp = SvIV ((SV*) SvRV (arg)); \ 1041 IV tmp = SvIV ((SV*) SvRV (arg)); \
1074 (var) = INT2PTR (type, tmp); \ 1042 (var) = INT2PTR (type, tmp); \
1075 if (!var && nullok != 2) \ 1043 if (!var && nullok != 2) \
1076 croak (# var " is not a valid " # class " object anymore"); \ 1044 croak (# var " is not a valid " # class " object anymore"); \
1079 croak (# var " is not of type " # class); 1047 croak (# var " is not of type " # class);
1080 1048
1081#define ARG_MUTABLE(name) \ 1049#define ARG_MUTABLE(name) \
1082 if (SvREADONLY (name)) \ 1050 if (SvREADONLY (name)) \
1083 croak ("argument " #name " is read-only/constant, but the request requires it to be mutable"); 1051 croak ("argument " #name " is read-only/constant, but the request requires it to be mutable");
1052
1053static SV *
1054newSVptr (void *ptr, HV *stash)
1055{
1056 SV *rv = NEWSV (0, 0);
1057 sv_upgrade (rv, SVt_PVMG);
1058 sv_setiv (rv, PTR2IV (ptr));
1059
1060 return sv_bless (newRV_noinc (rv), stash);
1061}
1084 1062
1085static void 1063static void
1086ptr_nuke (SV *sv) 1064ptr_nuke (SV *sv)
1087{ 1065{
1088 assert (SvROK (sv)); 1066 assert (SvROK (sv));
1210 1188
1211PROTOTYPES: ENABLE 1189PROTOTYPES: ENABLE
1212 1190
1213BOOT: 1191BOOT:
1214{ 1192{
1215 HV *stash = gv_stashpv ("BDB", 1);
1216
1217 static const struct { 1193 static const struct {
1218 const char *name; 1194 const char *name;
1219 IV iv; 1195 IV iv;
1220 } *civ, const_iv[] = { 1196 } *civ, const_iv[] = {
1221#define const_iv(name) { # name, (IV)DB_ ## name }, 1197#define const_iv(name) { # name, (IV)DB_ ## name },
1417 const_iv (LOG_INMEMORY) 1393 const_iv (LOG_INMEMORY)
1418# endif 1394# endif
1419#endif 1395#endif
1420 }; 1396 };
1421 1397
1398 bdb_stash = gv_stashpv ("BDB" , 1);
1399 bdb_env_stash = gv_stashpv ("BDB::Env" , 1);
1400 bdb_txn_stash = gv_stashpv ("BDB::Txn" , 1);
1401 bdb_cursor_stash = gv_stashpv ("BDB::Cursor" , 1);
1402 bdb_db_stash = gv_stashpv ("BDB::Db" , 1);
1403 bdb_sequence_stash = gv_stashpv ("BDB::Sequence", 1);
1404
1422 for (civ = const_iv + sizeof (const_iv) / sizeof (const_iv [0]); civ-- > const_iv; ) 1405 for (civ = const_iv + sizeof (const_iv) / sizeof (const_iv [0]); civ-- > const_iv; )
1423 newCONSTSUB (stash, (char *)civ->name, newSViv (civ->iv)); 1406 newCONSTSUB (bdb_stash, (char *)civ->name, newSViv (civ->iv));
1424 1407
1425 prepare_cb = &PL_sv_undef; 1408 prepare_cb = &PL_sv_undef;
1426 1409
1427 { 1410 {
1428 /* we currently only allow version, minor-version and patchlevel to go up to 255 */ 1411 /* we currently only allow version, minor-version and patchlevel to go up to 255 */
1429 char vstring[3] = { DB_VERSION_MAJOR, DB_VERSION_MINOR, DB_VERSION_PATCH }; 1412 char vstring[3] = { DB_VERSION_MAJOR, DB_VERSION_MINOR, DB_VERSION_PATCH };
1430 1413
1431 newCONSTSUB (stash, "VERSION_v", newSVpvn (vstring, 3)); 1414 newCONSTSUB (bdb_stash, "VERSION_v", newSVpvn (vstring, 3));
1432 } 1415 }
1433 1416
1434 newCONSTSUB (stash, "VERSION_STRING", newSVpv (DB_VERSION_STRING, 0)); 1417 newCONSTSUB (bdb_stash, "VERSION_STRING", newSVpv (DB_VERSION_STRING, 0));
1435 1418
1436 create_respipe (); 1419 create_respipe ();
1437 1420
1438 X_THREAD_ATFORK (atfork_prepare, atfork_parent, atfork_child); 1421 X_THREAD_ATFORK (atfork_prepare, atfork_parent, atfork_child);
1439 patch_errno (); 1422 patch_errno ();
1517 1500
1518int 1501int
1519poll_fileno () 1502poll_fileno ()
1520 PROTOTYPE: 1503 PROTOTYPE:
1521 CODE: 1504 CODE:
1522 RETVAL = respipe [0]; 1505 RETVAL = s_epipe_fd (&respipe);
1523 OUTPUT: 1506 OUTPUT:
1524 RETVAL 1507 RETVAL
1525 1508
1526int 1509int
1527poll_cb (...) 1510poll_cb (...)

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines