ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/BDB/BDB.xs
(Generate patch)

Comparing BDB/BDB.xs (file contents):
Revision 1.27 by root, Mon Dec 17 06:31:43 2007 UTC vs.
Revision 1.28 by root, Sat Dec 22 07:33:48 2007 UTC

109 REQ_TXN_COMMIT, REQ_TXN_ABORT, REQ_TXN_FINISH, 109 REQ_TXN_COMMIT, REQ_TXN_ABORT, REQ_TXN_FINISH,
110 REQ_C_CLOSE, REQ_C_COUNT, REQ_C_PUT, REQ_C_GET, REQ_C_PGET, REQ_C_DEL, 110 REQ_C_CLOSE, REQ_C_COUNT, REQ_C_PUT, REQ_C_GET, REQ_C_PGET, REQ_C_DEL,
111 REQ_SEQ_OPEN, REQ_SEQ_CLOSE, REQ_SEQ_GET, REQ_SEQ_REMOVE, 111 REQ_SEQ_OPEN, REQ_SEQ_CLOSE, REQ_SEQ_GET, REQ_SEQ_REMOVE,
112}; 112};
113 113
114typedef struct aio_cb 114typedef struct bdb_cb
115{ 115{
116 struct aio_cb *volatile next; 116 struct bdb_cb *volatile next;
117 SV *callback; 117 SV *callback;
118 int type, pri, result; 118 int type, pri, result;
119 119
120 DB_ENV *env; 120 DB_ENV *env;
121 DB *db; 121 DB *db;
130 130
131 DBT dbt1, dbt2, dbt3; 131 DBT dbt1, dbt2, dbt3;
132 DB_KEY_RANGE key_range; 132 DB_KEY_RANGE key_range;
133 DB_SEQUENCE *seq; 133 DB_SEQUENCE *seq;
134 db_seq_t seq_t; 134 db_seq_t seq_t;
135} aio_cb; 135} bdb_cb;
136 136
137typedef aio_cb *aio_req; 137typedef bdb_cb *bdb_req;
138 138
139enum { 139enum {
140 PRI_MIN = -4, 140 PRI_MIN = -4,
141 PRI_MAX = 4, 141 PRI_MAX = 4,
142 142
169 struct worker *prev, *next; 169 struct worker *prev, *next;
170 170
171 thread_t tid; 171 thread_t tid;
172 172
173 /* locked by reslock, reqlock or wrklock */ 173 /* locked by reslock, reqlock or wrklock */
174 aio_req req; /* currently processed request */ 174 bdb_req req; /* currently processed request */
175 void *dbuf; 175 void *dbuf;
176 DIR *dirp; 176 DIR *dirp;
177} worker; 177} worker;
178 178
179static worker wrk_first = { &wrk_first, &wrk_first, 0 }; 179static worker wrk_first = { &wrk_first, &wrk_first, 0 };
246 * a somewhat faster data structure might be nice, but 246 * a somewhat faster data structure might be nice, but
247 * with 8 priorities this actually needs <20 insns 247 * with 8 priorities this actually needs <20 insns
248 * per shift, the most expensive operation. 248 * per shift, the most expensive operation.
249 */ 249 */
250typedef struct { 250typedef struct {
251 aio_req qs[NUM_PRI], qe[NUM_PRI]; /* qstart, qend */ 251 bdb_req qs[NUM_PRI], qe[NUM_PRI]; /* qstart, qend */
252 int size; 252 int size;
253} reqq; 253} reqq;
254 254
255static reqq req_queue; 255static reqq req_queue;
256static reqq res_queue; 256static reqq res_queue;
257 257
258int reqq_push (reqq *q, aio_req req) 258int reqq_push (reqq *q, bdb_req req)
259{ 259{
260 int pri = req->pri; 260 int pri = req->pri;
261 req->next = 0; 261 req->next = 0;
262 262
263 if (q->qe[pri]) 263 if (q->qe[pri])
269 q->qe[pri] = q->qs[pri] = req; 269 q->qe[pri] = q->qs[pri] = req;
270 270
271 return q->size++; 271 return q->size++;
272} 272}
273 273
274aio_req reqq_shift (reqq *q) 274bdb_req reqq_shift (reqq *q)
275{ 275{
276 int pri; 276 int pri;
277 277
278 if (!q->size) 278 if (!q->size)
279 return 0; 279 return 0;
280 280
281 --q->size; 281 --q->size;
282 282
283 for (pri = NUM_PRI; pri--; ) 283 for (pri = NUM_PRI; pri--; )
284 { 284 {
285 aio_req req = q->qs[pri]; 285 bdb_req req = q->qs[pri];
286 286
287 if (req) 287 if (req)
288 { 288 {
289 if (!(q->qs[pri] = req->next)) 289 if (!(q->qs[pri] = req->next))
290 q->qe[pri] = 0; 290 q->qe[pri] = 0;
295 295
296 abort (); 296 abort ();
297} 297}
298 298
299static int poll_cb (); 299static int poll_cb ();
300static void req_free (aio_req req); 300static void req_free (bdb_req req);
301static void req_cancel (aio_req req); 301static void req_cancel (bdb_req req);
302 302
303static int req_invoke (aio_req req) 303static int req_invoke (bdb_req req)
304{ 304{
305 dSP; 305 dSP;
306 306
307 if (SvOK (req->callback)) 307 if (SvOK (req->callback))
308 { 308 {
371 } 371 }
372 372
373 return !SvTRUE (ERRSV); 373 return !SvTRUE (ERRSV);
374} 374}
375 375
376static void req_free (aio_req req) 376static void req_free (bdb_req req)
377{ 377{
378 free (req->buf1); 378 free (req->buf1);
379 free (req->buf2); 379 free (req->buf2);
380 Safefree (req); 380 Safefree (req);
381} 381}
461 return; 461 return;
462 462
463 start_thread (); 463 start_thread ();
464} 464}
465 465
466static void req_send (aio_req req) 466static void req_send (bdb_req req)
467{ 467{
468 SV *wait_callback = 0; 468 SV *wait_callback = 0;
469 469
470 // synthesize callback if none given 470 // synthesize callback if none given
471 if (!SvOK (req->callback)) 471 if (!SvOK (req->callback))
505 } 505 }
506} 506}
507 507
508static void end_thread (void) 508static void end_thread (void)
509{ 509{
510 aio_req req; 510 bdb_req req;
511 511
512 Newz (0, req, 1, aio_cb); 512 Newz (0, req, 1, bdb_cb);
513 513
514 req->type = REQ_QUIT; 514 req->type = REQ_QUIT;
515 req->pri = PRI_MAX + PRI_BIAS; 515 req->pri = PRI_MAX + PRI_BIAS;
516 516
517 X_LOCK (reqlock); 517 X_LOCK (reqlock);
574 dSP; 574 dSP;
575 int count = 0; 575 int count = 0;
576 int maxreqs = max_poll_reqs; 576 int maxreqs = max_poll_reqs;
577 int do_croak = 0; 577 int do_croak = 0;
578 struct timeval tv_start, tv_now; 578 struct timeval tv_start, tv_now;
579 aio_req req; 579 bdb_req req;
580 580
581 if (max_poll_time) 581 if (max_poll_time)
582 gettimeofday (&tv_start, 0); 582 gettimeofday (&tv_start, 0);
583 583
584 for (;;) 584 for (;;)
645 645
646/*****************************************************************************/ 646/*****************************************************************************/
647 647
648X_THREAD_PROC (bdb_proc) 648X_THREAD_PROC (bdb_proc)
649{ 649{
650 aio_req req; 650 bdb_req req;
651 struct timespec ts; 651 struct timespec ts;
652 worker *self = (worker *)thr_arg; 652 worker *self = (worker *)thr_arg;
653 653
654 /* try to distribute timeouts somewhat evenly */ 654 /* try to distribute timeouts somewhat evenly */
655 ts.tv_nsec = ((unsigned long)self & 1023UL) * (1000000000UL / 1024UL); 655 ts.tv_nsec = ((unsigned long)self & 1023UL) * (1000000000UL / 1024UL);
873 X_UNLOCK (wrklock); 873 X_UNLOCK (wrklock);
874} 874}
875 875
876static void atfork_child (void) 876static void atfork_child (void)
877{ 877{
878 aio_req prv; 878 bdb_req prv;
879 879
880 while (prv = reqq_shift (&req_queue)) 880 while (prv = reqq_shift (&req_queue))
881 req_free (prv); 881 req_free (prv);
882 882
883 while (prv = reqq_shift (&res_queue)) 883 while (prv = reqq_shift (&res_queue))
904 904
905 atfork_parent (); 905 atfork_parent ();
906} 906}
907 907
908#define dREQ(reqtype) \ 908#define dREQ(reqtype) \
909 aio_req req; \ 909 bdb_req req; \
910 int req_pri = next_pri; \ 910 int req_pri = next_pri; \
911 next_pri = DEFAULT_PRI + PRI_BIAS; \ 911 next_pri = DEFAULT_PRI + PRI_BIAS; \
912 \ 912 \
913 if (SvOK (callback) && !SvROK (callback)) \ 913 if (SvOK (callback) && !SvROK (callback)) \
914 croak ("callback must be undef or of reference type"); \ 914 croak ("callback must be undef or of reference type"); \
915 \ 915 \
916 Newz (0, req, 1, aio_cb); \ 916 Newz (0, req, 1, bdb_cb); \
917 if (!req) \ 917 if (!req) \
918 croak ("out of memory during aio_req allocation"); \ 918 croak ("out of memory during bdb_req allocation"); \
919 \ 919 \
920 req->callback = newSVsv (callback); \ 920 req->callback = newSVsv (callback); \
921 req->type = (reqtype); \ 921 req->type = (reqtype); \
922 req->pri = req_pri 922 req->pri = req_pri
923 923
1492} 1492}
1493 1493
1494void 1494void
1495db_get (DB *db, DB_TXN_ornull *txn, SV *key, SV *data, U32 flags = 0, SV *callback = &PL_sv_undef) 1495db_get (DB *db, DB_TXN_ornull *txn, SV *key, SV *data, U32 flags = 0, SV *callback = &PL_sv_undef)
1496 CODE: 1496 CODE:
1497{
1498 if (SvREADONLY (data)) 1497 if (SvREADONLY (data))
1499 croak ("can't modify read-only data scalar in db_get"); 1498 croak ("can't modify read-only data scalar in db_get");
1500 1499{
1501 dREQ (REQ_DB_GET); 1500 dREQ (REQ_DB_GET);
1502 req->db = db; 1501 req->db = db;
1503 req->txn = txn; 1502 req->txn = txn;
1504 req->uint1 = flags; 1503 req->uint1 = flags;
1505 sv_to_dbt (&req->dbt1, key); 1504 sv_to_dbt (&req->dbt1, key);
1509} 1508}
1510 1509
1511void 1510void
1512db_pget (DB *db, DB_TXN_ornull *txn, SV *key, SV *pkey, SV *data, U32 flags = 0, SV *callback = &PL_sv_undef) 1511db_pget (DB *db, DB_TXN_ornull *txn, SV *key, SV *pkey, SV *data, U32 flags = 0, SV *callback = &PL_sv_undef)
1513 CODE: 1512 CODE:
1514{
1515 if (SvREADONLY (data)) 1513 if (SvREADONLY (data))
1516 croak ("can't modify read-only data scalar in db_pget"); 1514 croak ("can't modify read-only data scalar in db_pget");
1517 1515{
1518 dREQ (REQ_DB_PGET); 1516 dREQ (REQ_DB_PGET);
1519 req->db = db; 1517 req->db = db;
1520 req->txn = txn; 1518 req->txn = txn;
1521 req->uint1 = flags; 1519 req->uint1 = flags;
1522 sv_to_dbt (&req->dbt1, key); 1520 sv_to_dbt (&req->dbt1, key);

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines