… | |
… | |
109 | REQ_TXN_COMMIT, REQ_TXN_ABORT, REQ_TXN_FINISH, |
109 | REQ_TXN_COMMIT, REQ_TXN_ABORT, REQ_TXN_FINISH, |
110 | REQ_C_CLOSE, REQ_C_COUNT, REQ_C_PUT, REQ_C_GET, REQ_C_PGET, REQ_C_DEL, |
110 | REQ_C_CLOSE, REQ_C_COUNT, REQ_C_PUT, REQ_C_GET, REQ_C_PGET, REQ_C_DEL, |
111 | REQ_SEQ_OPEN, REQ_SEQ_CLOSE, REQ_SEQ_GET, REQ_SEQ_REMOVE, |
111 | REQ_SEQ_OPEN, REQ_SEQ_CLOSE, REQ_SEQ_GET, REQ_SEQ_REMOVE, |
112 | }; |
112 | }; |
113 | |
113 | |
114 | typedef struct aio_cb |
114 | typedef struct bdb_cb |
115 | { |
115 | { |
116 | struct aio_cb *volatile next; |
116 | struct bdb_cb *volatile next; |
117 | SV *callback; |
117 | SV *callback; |
118 | int type, pri, result; |
118 | int type, pri, result; |
119 | |
119 | |
120 | DB_ENV *env; |
120 | DB_ENV *env; |
121 | DB *db; |
121 | DB *db; |
… | |
… | |
130 | |
130 | |
131 | DBT dbt1, dbt2, dbt3; |
131 | DBT dbt1, dbt2, dbt3; |
132 | DB_KEY_RANGE key_range; |
132 | DB_KEY_RANGE key_range; |
133 | DB_SEQUENCE *seq; |
133 | DB_SEQUENCE *seq; |
134 | db_seq_t seq_t; |
134 | db_seq_t seq_t; |
135 | } aio_cb; |
135 | } bdb_cb; |
136 | |
136 | |
137 | typedef aio_cb *aio_req; |
137 | typedef bdb_cb *bdb_req; |
138 | |
138 | |
139 | enum { |
139 | enum { |
140 | PRI_MIN = -4, |
140 | PRI_MIN = -4, |
141 | PRI_MAX = 4, |
141 | PRI_MAX = 4, |
142 | |
142 | |
… | |
… | |
169 | struct worker *prev, *next; |
169 | struct worker *prev, *next; |
170 | |
170 | |
171 | thread_t tid; |
171 | thread_t tid; |
172 | |
172 | |
173 | /* locked by reslock, reqlock or wrklock */ |
173 | /* locked by reslock, reqlock or wrklock */ |
174 | aio_req req; /* currently processed request */ |
174 | bdb_req req; /* currently processed request */ |
175 | void *dbuf; |
175 | void *dbuf; |
176 | DIR *dirp; |
176 | DIR *dirp; |
177 | } worker; |
177 | } worker; |
178 | |
178 | |
179 | static worker wrk_first = { &wrk_first, &wrk_first, 0 }; |
179 | static worker wrk_first = { &wrk_first, &wrk_first, 0 }; |
… | |
… | |
246 | * a somewhat faster data structure might be nice, but |
246 | * a somewhat faster data structure might be nice, but |
247 | * with 8 priorities this actually needs <20 insns |
247 | * with 8 priorities this actually needs <20 insns |
248 | * per shift, the most expensive operation. |
248 | * per shift, the most expensive operation. |
249 | */ |
249 | */ |
250 | typedef struct { |
250 | typedef struct { |
251 | aio_req qs[NUM_PRI], qe[NUM_PRI]; /* qstart, qend */ |
251 | bdb_req qs[NUM_PRI], qe[NUM_PRI]; /* qstart, qend */ |
252 | int size; |
252 | int size; |
253 | } reqq; |
253 | } reqq; |
254 | |
254 | |
255 | static reqq req_queue; |
255 | static reqq req_queue; |
256 | static reqq res_queue; |
256 | static reqq res_queue; |
257 | |
257 | |
258 | int reqq_push (reqq *q, aio_req req) |
258 | int reqq_push (reqq *q, bdb_req req) |
259 | { |
259 | { |
260 | int pri = req->pri; |
260 | int pri = req->pri; |
261 | req->next = 0; |
261 | req->next = 0; |
262 | |
262 | |
263 | if (q->qe[pri]) |
263 | if (q->qe[pri]) |
… | |
… | |
269 | q->qe[pri] = q->qs[pri] = req; |
269 | q->qe[pri] = q->qs[pri] = req; |
270 | |
270 | |
271 | return q->size++; |
271 | return q->size++; |
272 | } |
272 | } |
273 | |
273 | |
274 | aio_req reqq_shift (reqq *q) |
274 | bdb_req reqq_shift (reqq *q) |
275 | { |
275 | { |
276 | int pri; |
276 | int pri; |
277 | |
277 | |
278 | if (!q->size) |
278 | if (!q->size) |
279 | return 0; |
279 | return 0; |
280 | |
280 | |
281 | --q->size; |
281 | --q->size; |
282 | |
282 | |
283 | for (pri = NUM_PRI; pri--; ) |
283 | for (pri = NUM_PRI; pri--; ) |
284 | { |
284 | { |
285 | aio_req req = q->qs[pri]; |
285 | bdb_req req = q->qs[pri]; |
286 | |
286 | |
287 | if (req) |
287 | if (req) |
288 | { |
288 | { |
289 | if (!(q->qs[pri] = req->next)) |
289 | if (!(q->qs[pri] = req->next)) |
290 | q->qe[pri] = 0; |
290 | q->qe[pri] = 0; |
… | |
… | |
295 | |
295 | |
296 | abort (); |
296 | abort (); |
297 | } |
297 | } |
298 | |
298 | |
299 | static int poll_cb (); |
299 | static int poll_cb (); |
300 | static void req_free (aio_req req); |
300 | static void req_free (bdb_req req); |
301 | static void req_cancel (aio_req req); |
301 | static void req_cancel (bdb_req req); |
302 | |
302 | |
303 | static int req_invoke (aio_req req) |
303 | static int req_invoke (bdb_req req) |
304 | { |
304 | { |
305 | dSP; |
305 | dSP; |
306 | |
306 | |
307 | if (SvOK (req->callback)) |
307 | if (SvOK (req->callback)) |
308 | { |
308 | { |
… | |
… | |
371 | } |
371 | } |
372 | |
372 | |
373 | return !SvTRUE (ERRSV); |
373 | return !SvTRUE (ERRSV); |
374 | } |
374 | } |
375 | |
375 | |
376 | static void req_free (aio_req req) |
376 | static void req_free (bdb_req req) |
377 | { |
377 | { |
378 | free (req->buf1); |
378 | free (req->buf1); |
379 | free (req->buf2); |
379 | free (req->buf2); |
380 | Safefree (req); |
380 | Safefree (req); |
381 | } |
381 | } |
… | |
… | |
461 | return; |
461 | return; |
462 | |
462 | |
463 | start_thread (); |
463 | start_thread (); |
464 | } |
464 | } |
465 | |
465 | |
466 | static void req_send (aio_req req) |
466 | static void req_send (bdb_req req) |
467 | { |
467 | { |
468 | SV *wait_callback = 0; |
468 | SV *wait_callback = 0; |
469 | |
469 | |
470 | // synthesize callback if none given |
470 | // synthesize callback if none given |
471 | if (!SvOK (req->callback)) |
471 | if (!SvOK (req->callback)) |
… | |
… | |
505 | } |
505 | } |
506 | } |
506 | } |
507 | |
507 | |
508 | static void end_thread (void) |
508 | static void end_thread (void) |
509 | { |
509 | { |
510 | aio_req req; |
510 | bdb_req req; |
511 | |
511 | |
512 | Newz (0, req, 1, aio_cb); |
512 | Newz (0, req, 1, bdb_cb); |
513 | |
513 | |
514 | req->type = REQ_QUIT; |
514 | req->type = REQ_QUIT; |
515 | req->pri = PRI_MAX + PRI_BIAS; |
515 | req->pri = PRI_MAX + PRI_BIAS; |
516 | |
516 | |
517 | X_LOCK (reqlock); |
517 | X_LOCK (reqlock); |
… | |
… | |
574 | dSP; |
574 | dSP; |
575 | int count = 0; |
575 | int count = 0; |
576 | int maxreqs = max_poll_reqs; |
576 | int maxreqs = max_poll_reqs; |
577 | int do_croak = 0; |
577 | int do_croak = 0; |
578 | struct timeval tv_start, tv_now; |
578 | struct timeval tv_start, tv_now; |
579 | aio_req req; |
579 | bdb_req req; |
580 | |
580 | |
581 | if (max_poll_time) |
581 | if (max_poll_time) |
582 | gettimeofday (&tv_start, 0); |
582 | gettimeofday (&tv_start, 0); |
583 | |
583 | |
584 | for (;;) |
584 | for (;;) |
… | |
… | |
645 | |
645 | |
646 | /*****************************************************************************/ |
646 | /*****************************************************************************/ |
647 | |
647 | |
648 | X_THREAD_PROC (bdb_proc) |
648 | X_THREAD_PROC (bdb_proc) |
649 | { |
649 | { |
650 | aio_req req; |
650 | bdb_req req; |
651 | struct timespec ts; |
651 | struct timespec ts; |
652 | worker *self = (worker *)thr_arg; |
652 | worker *self = (worker *)thr_arg; |
653 | |
653 | |
654 | /* try to distribute timeouts somewhat evenly */ |
654 | /* try to distribute timeouts somewhat evenly */ |
655 | ts.tv_nsec = ((unsigned long)self & 1023UL) * (1000000000UL / 1024UL); |
655 | ts.tv_nsec = ((unsigned long)self & 1023UL) * (1000000000UL / 1024UL); |
… | |
… | |
873 | X_UNLOCK (wrklock); |
873 | X_UNLOCK (wrklock); |
874 | } |
874 | } |
875 | |
875 | |
876 | static void atfork_child (void) |
876 | static void atfork_child (void) |
877 | { |
877 | { |
878 | aio_req prv; |
878 | bdb_req prv; |
879 | |
879 | |
880 | while (prv = reqq_shift (&req_queue)) |
880 | while (prv = reqq_shift (&req_queue)) |
881 | req_free (prv); |
881 | req_free (prv); |
882 | |
882 | |
883 | while (prv = reqq_shift (&res_queue)) |
883 | while (prv = reqq_shift (&res_queue)) |
… | |
… | |
904 | |
904 | |
905 | atfork_parent (); |
905 | atfork_parent (); |
906 | } |
906 | } |
907 | |
907 | |
908 | #define dREQ(reqtype) \ |
908 | #define dREQ(reqtype) \ |
909 | aio_req req; \ |
909 | bdb_req req; \ |
910 | int req_pri = next_pri; \ |
910 | int req_pri = next_pri; \ |
911 | next_pri = DEFAULT_PRI + PRI_BIAS; \ |
911 | next_pri = DEFAULT_PRI + PRI_BIAS; \ |
912 | \ |
912 | \ |
913 | if (SvOK (callback) && !SvROK (callback)) \ |
913 | if (SvOK (callback) && !SvROK (callback)) \ |
914 | croak ("callback must be undef or of reference type"); \ |
914 | croak ("callback must be undef or of reference type"); \ |
915 | \ |
915 | \ |
916 | Newz (0, req, 1, aio_cb); \ |
916 | Newz (0, req, 1, bdb_cb); \ |
917 | if (!req) \ |
917 | if (!req) \ |
918 | croak ("out of memory during aio_req allocation"); \ |
918 | croak ("out of memory during bdb_req allocation"); \ |
919 | \ |
919 | \ |
920 | req->callback = newSVsv (callback); \ |
920 | req->callback = newSVsv (callback); \ |
921 | req->type = (reqtype); \ |
921 | req->type = (reqtype); \ |
922 | req->pri = req_pri |
922 | req->pri = req_pri |
923 | |
923 | |
… | |
… | |
1492 | } |
1492 | } |
1493 | |
1493 | |
1494 | void |
1494 | void |
1495 | db_get (DB *db, DB_TXN_ornull *txn, SV *key, SV *data, U32 flags = 0, SV *callback = &PL_sv_undef) |
1495 | db_get (DB *db, DB_TXN_ornull *txn, SV *key, SV *data, U32 flags = 0, SV *callback = &PL_sv_undef) |
1496 | CODE: |
1496 | CODE: |
1497 | { |
|
|
1498 | if (SvREADONLY (data)) |
1497 | if (SvREADONLY (data)) |
1499 | croak ("can't modify read-only data scalar in db_get"); |
1498 | croak ("can't modify read-only data scalar in db_get"); |
1500 | |
1499 | { |
1501 | dREQ (REQ_DB_GET); |
1500 | dREQ (REQ_DB_GET); |
1502 | req->db = db; |
1501 | req->db = db; |
1503 | req->txn = txn; |
1502 | req->txn = txn; |
1504 | req->uint1 = flags; |
1503 | req->uint1 = flags; |
1505 | sv_to_dbt (&req->dbt1, key); |
1504 | sv_to_dbt (&req->dbt1, key); |
… | |
… | |
1509 | } |
1508 | } |
1510 | |
1509 | |
1511 | void |
1510 | void |
1512 | db_pget (DB *db, DB_TXN_ornull *txn, SV *key, SV *pkey, SV *data, U32 flags = 0, SV *callback = &PL_sv_undef) |
1511 | db_pget (DB *db, DB_TXN_ornull *txn, SV *key, SV *pkey, SV *data, U32 flags = 0, SV *callback = &PL_sv_undef) |
1513 | CODE: |
1512 | CODE: |
1514 | { |
|
|
1515 | if (SvREADONLY (data)) |
1513 | if (SvREADONLY (data)) |
1516 | croak ("can't modify read-only data scalar in db_pget"); |
1514 | croak ("can't modify read-only data scalar in db_pget"); |
1517 | |
1515 | { |
1518 | dREQ (REQ_DB_PGET); |
1516 | dREQ (REQ_DB_PGET); |
1519 | req->db = db; |
1517 | req->db = db; |
1520 | req->txn = txn; |
1518 | req->txn = txn; |
1521 | req->uint1 = flags; |
1519 | req->uint1 = flags; |
1522 | sv_to_dbt (&req->dbt1, key); |
1520 | sv_to_dbt (&req->dbt1, key); |