ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/BDB/BDB.xs
(Generate patch)

Comparing BDB/BDB.xs (file contents):
Revision 1.25 by root, Mon Dec 10 04:45:54 2007 UTC vs.
Revision 1.28 by root, Sat Dec 22 07:33:48 2007 UTC

102 102
103enum { 103enum {
104 REQ_QUIT, 104 REQ_QUIT,
105 REQ_ENV_OPEN, REQ_ENV_CLOSE, REQ_ENV_TXN_CHECKPOINT, REQ_ENV_LOCK_DETECT, 105 REQ_ENV_OPEN, REQ_ENV_CLOSE, REQ_ENV_TXN_CHECKPOINT, REQ_ENV_LOCK_DETECT,
106 REQ_ENV_MEMP_SYNC, REQ_ENV_MEMP_TRICKLE, 106 REQ_ENV_MEMP_SYNC, REQ_ENV_MEMP_TRICKLE,
107 REQ_DB_OPEN, REQ_DB_CLOSE, REQ_DB_COMPACT, REQ_DB_SYNC, 107 REQ_DB_OPEN, REQ_DB_CLOSE, REQ_DB_COMPACT, REQ_DB_SYNC, REQ_DB_UPGRADE,
108 REQ_DB_PUT, REQ_DB_GET, REQ_DB_PGET, REQ_DB_DEL, REQ_DB_KEY_RANGE, 108 REQ_DB_PUT, REQ_DB_GET, REQ_DB_PGET, REQ_DB_DEL, REQ_DB_KEY_RANGE,
109 REQ_TXN_COMMIT, REQ_TXN_ABORT, REQ_TXN_FINISH, 109 REQ_TXN_COMMIT, REQ_TXN_ABORT, REQ_TXN_FINISH,
110 REQ_C_CLOSE, REQ_C_COUNT, REQ_C_PUT, REQ_C_GET, REQ_C_PGET, REQ_C_DEL, 110 REQ_C_CLOSE, REQ_C_COUNT, REQ_C_PUT, REQ_C_GET, REQ_C_PGET, REQ_C_DEL,
111 REQ_SEQ_OPEN, REQ_SEQ_CLOSE, REQ_SEQ_GET, REQ_SEQ_REMOVE, 111 REQ_SEQ_OPEN, REQ_SEQ_CLOSE, REQ_SEQ_GET, REQ_SEQ_REMOVE,
112}; 112};
113 113
114typedef struct aio_cb 114typedef struct bdb_cb
115{ 115{
116 struct aio_cb *volatile next; 116 struct bdb_cb *volatile next;
117 SV *callback; 117 SV *callback;
118 int type, pri, result; 118 int type, pri, result;
119 119
120 DB_ENV *env; 120 DB_ENV *env;
121 DB *db; 121 DB *db;
130 130
131 DBT dbt1, dbt2, dbt3; 131 DBT dbt1, dbt2, dbt3;
132 DB_KEY_RANGE key_range; 132 DB_KEY_RANGE key_range;
133 DB_SEQUENCE *seq; 133 DB_SEQUENCE *seq;
134 db_seq_t seq_t; 134 db_seq_t seq_t;
135} aio_cb; 135} bdb_cb;
136 136
137typedef aio_cb *aio_req; 137typedef bdb_cb *bdb_req;
138 138
139enum { 139enum {
140 PRI_MIN = -4, 140 PRI_MIN = -4,
141 PRI_MAX = 4, 141 PRI_MAX = 4,
142 142
169 struct worker *prev, *next; 169 struct worker *prev, *next;
170 170
171 thread_t tid; 171 thread_t tid;
172 172
173 /* locked by reslock, reqlock or wrklock */ 173 /* locked by reslock, reqlock or wrklock */
174 aio_req req; /* currently processed request */ 174 bdb_req req; /* currently processed request */
175 void *dbuf; 175 void *dbuf;
176 DIR *dirp; 176 DIR *dirp;
177} worker; 177} worker;
178 178
179static worker wrk_first = { &wrk_first, &wrk_first, 0 }; 179static worker wrk_first = { &wrk_first, &wrk_first, 0 };
246 * a somewhat faster data structure might be nice, but 246 * a somewhat faster data structure might be nice, but
247 * with 8 priorities this actually needs <20 insns 247 * with 8 priorities this actually needs <20 insns
248 * per shift, the most expensive operation. 248 * per shift, the most expensive operation.
249 */ 249 */
250typedef struct { 250typedef struct {
251 aio_req qs[NUM_PRI], qe[NUM_PRI]; /* qstart, qend */ 251 bdb_req qs[NUM_PRI], qe[NUM_PRI]; /* qstart, qend */
252 int size; 252 int size;
253} reqq; 253} reqq;
254 254
255static reqq req_queue; 255static reqq req_queue;
256static reqq res_queue; 256static reqq res_queue;
257 257
258int reqq_push (reqq *q, aio_req req) 258int reqq_push (reqq *q, bdb_req req)
259{ 259{
260 int pri = req->pri; 260 int pri = req->pri;
261 req->next = 0; 261 req->next = 0;
262 262
263 if (q->qe[pri]) 263 if (q->qe[pri])
269 q->qe[pri] = q->qs[pri] = req; 269 q->qe[pri] = q->qs[pri] = req;
270 270
271 return q->size++; 271 return q->size++;
272} 272}
273 273
274aio_req reqq_shift (reqq *q) 274bdb_req reqq_shift (reqq *q)
275{ 275{
276 int pri; 276 int pri;
277 277
278 if (!q->size) 278 if (!q->size)
279 return 0; 279 return 0;
280 280
281 --q->size; 281 --q->size;
282 282
283 for (pri = NUM_PRI; pri--; ) 283 for (pri = NUM_PRI; pri--; )
284 { 284 {
285 aio_req req = q->qs[pri]; 285 bdb_req req = q->qs[pri];
286 286
287 if (req) 287 if (req)
288 { 288 {
289 if (!(q->qs[pri] = req->next)) 289 if (!(q->qs[pri] = req->next))
290 q->qe[pri] = 0; 290 q->qe[pri] = 0;
295 295
296 abort (); 296 abort ();
297} 297}
298 298
299static int poll_cb (); 299static int poll_cb ();
300static void req_free (aio_req req); 300static void req_free (bdb_req req);
301static void req_cancel (aio_req req); 301static void req_cancel (bdb_req req);
302 302
303static int req_invoke (aio_req req) 303static int req_invoke (bdb_req req)
304{ 304{
305 dSP; 305 dSP;
306 306
307 if (SvOK (req->callback)) 307 if (SvOK (req->callback))
308 { 308 {
371 } 371 }
372 372
373 return !SvTRUE (ERRSV); 373 return !SvTRUE (ERRSV);
374} 374}
375 375
376static void req_free (aio_req req) 376static void req_free (bdb_req req)
377{ 377{
378 free (req->buf1); 378 free (req->buf1);
379 free (req->buf2); 379 free (req->buf2);
380 Safefree (req); 380 Safefree (req);
381} 381}
461 return; 461 return;
462 462
463 start_thread (); 463 start_thread ();
464} 464}
465 465
466static void req_send (aio_req req) 466static void req_send (bdb_req req)
467{ 467{
468 SV *wait_callback = 0; 468 SV *wait_callback = 0;
469 469
470 // synthesize callback if none given 470 // synthesize callback if none given
471 if (!SvOK (req->callback)) 471 if (!SvOK (req->callback))
479 SPAGAIN; 479 SPAGAIN;
480 480
481 if (count != 2) 481 if (count != 2)
482 croak ("prepare callback must return exactly two values\n"); 482 croak ("prepare callback must return exactly two values\n");
483 483
484 wait_callback = SvREFCNT_inc (POPs); 484 wait_callback = POPs;
485 SvREFCNT_dec (req->callback); 485 SvREFCNT_dec (req->callback);
486 req->callback = SvREFCNT_inc (POPs); 486 req->callback = SvREFCNT_inc (POPs);
487 } 487 }
488 488
489 ++nreqs; 489 ++nreqs;
500 { 500 {
501 dSP; 501 dSP;
502 PUSHMARK (SP); 502 PUSHMARK (SP);
503 PUTBACK; 503 PUTBACK;
504 call_sv (wait_callback, G_DISCARD); 504 call_sv (wait_callback, G_DISCARD);
505 SvREFCNT_dec (wait_callback);
506 } 505 }
507} 506}
508 507
509static void end_thread (void) 508static void end_thread (void)
510{ 509{
511 aio_req req; 510 bdb_req req;
512 511
513 Newz (0, req, 1, aio_cb); 512 Newz (0, req, 1, bdb_cb);
514 513
515 req->type = REQ_QUIT; 514 req->type = REQ_QUIT;
516 req->pri = PRI_MAX + PRI_BIAS; 515 req->pri = PRI_MAX + PRI_BIAS;
517 516
518 X_LOCK (reqlock); 517 X_LOCK (reqlock);
575 dSP; 574 dSP;
576 int count = 0; 575 int count = 0;
577 int maxreqs = max_poll_reqs; 576 int maxreqs = max_poll_reqs;
578 int do_croak = 0; 577 int do_croak = 0;
579 struct timeval tv_start, tv_now; 578 struct timeval tv_start, tv_now;
580 aio_req req; 579 bdb_req req;
581 580
582 if (max_poll_time) 581 if (max_poll_time)
583 gettimeofday (&tv_start, 0); 582 gettimeofday (&tv_start, 0);
584 583
585 for (;;) 584 for (;;)
646 645
647/*****************************************************************************/ 646/*****************************************************************************/
648 647
649X_THREAD_PROC (bdb_proc) 648X_THREAD_PROC (bdb_proc)
650{ 649{
651 aio_req req; 650 bdb_req req;
652 struct timespec ts; 651 struct timespec ts;
653 worker *self = (worker *)thr_arg; 652 worker *self = (worker *)thr_arg;
654 653
655 /* try to distribute timeouts somewhat evenly */ 654 /* try to distribute timeouts somewhat evenly */
656 ts.tv_nsec = ((unsigned long)self & 1023UL) * (1000000000UL / 1024UL); 655 ts.tv_nsec = ((unsigned long)self & 1023UL) * (1000000000UL / 1024UL);
739 738
740 case REQ_DB_SYNC: 739 case REQ_DB_SYNC:
741 req->result = req->db->sync (req->db, req->uint1); 740 req->result = req->db->sync (req->db, req->uint1);
742 break; 741 break;
743 742
743 case REQ_DB_UPGRADE:
744 req->result = req->db->upgrade (req->db, req->buf1, req->uint1);
745 break;
746
744 case REQ_DB_PUT: 747 case REQ_DB_PUT:
745 req->result = req->db->put (req->db, req->txn, &req->dbt1, &req->dbt2, req->uint1); 748 req->result = req->db->put (req->db, req->txn, &req->dbt1, &req->dbt2, req->uint1);
746 break; 749 break;
747 750
748 case REQ_DB_GET: 751 case REQ_DB_GET:
870 X_UNLOCK (wrklock); 873 X_UNLOCK (wrklock);
871} 874}
872 875
873static void atfork_child (void) 876static void atfork_child (void)
874{ 877{
875 aio_req prv; 878 bdb_req prv;
876 879
877 while (prv = reqq_shift (&req_queue)) 880 while (prv = reqq_shift (&req_queue))
878 req_free (prv); 881 req_free (prv);
879 882
880 while (prv = reqq_shift (&res_queue)) 883 while (prv = reqq_shift (&res_queue))
901 904
902 atfork_parent (); 905 atfork_parent ();
903} 906}
904 907
905#define dREQ(reqtype) \ 908#define dREQ(reqtype) \
906 aio_req req; \ 909 bdb_req req; \
907 int req_pri = next_pri; \ 910 int req_pri = next_pri; \
908 next_pri = DEFAULT_PRI + PRI_BIAS; \ 911 next_pri = DEFAULT_PRI + PRI_BIAS; \
909 \ 912 \
910 if (SvOK (callback) && !SvROK (callback)) \ 913 if (SvOK (callback) && !SvROK (callback)) \
911 croak ("callback must be undef or of reference type"); \ 914 croak ("callback must be undef or of reference type"); \
912 \ 915 \
913 Newz (0, req, 1, aio_cb); \ 916 Newz (0, req, 1, bdb_cb); \
914 if (!req) \ 917 if (!req) \
915 croak ("out of memory during aio_req allocation"); \ 918 croak ("out of memory during bdb_req allocation"); \
916 \ 919 \
917 req->callback = newSVsv (callback); \ 920 req->callback = newSVsv (callback); \
918 req->type = (reqtype); \ 921 req->type = (reqtype); \
919 req->pri = req_pri 922 req->pri = req_pri
920 923
1024 const_iv (GET_BOTH_RANGE) 1027 const_iv (GET_BOTH_RANGE)
1025 //const_iv (SET_RECNO) 1028 //const_iv (SET_RECNO)
1026 //const_iv (MULTIPLE) 1029 //const_iv (MULTIPLE)
1027 const_iv (SNAPSHOT) 1030 const_iv (SNAPSHOT)
1028 const_iv (JOIN_ITEM) 1031 const_iv (JOIN_ITEM)
1032 const_iv (JOIN_NOSORT)
1029 const_iv (RMW) 1033 const_iv (RMW)
1030 1034
1031 const_iv (NOTFOUND) 1035 const_iv (NOTFOUND)
1032 const_iv (KEYEMPTY) 1036 const_iv (KEYEMPTY)
1033 const_iv (LOCK_DEADLOCK) 1037 const_iv (LOCK_DEADLOCK)
1049 const_iv (TXN_SYNC) 1053 const_iv (TXN_SYNC)
1050 1054
1051 const_iv (SET_LOCK_TIMEOUT) 1055 const_iv (SET_LOCK_TIMEOUT)
1052 const_iv (SET_TXN_TIMEOUT) 1056 const_iv (SET_TXN_TIMEOUT)
1053 1057
1054 const_iv (JOIN_ITEM)
1055 const_iv (FIRST) 1058 const_iv (FIRST)
1056 const_iv (NEXT) 1059 const_iv (NEXT)
1057 const_iv (NEXT_DUP) 1060 const_iv (NEXT_DUP)
1058 const_iv (NEXT_NODUP) 1061 const_iv (NEXT_NODUP)
1059 const_iv (PREV) 1062 const_iv (PREV)
1125 const_iv (MULTIVERSION) 1128 const_iv (MULTIVERSION)
1126 const_iv (TXN_SNAPSHOT) 1129 const_iv (TXN_SNAPSHOT)
1127#endif 1130#endif
1128#if DB_VERSION_MINOR >= 6 1131#if DB_VERSION_MINOR >= 6
1129 const_iv (PREV_DUP) 1132 const_iv (PREV_DUP)
1130# if 0
1131 const_iv (PRIORITY_UNCHANGED) 1133 const_iv (PRIORITY_UNCHANGED)
1132 const_iv (PRIORITY_VERY_LOW) 1134 const_iv (PRIORITY_VERY_LOW)
1133 const_iv (PRIORITY_LOW) 1135 const_iv (PRIORITY_LOW)
1134 const_iv (PRIORITY_DEFAULT) 1136 const_iv (PRIORITY_DEFAULT)
1135 const_iv (PRIORITY_HIGH) 1137 const_iv (PRIORITY_HIGH)
1136 const_iv (PRIORITY_VERY_HIGH) 1138 const_iv (PRIORITY_VERY_HIGH)
1137# endif
1138#endif 1139#endif
1139 }; 1140 };
1140 1141
1141 for (civ = const_iv + sizeof (const_iv) / sizeof (const_iv [0]); civ-- > const_iv; ) 1142 for (civ = const_iv + sizeof (const_iv) / sizeof (const_iv [0]); civ-- > const_iv; )
1142 newCONSTSUB (stash, (char *)civ->name, newSViv (civ->iv)); 1143 newCONSTSUB (stash, (char *)civ->name, newSViv (civ->iv));
1452 req->uint1 = flags; 1453 req->uint1 = flags;
1453 REQ_SEND; 1454 REQ_SEND;
1454} 1455}
1455 1456
1456void 1457void
1458db_upgrade (DB *db, octetstring file, U32 flags = 0, SV *callback = &PL_sv_undef)
1459 CODE:
1460{
1461 dREQ (REQ_DB_SYNC);
1462 req->db = db;
1463 req->buf1 = strdup (file);
1464 req->uint1 = flags;
1465 REQ_SEND;
1466}
1467
1468void
1457db_key_range (DB *db, DB_TXN_ornull *txn, SV *key, SV *key_range, U32 flags = 0, SV *callback = &PL_sv_undef) 1469db_key_range (DB *db, DB_TXN_ornull *txn, SV *key, SV *key_range, U32 flags = 0, SV *callback = &PL_sv_undef)
1458 CODE: 1470 CODE:
1459{ 1471{
1460 dREQ (REQ_DB_KEY_RANGE); 1472 dREQ (REQ_DB_KEY_RANGE);
1461 req->db = db; 1473 req->db = db;
1480} 1492}
1481 1493
1482void 1494void
1483db_get (DB *db, DB_TXN_ornull *txn, SV *key, SV *data, U32 flags = 0, SV *callback = &PL_sv_undef) 1495db_get (DB *db, DB_TXN_ornull *txn, SV *key, SV *data, U32 flags = 0, SV *callback = &PL_sv_undef)
1484 CODE: 1496 CODE:
1497 if (SvREADONLY (data))
1498 croak ("can't modify read-only data scalar in db_get");
1485{ 1499{
1486 dREQ (REQ_DB_GET); 1500 dREQ (REQ_DB_GET);
1487 req->db = db; 1501 req->db = db;
1488 req->txn = txn; 1502 req->txn = txn;
1489 req->uint1 = flags; 1503 req->uint1 = flags;
1494} 1508}
1495 1509
1496void 1510void
1497db_pget (DB *db, DB_TXN_ornull *txn, SV *key, SV *pkey, SV *data, U32 flags = 0, SV *callback = &PL_sv_undef) 1511db_pget (DB *db, DB_TXN_ornull *txn, SV *key, SV *pkey, SV *data, U32 flags = 0, SV *callback = &PL_sv_undef)
1498 CODE: 1512 CODE:
1513 if (SvREADONLY (data))
1514 croak ("can't modify read-only data scalar in db_pget");
1499{ 1515{
1500 dREQ (REQ_DB_PGET); 1516 dREQ (REQ_DB_PGET);
1501 req->db = db; 1517 req->db = db;
1502 req->txn = txn; 1518 req->txn = txn;
1503 req->uint1 = flags; 1519 req->uint1 = flags;
1985DESTROY (DBC_ornull *dbc) 2001DESTROY (DBC_ornull *dbc)
1986 CODE: 2002 CODE:
1987 if (dbc) 2003 if (dbc)
1988 dbc->c_close (dbc); 2004 dbc->c_close (dbc);
1989 2005
2006#if DB_VERSION_MINOR >= 6
2007
2008int set_priority (DBC *dbc, int priority)
2009 CODE:
2010 dbc->set_priority (dbc, priority);
2011
2012#endif
2013
1990MODULE = BDB PACKAGE = BDB::Sequence 2014MODULE = BDB PACKAGE = BDB::Sequence
1991 2015
1992void 2016void
1993DESTROY (DB_SEQUENCE_ornull *seq) 2017DESTROY (DB_SEQUENCE_ornull *seq)
1994 CODE: 2018 CODE:

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines