ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/BDB/BDB.xs
Revision: 1.1
Committed: Mon Feb 5 18:40:55 2007 UTC (17 years, 3 months ago) by root
Branch: MAIN
Log Message:
*** empty log message ***

File Contents

# Content
1 /* solaris */
2 #define _POSIX_PTHREAD_SEMANTICS 1
3
4 #if __linux && !defined(_GNU_SOURCE)
5 # define _GNU_SOURCE
6 #endif
7
8 /* just in case */
9 #define _REENTRANT 1
10
11 #include <errno.h>
12
13 #include "EXTERN.h"
14 #include "perl.h"
15 #include "XSUB.h"
16
17 #include <pthread.h>
18
19 #include <stddef.h>
20 #include <stdlib.h>
21 #include <errno.h>
22 #include <sys/time.h>
23 #include <sys/select.h>
24 #include <sys/types.h>
25 #include <sys/stat.h>
26 #include <limits.h>
27 #include <unistd.h>
28 #include <fcntl.h>
29 #include <signal.h>
30 #include <sched.h>
31
32 /* number of seconds after which idle threads exit */
33 #define IDLE_TIMEOUT 10
34
35 /* whether word reads are potentially non-atomic.
36 * this is conservative; most likely the arches this runs
37 * on have atomic word reads/writes.
38 */
39 #ifndef WORDACCESS_UNSAFE
40 # if __i386 || __x86_64
41 # define WORDACCESS_UNSAFE 0
42 # else
43 # define WORDACCESS_UNSAFE 1
44 # endif
45 #endif
46
47 typedef SV SV8; /* byte-sv, used for argument-checking */
48
/* request types; REQ_QUIT is the only one defined here and makes the
 * worker thread that dequeues it leave its loop (see aio_proc). */
49 enum {
50 REQ_QUIT,
51 };
52
/* fields shared by every request: the queue link, the perl callback,
 * the request type and the priority (used as an index by reqq_push). */
53 #define AIO_CB \
54 struct aio_cb *volatile next; \
55 SV *callback; \
56 int type, pri
57
/* a request; currently it carries nothing but the common AIO_CB fields */
58 typedef struct aio_cb
59 {
60 AIO_CB;
61 } aio_cb;
62
63 typedef aio_cb *aio_req; /* requests are passed around by pointer */
64
/* request priorities as seen by callers (PRI_MIN..PRI_MAX) and the bias
 * that maps them onto the array indices 0..NUM_PRI-1 used by the queues. */
65 enum {
66 PRI_MIN = -4,
67 PRI_MAX = 4,
68
69 DEFAULT_PRI = 0,
70 PRI_BIAS = -PRI_MIN, /* added to a priority to obtain its queue index */
71 NUM_PRI = PRI_MAX + PRI_BIAS + 1, /* number of distinct priorities */
72 };
73
/* number of ticks per second: one tick is 1024 microseconds */
#define AIO_TICKS ((1000000 + 1023) >> 10)

/* limits applied by poll_cb; zero disables the respective limit */
static unsigned int max_poll_time;
static unsigned int max_poll_reqs;

/*
 * calculate the time that passed between *tv1 and *tv2,
 * expressed in units of ~1/AIO_TICKS of a second.
 */
static int tvdiff (struct timeval *tv1, struct timeval *tv2)
{
  time_t      sec_delta  = tv2->tv_sec  - tv1->tv_sec;
  suseconds_t usec_delta = tv2->tv_usec - tv1->tv_usec;

  return sec_delta * AIO_TICKS + (usec_delta >> 10);
}
85
/* biased priority (index into the queues) that the next request created
 * by dREQ will receive; dREQ resets it to the default afterwards. */
86 static int next_pri = DEFAULT_PRI + PRI_BIAS;
87
/* started: worker threads accounted as running (changed under wrklock)
 * idle:    workers currently waiting for a request (changed under reqlock)
 * wanted:  thread limit consulted by maybe_start_thread */
88 static unsigned int started, idle, wanted;
89
/* use the adaptive mutex initialiser on linux when the headers offer it */
90 #if __linux && defined (PTHREAD_ADAPTIVE_MUTEX_INITIALIZER_NP)
91 # define AIO_MUTEX_INIT PTHREAD_ADAPTIVE_MUTEX_INITIALIZER_NP
92 #else
93 # define AIO_MUTEX_INIT PTHREAD_MUTEX_INITIALIZER
94 #endif
95
/* both macros take the mutex object itself, not a pointer to it */
96 #define LOCK(mutex) pthread_mutex_lock (&(mutex))
97 #define UNLOCK(mutex) pthread_mutex_unlock (&(mutex))
98
99 /* worker threads management */
100 static pthread_mutex_t wrklock = AIO_MUTEX_INIT;
101
/* per-thread bookkeeping record, one per worker thread, kept in a
 * doubly linked list anchored at wrk_first. */
102 typedef struct worker {
103 /* locked by wrklock */
104 struct worker *prev, *next;
105
106 pthread_t tid;
107
108 /* locked by reslock, reqlock or wrklock */
109 aio_req req; /* currently processed request */
110 void *dbuf; /* NOTE(review): not referenced by any code in this file */
111 DIR *dirp; /* NOTE(review): not referenced by any code in this file */
112 } worker;
113
/* list head/sentinel: the list is empty while it points at itself */
114 static worker wrk_first = { &wrk_first, &wrk_first, 0 };
115
/* currently a no-op; called by aio_proc after each processed request and
 * by atfork_child before a worker record is freed. */
116 static void worker_clear (worker *wrk)
117 {
118 }
119
120 static void worker_free (worker *wrk)
121 {
122 wrk->next->prev = wrk->prev;
123 wrk->prev->next = wrk->next;
124
125 free (wrk);
126 }
127
/* nreqs:    requests submitted via req_send and not yet handled by poll_cb
 * nready:   requests counted as waiting in req_queue (changed under reqlock)
 * npending: finished requests waiting in res_queue (changed under reslock) */
128 static volatile unsigned int nreqs, nready, npending;
129 static volatile unsigned int max_idle = 4; /* idle workers allowed to stay after a timeout */
130 static volatile unsigned int max_outstanding = 0xffffffff; /* poll_cb waits while nreqs exceeds this */
131 static int respipe [2]; /* workers write to [1], the poller reads from [0] */
132
133 static pthread_mutex_t reslock = AIO_MUTEX_INIT; /* guards res_queue and npending */
134 static pthread_mutex_t reqlock = AIO_MUTEX_INIT; /* guards req_queue, nready and idle */
135 static pthread_cond_t reqwait = PTHREAD_COND_INITIALIZER; /* signalled when a request is queued */
136
#if WORDACCESS_UNSAFE

/* read *counter while holding *lock and hand back the value seen */
static unsigned int read_locked (pthread_mutex_t *lock, volatile unsigned int *counter)
{
  unsigned int snapshot;

  LOCK (*lock);
  snapshot = *counter;
  UNLOCK (*lock);

  return snapshot;
}

/* number of queued requests, read under reqlock */
static unsigned int get_nready ()
{
  return read_locked (&reqlock, &nready);
}

/* number of finished but unhandled requests, read under reslock */
static unsigned int get_npending ()
{
  return read_locked (&reslock, &npending);
}

/* number of started worker threads, read under wrklock */
static unsigned int get_nthreads ()
{
  return read_locked (&wrklock, &started);
}

#else

/* word reads are treated as atomic here, so plain reads suffice */
# define get_nready() nready
# define get_npending() npending
# define get_nthreads() started

#endif
179
180 /*
181 * a somewhat faster data structure might be nice, but
182 * with NUM_PRI priorities this needs only a few insns
183 * per shift, the most expensive operation.
184 */
/* one singly linked list per priority plus a total element count */
185 typedef struct {
186 aio_req qs[NUM_PRI], qe[NUM_PRI]; /* qstart, qend */
187 int size;
188 } reqq;
189
190 static reqq req_queue; /* requests waiting for a worker thread */
191 static reqq res_queue; /* finished requests waiting for poll_cb */
192
193 int reqq_push (reqq *q, aio_req req)
194 {
195 int pri = req->pri;
196 req->next = 0;
197
198 if (q->qe[pri])
199 {
200 q->qe[pri]->next = req;
201 q->qe[pri] = req;
202 }
203 else
204 q->qe[pri] = q->qs[pri] = req;
205
206 return q->size++;
207 }
208
209 aio_req reqq_shift (reqq *q)
210 {
211 int pri;
212
213 if (!q->size)
214 return 0;
215
216 --q->size;
217
218 for (pri = NUM_PRI; pri--; )
219 {
220 aio_req req = q->qs[pri];
221
222 if (req)
223 {
224 if (!(q->qs[pri] = req->next))
225 q->qe[pri] = 0;
226
227 return req;
228 }
229 }
230
231 abort ();
232 }
233
234 static int poll_cb ();
235 static int req_invoke (aio_req req);
236 static void req_free (aio_req req);
237 static void req_cancel (aio_req req);
238
239 static int req_invoke (aio_req req)
240 {
241 dSP;
242
243 if (SvOK (req->callback))
244 {
245 ENTER;
246 SAVETMPS;
247 PUSHMARK (SP);
248 EXTEND (SP, 1);
249
250 switch (req->type)
251 {
252 }
253
254 PUTBACK;
255 call_sv (req->callback, G_VOID | G_EVAL);
256 SPAGAIN;
257
258 FREETMPS;
259 LEAVE;
260 }
261
262 return !SvTRUE (ERRSV);
263 }
264
265 static void req_free (aio_req req)
266 {
267 Safefree (req);
268 }
269
270 static void *aio_proc(void *arg);
271
272 static void start_thread (void)
273 {
274 sigset_t fullsigset, oldsigset;
275 pthread_attr_t attr;
276
277 worker *wrk = calloc (1, sizeof (worker));
278
279 if (!wrk)
280 croak ("unable to allocate worker thread data");
281
282 pthread_attr_init (&attr);
283 pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
284 #ifdef PTHREAD_SCOPE_PROCESS
285 pthread_attr_setscope (&attr, PTHREAD_SCOPE_PROCESS);
286 #endif
287
288 sigfillset (&fullsigset);
289
290 LOCK (wrklock);
291 pthread_sigmask (SIG_SETMASK, &fullsigset, &oldsigset);
292
293 if (pthread_create (&wrk->tid, &attr, aio_proc, (void *)wrk) == 0)
294 {
295 wrk->prev = &wrk_first;
296 wrk->next = wrk_first.next;
297 wrk_first.next->prev = wrk;
298 wrk_first.next = wrk;
299 ++started;
300 }
301 else
302 free (wrk);
303
304 pthread_sigmask (SIG_SETMASK, &oldsigset, 0);
305 UNLOCK (wrklock);
306 }
307
308 static void maybe_start_thread ()
309 {
310 if (get_nthreads () >= wanted)
311 return;
312
313 /* todo: maybe use idle here, but might be less exact */
314 if (0 <= (int)get_nthreads () + (int)get_npending () - (int)nreqs)
315 return;
316
317 start_thread ();
318 }
319
320 static void req_send (aio_req req)
321 {
322 ++nreqs;
323
324 LOCK (reqlock);
325 ++nready;
326 reqq_push (&req_queue, req);
327 pthread_cond_signal (&reqwait);
328 UNLOCK (reqlock);
329
330 maybe_start_thread ();
331 }
332
333 static void end_thread (void)
334 {
335 aio_req req;
336
337 Newz (0, req, 1, aio_cb);
338
339 req->type = REQ_QUIT;
340 req->pri = PRI_MAX + PRI_BIAS;
341
342 LOCK (reqlock);
343 reqq_push (&req_queue, req);
344 pthread_cond_signal (&reqwait);
345 UNLOCK (reqlock);
346
347 LOCK (wrklock);
348 --started;
349 UNLOCK (wrklock);
350 }
351
352 static void set_max_idle (int nthreads)
353 {
354 if (WORDACCESS_UNSAFE) LOCK (reqlock);
355 max_idle = nthreads <= 0 ? 1 : nthreads;
356 if (WORDACCESS_UNSAFE) UNLOCK (reqlock);
357 }
358
359 static void min_parallel (int nthreads)
360 {
361 if (wanted < nthreads)
362 wanted = nthreads;
363 }
364
365 static void max_parallel (int nthreads)
366 {
367 if (wanted > nthreads)
368 wanted = nthreads;
369
370 while (started > wanted)
371 end_thread ();
372 }
373
374 static void poll_wait ()
375 {
376 fd_set rfd;
377
378 while (nreqs)
379 {
380 int size;
381 if (WORDACCESS_UNSAFE) LOCK (reslock);
382 size = res_queue.size;
383 if (WORDACCESS_UNSAFE) UNLOCK (reslock);
384
385 if (size)
386 return;
387
388 maybe_start_thread ();
389
390 FD_ZERO(&rfd);
391 FD_SET(respipe [0], &rfd);
392
393 select (respipe [0] + 1, &rfd, 0, 0, 0);
394 }
395 }
396
/*
 * handle finished requests: take them off res_queue one by one, run
 * their callback via req_invoke and free them.
 *
 * the inner loop ends when the queue is empty, after max_poll_reqs
 * requests (if non-zero) or once max_poll_time ticks have passed (if
 * non-zero). as long as more than max_outstanding requests remain
 * outstanding, the outer loop waits in poll_wait and runs the inner
 * loop again.
 *
 * returns the number of requests handled. if req_invoke reports
 * failure the request is freed and croak (0) is called
 * (presumably rethrowing the error left in ERRSV - confirm).
 */
397 static int poll_cb ()
398 {
399 dSP; /* NOTE(review): the stack pointer is not used in this function */
400 int count = 0;
401 int maxreqs = max_poll_reqs;
402 int do_croak = 0; /* NOTE(review): never read */
403 struct timeval tv_start, tv_now;
404 aio_req req;
405
406 if (max_poll_time)
407 gettimeofday (&tv_start, 0);
408
409 for (;;)
410 {
411 for (;;)
412 {
413 maybe_start_thread ();
414
415 LOCK (reslock);
416 req = reqq_shift (&res_queue);
417
418 if (req)
419 {
420 --npending;
421
422 if (!res_queue.size)
423 {
424 /* read any signals sent by the worker threads */
425 char buf [4];
426 while (read (respipe [0], buf, 4) == 4)
427 ;
428 }
429 }
430
431 UNLOCK (reslock);
432
433 if (!req)
434 break;
435
436 --nreqs;
437
438 if (!req_invoke (req))
439 {
440 req_free (req);
441 croak (0);
442 }
443
444 count++;
445
446 req_free (req);
447
/* stop once the per-call request budget is used up (0 means no budget) */
448 if (maxreqs && !--maxreqs)
449 break;
450
451 if (max_poll_time)
452 {
453 gettimeofday (&tv_now, 0);
454
455 if (tvdiff (&tv_start, &tv_now) >= max_poll_time)
456 break;
457 }
458 }
459
460 if (nreqs <= max_outstanding)
461 break;
462
463 poll_wait ();
464
/* NOTE(review): looks like this grants the next pass a budget of at
 * least one request after waiting - confirm */
465 ++maxreqs;
466 }
467
468 return count;
469 }
470
471 static void create_pipe ()
472 {
473 if (pipe (respipe))
474 croak ("unable to initialize result pipe");
475
476 if (fcntl (respipe [0], F_SETFL, O_NONBLOCK))
477 croak ("cannot set result pipe to nonblocking mode");
478
479 if (fcntl (respipe [1], F_SETFL, O_NONBLOCK))
480 croak ("cannot set result pipe to nonblocking mode");
481 }
482
483 /*****************************************************************************/
484
485 static void *aio_proc (void *thr_arg)
486 {
487 aio_req req;
488 struct timespec ts;
489 worker *self = (worker *)thr_arg;
490
491 /* try to distribute timeouts somewhat evenly */
492 ts.tv_nsec = (((unsigned long)self + (unsigned long)ts.tv_sec) & 1023UL)
493 * (1000000000UL / 1024UL);
494
495 for (;;)
496 {
497 ts.tv_sec = time (0) + IDLE_TIMEOUT;
498
499 LOCK (reqlock);
500
501 for (;;)
502 {
503 self->req = req = reqq_shift (&req_queue);
504
505 if (req)
506 break;
507
508 ++idle;
509
510 if (pthread_cond_timedwait (&reqwait, &reqlock, &ts)
511 == ETIMEDOUT)
512 {
513 if (idle > max_idle)
514 {
515 --idle;
516 UNLOCK (reqlock);
517 LOCK (wrklock);
518 --started;
519 UNLOCK (wrklock);
520 goto quit;
521 }
522
523 /* we are allowed to idle, so do so without any timeout */
524 pthread_cond_wait (&reqwait, &reqlock);
525 ts.tv_sec = time (0) + IDLE_TIMEOUT;
526 }
527
528 --idle;
529 }
530
531 --nready;
532
533 UNLOCK (reqlock);
534
535 errno = 0; /* strictly unnecessary */
536
537 switch (req->type)
538 {
539 case REQ_QUIT:
540 goto quit;
541
542 default:
543 //req->result = ENOSYS;
544 break;
545 }
546
547 //req->errorno = errno;
548
549 LOCK (reslock);
550
551 ++npending;
552
553 if (!reqq_push (&res_queue, req))
554 /* write a dummy byte to the pipe so fh becomes ready */
555 write (respipe [1], &respipe, 1);
556
557 self->req = 0;
558 worker_clear (self);
559
560 UNLOCK (reslock);
561 }
562
563 quit:
564 LOCK (wrklock);
565 worker_free (self);
566 UNLOCK (wrklock);
567
568 return 0;
569 }
570
571 /*****************************************************************************/
572
/* registered in BOOT as the pthread_atfork prepare handler: takes all
 * three locks, in the order wrklock, reqlock, reslock. */
573 static void atfork_prepare (void)
574 {
575 LOCK (wrklock);
576 LOCK (reqlock);
577 LOCK (reslock);
578 }
579
/* registered in BOOT as the pthread_atfork parent handler (and also
 * called at the end of atfork_child): releases the locks taken by
 * atfork_prepare, in reverse order. */
580 static void atfork_parent (void)
581 {
582 UNLOCK (reslock);
583 UNLOCK (reqlock);
584 UNLOCK (wrklock);
585 }
586
587 static void atfork_child (void)
588 {
589 aio_req prv;
590
591 while (prv = reqq_shift (&req_queue))
592 req_free (prv);
593
594 while (prv = reqq_shift (&res_queue))
595 req_free (prv);
596
597 while (wrk_first.next != &wrk_first)
598 {
599 worker *wrk = wrk_first.next;
600
601 if (wrk->req)
602 req_free (wrk->req);
603
604 worker_clear (wrk);
605 worker_free (wrk);
606 }
607
608 started = 0;
609 idle = 0;
610 nreqs = 0;
611 nready = 0;
612 npending = 0;
613
614 close (respipe [0]);
615 close (respipe [1]);
616 create_pipe ();
617
618 atfork_parent ();
619 }
620
/*
 * declares and sets up a request "req" inside an XSUB that has an SV
 * named "callback" in scope: consumes next_pri (resetting it to the
 * default), croaks unless callback is undef or a reference, allocates a
 * zeroed aio_cb and stores a copy of the callback and the priority.
 * NOTE(review): Newz presumably croaks by itself on failure, which
 * would make the !req check unreachable - confirm.
 */
621 #define dREQ \
622 aio_req req; \
623 int req_pri = next_pri; \
624 next_pri = DEFAULT_PRI + PRI_BIAS; \
625 \
626 if (SvOK (callback) && !SvROK (callback)) \
627 croak ("callback must be undef or of reference type"); \
628 \
629 Newz (0, req, 1, aio_cb); \
630 if (!req) \
631 croak ("out of memory during aio_req allocation"); \
632 \
633 req->callback = newSVsv (callback); \
634 req->pri = req_pri
635
/*
 * submits "req" via req_send and, unless the XSUB was called in void
 * context, pushes a request object onto the perl stack.
 * NOTE(review): req_sv and AIO_REQ_KLASS are not defined anywhere in
 * this file, so the macro cannot be expanded as it stands.
 */
636 #define REQ_SEND \
637 req_send (req); \
638 \
639 if (GIMME_V != G_VOID) \
640 XPUSHs (req_sv (req, AIO_REQ_KLASS));
641
642 MODULE = BDB::AIO PACKAGE = BDB::AIO
643
644 PROTOTYPES: ENABLE
645
646 BOOT:
647 {
648 HV *stash = gv_stashpv ("BDB::AIO", 1); /* NOTE(review): stash is not used afterwards */
649
650 create_pipe (); /* result pipe polled by poll_wait/poll_cb */
651 pthread_atfork (atfork_prepare, atfork_parent, atfork_child); /* keep locks and queues sane across fork */
652 }
653
654 void
655 max_poll_reqs (int nreqs)
656 PROTOTYPE: $
657 CODE:
/* cap on the requests handled by one poll_cb call; poll_cb treats 0 as
 * "no cap". the parameter nreqs hides the global counter of that name. */
658 max_poll_reqs = nreqs;
659
660 void
661 max_poll_time (double nseconds)
662 PROTOTYPE: $
663 CODE:
/* time budget of one poll_cb call, stored in AIO_TICKS units;
 * poll_cb skips the time check while this is 0. */
664 max_poll_time = nseconds * AIO_TICKS;
665
666 void
667 min_parallel (int nthreads)
668 PROTOTYPE: $
669
670 void
671 max_parallel (int nthreads)
672 PROTOTYPE: $
673
674 void
675 max_idle (int nthreads)
676 PROTOTYPE: $
677 CODE:
/* set_max_idle raises values of zero or less to 1 */
678 set_max_idle (nthreads);
679
680 int
681 max_outstanding (int maxreqs)
682 PROTOTYPE: $
683 CODE:
/* returns the previous limit and installs the new one; poll_cb keeps
 * waiting for results while nreqs exceeds this limit. */
684 RETVAL = max_outstanding;
685 max_outstanding = maxreqs;
686 OUTPUT:
687 RETVAL
688
689 int
690 bdbreq_pri (int pri = 0)
691 PROTOTYPE: ;$
692 CODE:
/* returns the (unbiased) priority currently set for the next request.
 * when an argument was passed it is clamped to PRI_MIN..PRI_MAX and
 * stored, biased, as the new priority. */
693 RETVAL = next_pri - PRI_BIAS;
694 if (items > 0)
695 {
696 if (pri < PRI_MIN) pri = PRI_MIN;
697 if (pri > PRI_MAX) pri = PRI_MAX;
698 next_pri = pri + PRI_BIAS;
699 }
700 OUTPUT:
701 RETVAL
702
703 void
704 bdbreq_nice (int nice = 0)
705 CODE:
706 nice = next_pri - nice;
707 if (nice < PRI_MIN) nice = PRI_MIN;
708 if (nice > PRI_MAX) nice = PRI_MAX;
709 next_pri = nice + PRI_BIAS;
710
711 void
712 flush ()
713 PROTOTYPE:
714 CODE:
/* alternate between waiting for results and running callbacks until no
 * request is outstanding any more */
715 while (nreqs)
716 {
717 poll_wait ();
718 poll_cb ();
719 }
720
721 int
722 poll()
723 PROTOTYPE:
724 CODE:
/* wait for at least one result (if anything is outstanding), then
 * handle results; returns the count reported by poll_cb */
725 poll_wait ();
726 RETVAL = poll_cb ();
727 OUTPUT:
728 RETVAL
729
730 int
731 poll_fileno()
732 PROTOTYPE:
733 CODE:
/* read end of the result pipe; workers write a byte to the other end
 * when they queue a result into an empty result queue */
734 RETVAL = respipe [0];
735 OUTPUT:
736 RETVAL
737
738 int
739 poll_cb(...)
740 PROTOTYPE:
741 CODE:
/* any arguments passed are ignored; returns the number of requests
 * handled by the C function poll_cb */
742 RETVAL = poll_cb ();
743 OUTPUT:
744 RETVAL
745
746 void
747 poll_wait()
748 PROTOTYPE:
749 CODE:
/* block until a result is queued or nothing is outstanding */
750 poll_wait ();
751
752 int
753 nreqs()
754 PROTOTYPE:
755 CODE:
/* requests submitted through req_send and not yet handled by poll_cb */
756 RETVAL = nreqs;
757 OUTPUT:
758 RETVAL
759
760 int
761 nready()
762 PROTOTYPE:
763 CODE:
/* requests queued but not yet taken by a worker thread */
764 RETVAL = get_nready ();
765 OUTPUT:
766 RETVAL
767
768 int
769 npending()
770 PROTOTYPE:
771 CODE:
/* finished requests whose callbacks have not been run by poll_cb yet */
772 RETVAL = get_npending ();
773 OUTPUT:
774 RETVAL
775
776 int
777 nthreads()
778 PROTOTYPE:
779 CODE:
780 if (WORDACCESS_UNSAFE) LOCK (wrklock);
781 RETVAL = started;
782 if (WORDACCESS_UNSAFE) UNLOCK (wrklock);
783 OUTPUT:
784 RETVAL
785
786