ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/BDB/BDB.xs
Revision: 1.1
Committed: Mon Feb 5 18:40:55 2007 UTC (17 years, 3 months ago) by root
Branch: MAIN
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.1 /* solaris */
2     #define _POSIX_PTHREAD_SEMANTICS 1
3    
4     #if __linux && !defined(_GNU_SOURCE)
5     # define _GNU_SOURCE
6     #endif
7    
8     /* just in case */
9     #define _REENTRANT 1
10    
11     #include <errno.h>
12    
13     #include "EXTERN.h"
14     #include "perl.h"
15     #include "XSUB.h"
16    
17     #include <pthread.h>
18    
19     #include <stddef.h>
20     #include <stdlib.h>
21     #include <errno.h>
22     #include <sys/time.h>
23     #include <sys/select.h>
24     #include <sys/types.h>
25     #include <sys/stat.h>
26     #include <limits.h>
27     #include <unistd.h>
28     #include <fcntl.h>
29     #include <signal.h>
30     #include <sched.h>
31    
/* number of seconds after which idle threads exit */
#define IDLE_TIMEOUT 10

/*
 * whether word reads are potentially non-atomic.
 * this is conservative: most architectures this runs on
 * likely have atomic word reads/writes, but only x86 and
 * x86-64 are assumed to be safe by default.
 * can be overridden by defining WORDACCESS_UNSAFE beforehand.
 */
#ifndef WORDACCESS_UNSAFE
# if __i386 || __x86_64
#  define WORDACCESS_UNSAFE 0
# else
#  define WORDACCESS_UNSAFE 1
# endif
#endif
46    
47     typedef SV SV8; /* byte-sv, used for argument-checking */
48    
49     enum {
50     REQ_QUIT,
51     };
52    
53     #define AIO_CB \
54     struct aio_cb *volatile next; \
55     SV *callback; \
56     int type, pri
57    
58     typedef struct aio_cb
59     {
60     AIO_CB;
61     } aio_cb;
62    
63     typedef aio_cb *aio_req;
64    
/*
 * request priorities. user-visible priorities range from PRI_MIN to
 * PRI_MAX; adding PRI_BIAS maps them onto the array indices
 * 0 .. NUM_PRI - 1.
 */
enum {
  PRI_MIN     = -4,
  PRI_MAX     =  4,

  DEFAULT_PRI =  0,
  PRI_BIAS    = -PRI_MIN,
  NUM_PRI     = PRI_MAX + PRI_BIAS + 1,
};
73    
/* number of ~1/1024 second units per second, rounded up */
#define AIO_TICKS ((1000000 + 1023) >> 10)

/* limits applied by poll_cb; 0 means "no limit" */
static unsigned int max_poll_time = 0;
static unsigned int max_poll_reqs = 0;

/*
 * calculate the time difference tv2 - tv1 in ~1/AIO_TICKS of a second.
 * the microsecond part is scaled by shifting right by 10 bits
 * (i.e. dividing by 1024) rather than by an exact division.
 */
static int tvdiff (struct timeval *tv1, struct timeval *tv2)
{
  time_t      secs  = tv2->tv_sec  - tv1->tv_sec;
  suseconds_t usecs = tv2->tv_usec - tv1->tv_usec;

  return secs * AIO_TICKS + (usecs >> 10);
}
85    
86     static int next_pri = DEFAULT_PRI + PRI_BIAS;
87    
88     static unsigned int started, idle, wanted;
89    
/* static mutex initialiser: prefer the adaptive variant where linux offers it */
#if __linux && defined (PTHREAD_ADAPTIVE_MUTEX_INITIALIZER_NP)
# define AIO_MUTEX_INIT PTHREAD_ADAPTIVE_MUTEX_INITIALIZER_NP
#else
# define AIO_MUTEX_INIT PTHREAD_MUTEX_INITIALIZER
#endif

/* both macros take the mutex object itself, not a pointer to it */
#define LOCK(mutex)   pthread_mutex_lock   (&(mutex))
#define UNLOCK(mutex) pthread_mutex_unlock (&(mutex))
98    
99     /* worker threads management */
100     static pthread_mutex_t wrklock = AIO_MUTEX_INIT;
101    
102     typedef struct worker {
103     /* locked by wrklock */
104     struct worker *prev, *next;
105    
106     pthread_t tid;
107    
108     /* locked by reslock, reqlock or wrklock */
109     aio_req req; /* currently processed request */
110     void *dbuf;
111     DIR *dirp;
112     } worker;
113    
114     static worker wrk_first = { &wrk_first, &wrk_first, 0 };
115    
116     static void worker_clear (worker *wrk)
117     {
118     }
119    
120     static void worker_free (worker *wrk)
121     {
122     wrk->next->prev = wrk->prev;
123     wrk->prev->next = wrk->next;
124    
125     free (wrk);
126     }
127    
128     static volatile unsigned int nreqs, nready, npending;
129     static volatile unsigned int max_idle = 4;
130     static volatile unsigned int max_outstanding = 0xffffffff;
131     static int respipe [2];
132    
133     static pthread_mutex_t reslock = AIO_MUTEX_INIT;
134     static pthread_mutex_t reqlock = AIO_MUTEX_INIT;
135     static pthread_cond_t reqwait = PTHREAD_COND_INITIALIZER;
136    
#if WORDACCESS_UNSAFE

/* read *counter while holding *lock, so the read cannot be torn */
static unsigned int locked_read (pthread_mutex_t *lock, const volatile unsigned int *counter)
{
  unsigned int snapshot;

  pthread_mutex_lock (lock);
  snapshot = *counter;
  pthread_mutex_unlock (lock);

  return snapshot;
}

/* nready is protected by reqlock */
static unsigned int get_nready ()
{
  return locked_read (&reqlock, &nready);
}

/* npending is protected by reslock */
static unsigned int get_npending ()
{
  return locked_read (&reslock, &npending);
}

/* started is protected by wrklock */
static unsigned int get_nthreads ()
{
  return locked_read (&wrklock, &started);
}

#else

/* word accesses are atomic here, so plain reads suffice */
# define get_nready()   nready
# define get_npending() npending
# define get_nthreads() started

#endif
179    
180     /*
181     * a somewhat faster data structure might be nice, but
182     * with 8 priorities this actually needs <20 insns
183     * per shift, the most expensive operation.
184     */
185     typedef struct {
186     aio_req qs[NUM_PRI], qe[NUM_PRI]; /* qstart, qend */
187     int size;
188     } reqq;
189    
190     static reqq req_queue;
191     static reqq res_queue;
192    
193     int reqq_push (reqq *q, aio_req req)
194     {
195     int pri = req->pri;
196     req->next = 0;
197    
198     if (q->qe[pri])
199     {
200     q->qe[pri]->next = req;
201     q->qe[pri] = req;
202     }
203     else
204     q->qe[pri] = q->qs[pri] = req;
205    
206     return q->size++;
207     }
208    
209     aio_req reqq_shift (reqq *q)
210     {
211     int pri;
212    
213     if (!q->size)
214     return 0;
215    
216     --q->size;
217    
218     for (pri = NUM_PRI; pri--; )
219     {
220     aio_req req = q->qs[pri];
221    
222     if (req)
223     {
224     if (!(q->qs[pri] = req->next))
225     q->qe[pri] = 0;
226    
227     return req;
228     }
229     }
230    
231     abort ();
232     }
233    
234     static int poll_cb ();
235     static int req_invoke (aio_req req);
236     static void req_free (aio_req req);
237     static void req_cancel (aio_req req);
238    
239     static int req_invoke (aio_req req)
240     {
241     dSP;
242    
243     if (SvOK (req->callback))
244     {
245     ENTER;
246     SAVETMPS;
247     PUSHMARK (SP);
248     EXTEND (SP, 1);
249    
250     switch (req->type)
251     {
252     }
253    
254     PUTBACK;
255     call_sv (req->callback, G_VOID | G_EVAL);
256     SPAGAIN;
257    
258     FREETMPS;
259     LEAVE;
260     }
261    
262     return !SvTRUE (ERRSV);
263     }
264    
265     static void req_free (aio_req req)
266     {
267     Safefree (req);
268     }
269    
270     static void *aio_proc(void *arg);
271    
272     static void start_thread (void)
273     {
274     sigset_t fullsigset, oldsigset;
275     pthread_attr_t attr;
276    
277     worker *wrk = calloc (1, sizeof (worker));
278    
279     if (!wrk)
280     croak ("unable to allocate worker thread data");
281    
282     pthread_attr_init (&attr);
283     pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
284     #ifdef PTHREAD_SCOPE_PROCESS
285     pthread_attr_setscope (&attr, PTHREAD_SCOPE_PROCESS);
286     #endif
287    
288     sigfillset (&fullsigset);
289    
290     LOCK (wrklock);
291     pthread_sigmask (SIG_SETMASK, &fullsigset, &oldsigset);
292    
293     if (pthread_create (&wrk->tid, &attr, aio_proc, (void *)wrk) == 0)
294     {
295     wrk->prev = &wrk_first;
296     wrk->next = wrk_first.next;
297     wrk_first.next->prev = wrk;
298     wrk_first.next = wrk;
299     ++started;
300     }
301     else
302     free (wrk);
303    
304     pthread_sigmask (SIG_SETMASK, &oldsigset, 0);
305     UNLOCK (wrklock);
306     }
307    
308     static void maybe_start_thread ()
309     {
310     if (get_nthreads () >= wanted)
311     return;
312    
313     /* todo: maybe use idle here, but might be less exact */
314     if (0 <= (int)get_nthreads () + (int)get_npending () - (int)nreqs)
315     return;
316    
317     start_thread ();
318     }
319    
320     static void req_send (aio_req req)
321     {
322     ++nreqs;
323    
324     LOCK (reqlock);
325     ++nready;
326     reqq_push (&req_queue, req);
327     pthread_cond_signal (&reqwait);
328     UNLOCK (reqlock);
329    
330     maybe_start_thread ();
331     }
332    
333     static void end_thread (void)
334     {
335     aio_req req;
336    
337     Newz (0, req, 1, aio_cb);
338    
339     req->type = REQ_QUIT;
340     req->pri = PRI_MAX + PRI_BIAS;
341    
342     LOCK (reqlock);
343     reqq_push (&req_queue, req);
344     pthread_cond_signal (&reqwait);
345     UNLOCK (reqlock);
346    
347     LOCK (wrklock);
348     --started;
349     UNLOCK (wrklock);
350     }
351    
352     static void set_max_idle (int nthreads)
353     {
354     if (WORDACCESS_UNSAFE) LOCK (reqlock);
355     max_idle = nthreads <= 0 ? 1 : nthreads;
356     if (WORDACCESS_UNSAFE) UNLOCK (reqlock);
357     }
358    
359     static void min_parallel (int nthreads)
360     {
361     if (wanted < nthreads)
362     wanted = nthreads;
363     }
364    
365     static void max_parallel (int nthreads)
366     {
367     if (wanted > nthreads)
368     wanted = nthreads;
369    
370     while (started > wanted)
371     end_thread ();
372     }
373    
374     static void poll_wait ()
375     {
376     fd_set rfd;
377    
378     while (nreqs)
379     {
380     int size;
381     if (WORDACCESS_UNSAFE) LOCK (reslock);
382     size = res_queue.size;
383     if (WORDACCESS_UNSAFE) UNLOCK (reslock);
384    
385     if (size)
386     return;
387    
388     maybe_start_thread ();
389    
390     FD_ZERO(&rfd);
391     FD_SET(respipe [0], &rfd);
392    
393     select (respipe [0] + 1, &rfd, 0, 0, 0);
394     }
395     }
396    
397     static int poll_cb ()
398     {
399     dSP;
400     int count = 0;
401     int maxreqs = max_poll_reqs;
402     int do_croak = 0;
403     struct timeval tv_start, tv_now;
404     aio_req req;
405    
406     if (max_poll_time)
407     gettimeofday (&tv_start, 0);
408    
409     for (;;)
410     {
411     for (;;)
412     {
413     maybe_start_thread ();
414    
415     LOCK (reslock);
416     req = reqq_shift (&res_queue);
417    
418     if (req)
419     {
420     --npending;
421    
422     if (!res_queue.size)
423     {
424     /* read any signals sent by the worker threads */
425     char buf [4];
426     while (read (respipe [0], buf, 4) == 4)
427     ;
428     }
429     }
430    
431     UNLOCK (reslock);
432    
433     if (!req)
434     break;
435    
436     --nreqs;
437    
438     if (!req_invoke (req))
439     {
440     req_free (req);
441     croak (0);
442     }
443    
444     count++;
445    
446     req_free (req);
447    
448     if (maxreqs && !--maxreqs)
449     break;
450    
451     if (max_poll_time)
452     {
453     gettimeofday (&tv_now, 0);
454    
455     if (tvdiff (&tv_start, &tv_now) >= max_poll_time)
456     break;
457     }
458     }
459    
460     if (nreqs <= max_outstanding)
461     break;
462    
463     poll_wait ();
464    
465     ++maxreqs;
466     }
467    
468     return count;
469     }
470    
471     static void create_pipe ()
472     {
473     if (pipe (respipe))
474     croak ("unable to initialize result pipe");
475    
476     if (fcntl (respipe [0], F_SETFL, O_NONBLOCK))
477     croak ("cannot set result pipe to nonblocking mode");
478    
479     if (fcntl (respipe [1], F_SETFL, O_NONBLOCK))
480     croak ("cannot set result pipe to nonblocking mode");
481     }
482    
483     /*****************************************************************************/
484    
485     static void *aio_proc (void *thr_arg)
486     {
487     aio_req req;
488     struct timespec ts;
489     worker *self = (worker *)thr_arg;
490    
491     /* try to distribute timeouts somewhat evenly */
492     ts.tv_nsec = (((unsigned long)self + (unsigned long)ts.tv_sec) & 1023UL)
493     * (1000000000UL / 1024UL);
494    
495     for (;;)
496     {
497     ts.tv_sec = time (0) + IDLE_TIMEOUT;
498    
499     LOCK (reqlock);
500    
501     for (;;)
502     {
503     self->req = req = reqq_shift (&req_queue);
504    
505     if (req)
506     break;
507    
508     ++idle;
509    
510     if (pthread_cond_timedwait (&reqwait, &reqlock, &ts)
511     == ETIMEDOUT)
512     {
513     if (idle > max_idle)
514     {
515     --idle;
516     UNLOCK (reqlock);
517     LOCK (wrklock);
518     --started;
519     UNLOCK (wrklock);
520     goto quit;
521     }
522    
523     /* we are allowed to idle, so do so without any timeout */
524     pthread_cond_wait (&reqwait, &reqlock);
525     ts.tv_sec = time (0) + IDLE_TIMEOUT;
526     }
527    
528     --idle;
529     }
530    
531     --nready;
532    
533     UNLOCK (reqlock);
534    
535     errno = 0; /* strictly unnecessary */
536    
537     switch (req->type)
538     {
539     case REQ_QUIT:
540     goto quit;
541    
542     default:
543     //req->result = ENOSYS;
544     break;
545     }
546    
547     //req->errorno = errno;
548    
549     LOCK (reslock);
550    
551     ++npending;
552    
553     if (!reqq_push (&res_queue, req))
554     /* write a dummy byte to the pipe so fh becomes ready */
555     write (respipe [1], &respipe, 1);
556    
557     self->req = 0;
558     worker_clear (self);
559    
560     UNLOCK (reslock);
561     }
562    
563     quit:
564     LOCK (wrklock);
565     worker_free (self);
566     UNLOCK (wrklock);
567    
568     return 0;
569     }
570    
571     /*****************************************************************************/
572    
573     static void atfork_prepare (void)
574     {
575     LOCK (wrklock);
576     LOCK (reqlock);
577     LOCK (reslock);
578     }
579    
580     static void atfork_parent (void)
581     {
582     UNLOCK (reslock);
583     UNLOCK (reqlock);
584     UNLOCK (wrklock);
585     }
586    
587     static void atfork_child (void)
588     {
589     aio_req prv;
590    
591     while (prv = reqq_shift (&req_queue))
592     req_free (prv);
593    
594     while (prv = reqq_shift (&res_queue))
595     req_free (prv);
596    
597     while (wrk_first.next != &wrk_first)
598     {
599     worker *wrk = wrk_first.next;
600    
601     if (wrk->req)
602     req_free (wrk->req);
603    
604     worker_clear (wrk);
605     worker_free (wrk);
606     }
607    
608     started = 0;
609     idle = 0;
610     nreqs = 0;
611     nready = 0;
612     npending = 0;
613    
614     close (respipe [0]);
615     close (respipe [1]);
616     create_pipe ();
617    
618     atfork_parent ();
619     }
620    
/*
 * declare and set up a new request named "req" inside an XSUB.
 * expects an SV named "callback" to be in scope. consumes next_pri
 * (resetting it to the default) and croaks unless the callback is
 * undef or a reference. the request gets its own copy of the callback.
 */
#define dREQ                                                     \
  aio_req req;                                                   \
  int req_pri = next_pri;                                        \
  next_pri = DEFAULT_PRI + PRI_BIAS;                             \
                                                                 \
  if (SvOK (callback) && !SvROK (callback))                      \
    croak ("callback must be undef or of reference type");       \
                                                                 \
  Newz (0, req, 1, aio_cb);                                      \
  if (!req)                                                      \
    croak ("out of memory during aio_req allocation");           \
                                                                 \
  req->callback = newSVsv (callback);                            \
  req->pri = req_pri

/*
 * submit the request set up by dREQ and, unless called in void
 * context, push a request object onto the perl stack.
 * NOTE(review): req_sv and AIO_REQ_KLASS are not defined in the
 * visible part of this file - confirm before using this macro.
 */
#define REQ_SEND                                                 \
  req_send (req);                                                \
                                                                 \
  if (GIMME_V != G_VOID)                                         \
    XPUSHs (req_sv (req, AIO_REQ_KLASS));
641    
MODULE = BDB::AIO                PACKAGE = BDB::AIO

PROTOTYPES: ENABLE

BOOT:
{
        /* creates the package stash if necessary; not used any further here */
        HV *stash = gv_stashpv ("BDB::AIO", 1);

        /* set up the result pipe and the fork handlers once at load time */
        create_pipe ();
        pthread_atfork (atfork_prepare, atfork_parent, atfork_child);
}
653    
void
max_poll_reqs (int nreqs)
        PROTOTYPE: $
        CODE:
        /* the parameter hides the global request counter of the same name */
        max_poll_reqs = nreqs;

void
max_poll_time (double nseconds)
        PROTOTYPE: $
        CODE:
        /* stored in units of 1/AIO_TICKS seconds, as used by tvdiff */
        max_poll_time = nseconds * AIO_TICKS;

void
min_parallel (int nthreads)
        PROTOTYPE: $

void
max_parallel (int nthreads)
        PROTOTYPE: $

void
max_idle (int nthreads)
        PROTOTYPE: $
        CODE:
        set_max_idle (nthreads);

int
max_outstanding (int maxreqs)
        PROTOTYPE: $
        CODE:
        /* returns the previous limit */
        RETVAL = max_outstanding;
        max_outstanding = maxreqs;
        OUTPUT:
        RETVAL
688    
int
bdbreq_pri (int pri = 0)
        PROTOTYPE: ;$
        CODE:
        /* always report the current (unbiased) priority */
        RETVAL = next_pri - PRI_BIAS;
        /* only change it when an argument was actually passed */
        if (items > 0)
          {
            if (pri < PRI_MIN)
              pri = PRI_MIN;
            else if (pri > PRI_MAX)
              pri = PRI_MAX;

            next_pri = pri + PRI_BIAS;
          }
        OUTPUT:
        RETVAL
702    
void
bdbreq_nice (int nice = 0)
        CODE:
        /*
         * next_pri is stored with PRI_BIAS added, so remove the bias
         * before applying the adjustment and clamping to the
         * unbiased range PRI_MIN .. PRI_MAX.
         */
        nice = (next_pri - PRI_BIAS) - nice;
        if (nice < PRI_MIN) nice = PRI_MIN;
        if (nice > PRI_MAX) nice = PRI_MAX;
        next_pri = nice + PRI_BIAS;
710    
void
flush ()
        PROTOTYPE:
        CODE:
        /* keep waiting and processing until nothing is outstanding */
        while (nreqs)
          {
            poll_wait ();
            poll_cb ();
          }

int
poll()
        PROTOTYPE:
        CODE:
        /* wait for at least one result, then process what is there */
        poll_wait ();
        RETVAL = poll_cb ();
        OUTPUT:
        RETVAL

int
poll_fileno()
        PROTOTYPE:
        CODE:
        /* read end of the result pipe */
        RETVAL = respipe [0];
        OUTPUT:
        RETVAL

int
poll_cb(...)
        PROTOTYPE:
        CODE:
        /* any arguments are ignored */
        RETVAL = poll_cb ();
        OUTPUT:
        RETVAL

void
poll_wait()
        PROTOTYPE:
        CODE:
        poll_wait ();
751    
int
nreqs()
        PROTOTYPE:
        CODE:
        /* read without locking */
        RETVAL = nreqs;
        OUTPUT:
        RETVAL

int
nready()
        PROTOTYPE:
        CODE:
        RETVAL = get_nready ();
        OUTPUT:
        RETVAL

int
npending()
        PROTOTYPE:
        CODE:
        RETVAL = get_npending ();
        OUTPUT:
        RETVAL

int
nthreads()
        PROTOTYPE:
        CODE:
        /* takes wrklock only where word reads may be non-atomic */
        if (WORDACCESS_UNSAFE)
          LOCK (wrklock);
        RETVAL = started;
        if (WORDACCESS_UNSAFE)
          UNLOCK (wrklock);
        OUTPUT:
        RETVAL
785    
786