/cvs/libeio/eio.c
Revision: 1.11
Committed: Tue May 13 18:50:27 2008 UTC by root
Content type: text/plain
Branch: MAIN
Changes since 1.10: +9 -3 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.1 #include "eio.h"
2     #include "xthread.h"
3    
4     #include <errno.h>
5     #include <stddef.h>
6     #include <stdlib.h>
7 root 1.3 #include <string.h>
8 root 1.1 #include <errno.h>
9     #include <sys/types.h>
10     #include <sys/stat.h>
11     #include <limits.h>
12     #include <fcntl.h>
13 root 1.8 #include <assert.h>
14 root 1.1
15     #ifndef EIO_FINISH
16     # define EIO_FINISH(req) ((req)->finish) && !EIO_CANCELLED (req) ? (req)->finish (req) : 0
17     #endif
18    
19     #ifndef EIO_DESTROY
20     # define EIO_DESTROY(req) do { if ((req)->destroy) (req)->destroy (req); } while (0)
21     #endif
22    
23     #ifndef EIO_FEED
24     # define EIO_FEED(req) do { if ((req)->feed ) (req)->feed (req); } while (0)
25     #endif
26    
27     #ifdef _WIN32
28    
29     /*doh*/
30    
31     #else
32    
33     # include "config.h"
34     # include <sys/time.h>
35     # include <sys/select.h>
36     # include <unistd.h>
37     # include <utime.h>
38     # include <signal.h>
39 root 1.4 # include <dirent.h>
40 root 1.1
41     # ifndef EIO_STRUCT_DIRENT
42     # define EIO_STRUCT_DIRENT struct dirent
43     # endif
44    
45     #endif
46    
47     #if HAVE_SENDFILE
48     # if __linux
49     # include <sys/sendfile.h>
50     # elif __freebsd
51     # include <sys/socket.h>
52     # include <sys/uio.h>
53     # elif __hpux
54     # include <sys/socket.h>
55     # elif __solaris /* not yet */
56     # include <sys/sendfile.h>
57     # else
58     # error sendfile support requested but not available
59     # endif
60     #endif
61    
62     /* number of seconds after which an idle thread exits */
63     #define IDLE_TIMEOUT 10
64    
65     /* used for struct dirent; AIX doesn't provide NAME_MAX */
66     #ifndef NAME_MAX
67     # define NAME_MAX 4096
68     #endif
69    
70     /* buffer size for various temporary buffers */
71     #define EIO_BUFSIZE 65536
72    
73     #define dBUF \
74     char *eio_buf; \
75     X_LOCK (wrklock); \
76     self->dbuf = eio_buf = malloc (EIO_BUFSIZE); \
77     X_UNLOCK (wrklock); \
78 root 1.9 errno = ENOMEM; \
79 root 1.1 if (!eio_buf) \
80     return -1;
81    
82     #define EIO_TICKS ((1000000 + 1023) >> 10)
83    
84     static void (*want_poll_cb) (void);
85     static void (*done_poll_cb) (void);
86    
87     static unsigned int max_poll_time = 0;
88     static unsigned int max_poll_reqs = 0;
89    
90     /* calculate time difference in ~1/EIO_TICKS of a second */
91     static int tvdiff (struct timeval *tv1, struct timeval *tv2)
92     {
93     return (tv2->tv_sec - tv1->tv_sec ) * EIO_TICKS
94     + ((tv2->tv_usec - tv1->tv_usec) >> 10);
95     }
96    
97 root 1.9 static unsigned int started, idle, wanted = 4;
98 root 1.1
99     /* worker thread management */
100     static mutex_t wrklock = X_MUTEX_INIT;
101    
102 root 1.9 typedef struct worker
103     {
104 root 1.1 /* locked by wrklock */
105     struct worker *prev, *next;
106    
107     thread_t tid;
108    
109     /* locked by reslock, reqlock or wrklock */
110     eio_req *req; /* currently processed request */
111     void *dbuf;
112     DIR *dirp;
113     } worker;
114    
115     static worker wrk_first = { &wrk_first, &wrk_first, 0 };
116    
117     static void worker_clear (worker *wrk)
118     {
119     if (wrk->dirp)
120     {
121     closedir (wrk->dirp);
122     wrk->dirp = 0;
123     }
124    
125     if (wrk->dbuf)
126     {
127     free (wrk->dbuf);
128     wrk->dbuf = 0;
129     }
130     }
131    
132     static void worker_free (worker *wrk)
133     {
134     wrk->next->prev = wrk->prev;
135     wrk->prev->next = wrk->next;
136    
137     free (wrk);
138     }
139    
140     static volatile unsigned int nreqs, nready, npending;
141     static volatile unsigned int max_idle = 4;
142    
143     static mutex_t reslock = X_MUTEX_INIT;
144     static mutex_t reqlock = X_MUTEX_INIT;
145     static cond_t reqwait = X_COND_INIT;
146    
147     unsigned int eio_nreqs (void)
148     {
149     return nreqs;
150     }
151    
152     unsigned int eio_nready (void)
153     {
154     unsigned int retval;
155    
156     if (WORDACCESS_UNSAFE) X_LOCK (reqlock);
157     retval = nready;
158     if (WORDACCESS_UNSAFE) X_UNLOCK (reqlock);
159    
160     return retval;
161     }
162    
163     unsigned int eio_npending (void)
164     {
165     unsigned int retval;
166    
167     if (WORDACCESS_UNSAFE) X_LOCK (reqlock);
168     retval = npending;
169     if (WORDACCESS_UNSAFE) X_UNLOCK (reqlock);
170    
171     return retval;
172     }
173    
174     unsigned int eio_nthreads (void)
175     {
176     unsigned int retval;
177    
178     if (WORDACCESS_UNSAFE) X_LOCK (reqlock);
179     retval = started;
180     if (WORDACCESS_UNSAFE) X_UNLOCK (reqlock);
181    
182     return retval;
183     }
184    
185     /*
186     * a somewhat faster data structure might be nice, but
187     * with 8 priorities this actually needs <20 insns
188     * per shift, the most expensive operation.
189     */
190     typedef struct {
191     eio_req *qs[EIO_NUM_PRI], *qe[EIO_NUM_PRI]; /* qstart, qend */
192     int size;
193     } reqq;
194    
195     static reqq req_queue;
196     static reqq res_queue;
197    
198     static int reqq_push (reqq *q, eio_req *req)
199     {
200     int pri = req->pri;
201     req->next = 0;
202    
203     if (q->qe[pri])
204     {
205     q->qe[pri]->next = req;
206     q->qe[pri] = req;
207     }
208     else
209     q->qe[pri] = q->qs[pri] = req;
210    
211     return q->size++;
212     }
213    
214     static eio_req *reqq_shift (reqq *q)
215     {
216     int pri;
217    
218     if (!q->size)
219     return 0;
220    
221     --q->size;
222    
223     for (pri = EIO_NUM_PRI; pri--; )
224     {
225     eio_req *req = q->qs[pri];
226    
227     if (req)
228     {
229     if (!(q->qs[pri] = (eio_req *)req->next))
230     q->qe[pri] = 0;
231    
232     return req;
233     }
234     }
235    
236     abort ();
237     }
238    
239 root 1.2 static void grp_try_feed (eio_req *grp)
240 root 1.1 {
241     while (grp->size < grp->int2 && !EIO_CANCELLED (grp))
242     {
243     int old_len = grp->size;
244    
245     EIO_FEED (grp);
246    
247     /* stop if no progress has been made */
248     if (old_len == grp->size)
249     {
250     grp->feed = 0;
251 root 1.2 break;
252 root 1.1 }
253     }
254     }
255    
256 root 1.2 static int eio_finish (eio_req *req);
257 root 1.1
258     static int grp_dec (eio_req *grp)
259     {
260     --grp->size;
261    
262     /* call feeder, if applicable */
263 root 1.2 grp_try_feed (grp);
264 root 1.1
265     /* finish, if done */
266     if (!grp->size && grp->int1)
267 root 1.2 return eio_finish (grp);
268 root 1.1 else
269     return 0;
270     }
271    
272     void eio_destroy (eio_req *req)
273     {
274 root 1.6 if ((req)->flags & EIO_FLAG_PTR1_FREE) free (req->ptr1);
275     if ((req)->flags & EIO_FLAG_PTR2_FREE) free (req->ptr2);
276 root 1.1
277     EIO_DESTROY (req);
278     }
279    
280 root 1.2 static int eio_finish (eio_req *req)
281 root 1.1 {
282     int res = EIO_FINISH (req);
283    
284     if (req->grp)
285     {
286     int res2;
287     eio_req *grp = req->grp;
288    
289     /* unlink request */
290     if (req->grp_next) req->grp_next->grp_prev = req->grp_prev;
291     if (req->grp_prev) req->grp_prev->grp_next = req->grp_next;
292    
293     if (grp->grp_first == req)
294     grp->grp_first = req->grp_next;
295    
296     res2 = grp_dec (grp);
297    
298     if (!res && res2)
299     res = res2;
300     }
301    
302     eio_destroy (req);
303    
304     return res;
305     }
306    
307     void eio_grp_cancel (eio_req *grp)
308     {
309     for (grp = grp->grp_first; grp; grp = grp->grp_next)
310     eio_cancel (grp);
311     }
312    
313     void eio_cancel (eio_req *req)
314     {
315 root 1.6 X_LOCK (wrklock);
316 root 1.1 req->flags |= EIO_FLAG_CANCELLED;
317 root 1.6 X_UNLOCK (wrklock);
318 root 1.1
319     eio_grp_cancel (req);
320     }
321    
322     X_THREAD_PROC (eio_proc);
323    
324     static void start_thread (void)
325     {
326     worker *wrk = calloc (1, sizeof (worker));
327    
328 root 1.8 /*TODO*/
329 root 1.9 assert (("unable to allocate worker thread data", wrk));
330 root 1.1
331     X_LOCK (wrklock);
332    
333     if (thread_create (&wrk->tid, eio_proc, (void *)wrk))
334     {
335     wrk->prev = &wrk_first;
336     wrk->next = wrk_first.next;
337     wrk_first.next->prev = wrk;
338     wrk_first.next = wrk;
339     ++started;
340     }
341     else
342     free (wrk);
343    
344     X_UNLOCK (wrklock);
345     }
346    
347     static void maybe_start_thread (void)
348     {
349     if (eio_nthreads () >= wanted)
350     return;
351    
352     /* todo: maybe use idle here, but might be less exact */
353     if (0 <= (int)eio_nthreads () + (int)eio_npending () - (int)eio_nreqs ())
354     return;
355    
356     start_thread ();
357     }
358    
359     void eio_submit (eio_req *req)
360     {
361 root 1.11 req->pri += EIO_PRI_BIAS;
362    
363     if (req->pri < EIO_PRI_MIN + EIO_PRI_BIAS) req->pri = EIO_PRI_MIN + EIO_PRI_BIAS;
364     if (req->pri > EIO_PRI_MAX + EIO_PRI_BIAS) req->pri = EIO_PRI_MAX + EIO_PRI_BIAS;
365    
366 root 1.1 ++nreqs;
367    
368     X_LOCK (reqlock);
369     ++nready;
370     reqq_push (&req_queue, req);
371     X_COND_SIGNAL (reqwait);
372     X_UNLOCK (reqlock);
373    
374     maybe_start_thread ();
375     }
376    
377     static void end_thread (void)
378     {
379     eio_req *req = calloc (1, sizeof (eio_req));
380    
381     req->type = EIO_QUIT;
382     req->pri = EIO_PRI_MAX + EIO_PRI_BIAS;
383    
384     X_LOCK (reqlock);
385     reqq_push (&req_queue, req);
386     X_COND_SIGNAL (reqwait);
387     X_UNLOCK (reqlock);
388    
389     X_LOCK (wrklock);
390     --started;
391     X_UNLOCK (wrklock);
392     }
393    
394     void eio_set_max_poll_time (double nseconds)
395     {
396     if (WORDACCESS_UNSAFE) X_LOCK (reqlock);
397     max_poll_time = nseconds * EIO_TICKS; /* tvdiff () compares in EIO_TICKS units */
398     if (WORDACCESS_UNSAFE) X_UNLOCK (reqlock);
399     }
400    
401     void eio_set_max_poll_reqs (unsigned int maxreqs)
402     {
403     if (WORDACCESS_UNSAFE) X_LOCK (reqlock);
404     max_poll_reqs = maxreqs;
405     if (WORDACCESS_UNSAFE) X_UNLOCK (reqlock);
406     }
407    
408     void eio_set_max_idle (unsigned int nthreads)
409     {
410     if (WORDACCESS_UNSAFE) X_LOCK (reqlock);
411     max_idle = nthreads <= 0 ? 1 : nthreads;
412     if (WORDACCESS_UNSAFE) X_UNLOCK (reqlock);
413     }
414    
415     void eio_set_min_parallel (unsigned int nthreads)
416     {
417     if (wanted < nthreads)
418     wanted = nthreads;
419     }
420    
421     void eio_set_max_parallel (unsigned int nthreads)
422     {
423     if (wanted > nthreads)
424     wanted = nthreads;
425    
426     while (started > wanted)
427     end_thread ();
428     }
429    
430     int eio_poll (void)
431     {
432     int maxreqs = max_poll_reqs;
433     struct timeval tv_start, tv_now;
434     eio_req *req;
435    
436     if (max_poll_time)
437     gettimeofday (&tv_start, 0);
438    
439     for (;;)
440     {
441     maybe_start_thread ();
442    
443     X_LOCK (reslock);
444     req = reqq_shift (&res_queue);
445    
446     if (req)
447     {
448     --npending;
449    
450 root 1.2 if (!res_queue.size && done_poll_cb)
451 root 1.1 done_poll_cb ();
452     }
453    
454     X_UNLOCK (reslock);
455    
456     if (!req)
457     return 0;
458    
459     --nreqs;
460    
461     if (req->type == EIO_GROUP && req->size)
462     {
463     req->int1 = 1; /* mark request as delayed */
464     continue;
465     }
466     else
467     {
468 root 1.2 int res = eio_finish (req);
469 root 1.1 if (res)
470     return res;
471     }
472    
473     if (maxreqs && !--maxreqs)
474     break;
475    
476     if (max_poll_time)
477     {
478     gettimeofday (&tv_now, 0);
479    
480     if (tvdiff (&tv_start, &tv_now) >= max_poll_time)
481     break;
482     }
483     }
484    
485     errno = EAGAIN;
486     return -1;
487     }
488    
489     /*****************************************************************************/
490     /* work around various missing functions */
491    
492     #if !HAVE_PREADWRITE
493 root 1.9 # define pread eio__pread
494     # define pwrite eio__pwrite
495 root 1.1
496     /*
497     * make our pread/pwrite safe against themselves, but not against
498     * normal read/write by using a mutex. slows down execution a lot,
499     * but that's your problem, not mine.
500     */
501     static mutex_t preadwritelock = X_MUTEX_INIT;
502    
503 root 1.9 static ssize_t
504     eio__pread (int fd, void *buf, size_t count, off_t offset)
505 root 1.1 {
506     ssize_t res;
507     off_t ooffset;
508    
509     X_LOCK (preadwritelock);
510     ooffset = lseek (fd, 0, SEEK_CUR);
511     lseek (fd, offset, SEEK_SET);
512     res = read (fd, buf, count);
513     lseek (fd, ooffset, SEEK_SET);
514     X_UNLOCK (preadwritelock);
515    
516     return res;
517     }
518    
519 root 1.9 static ssize_t
520     eio__pwrite (int fd, void *buf, size_t count, off_t offset)
521 root 1.1 {
522     ssize_t res;
523     off_t ooffset;
524    
525     X_LOCK (preadwritelock);
526     ooffset = lseek (fd, 0, SEEK_CUR);
527     lseek (fd, offset, SEEK_SET);
528     res = write (fd, buf, count);
529     lseek (fd, ooffset, SEEK_SET); /* restore the original file offset */
530     X_UNLOCK (preadwritelock);
531    
532     return res;
533     }
534     #endif
535    
536     #ifndef HAVE_FUTIMES
537    
538 root 1.9 # define utimes(path,times) eio__utimes (path, times)
539     # define futimes(fd,times) eio__futimes (fd, times)
540 root 1.1
541 root 1.9 static int
542     eio__utimes (const char *filename, const struct timeval times[2])
543 root 1.1 {
544     if (times)
545     {
546     struct utimbuf buf;
547    
548     buf.actime = times[0].tv_sec;
549     buf.modtime = times[1].tv_sec;
550    
551     return utime (filename, &buf);
552     }
553     else
554     return utime (filename, 0);
555     }
556    
557 root 1.9 static int eio__futimes (int fd, const struct timeval tv[2])
558 root 1.1 {
559     errno = ENOSYS;
560     return -1;
561     }
562    
563     #endif
564    
565     #if !HAVE_FDATASYNC
566     # define fdatasync fsync
567     #endif
568    
569     #if !HAVE_READAHEAD
570 root 1.9 # define readahead(fd,offset,count) eio__readahead (fd, offset, count, self)
571 root 1.1
572 root 1.9 static ssize_t
573     eio__readahead (int fd, off_t offset, size_t count, worker *self)
574 root 1.1 {
575     size_t todo = count;
576     dBUF;
577    
578     while (todo > 0)
579     {
580     size_t len = todo < EIO_BUFSIZE ? todo : EIO_BUFSIZE;
581    
582 root 1.3 pread (fd, eio_buf, len, offset);
583 root 1.1 offset += len;
584     todo -= len;
585     }
586    
587     errno = 0;
588     return count;
589     }
590    
591     #endif
592    
593     #if !HAVE_READDIR_R
594 root 1.9 # define readdir_r eio__readdir_r
595 root 1.1
596     static mutex_t readdirlock = X_MUTEX_INIT;
597    
598 root 1.9 static int
599     eio__readdir_r (DIR *dirp, EIO_STRUCT_DIRENT *ent, EIO_STRUCT_DIRENT **res)
600 root 1.1 {
601 root 1.5 EIO_STRUCT_DIRENT *e;
602 root 1.1 int errorno;
603    
604     X_LOCK (readdirlock);
605    
606     e = readdir (dirp);
607     errorno = errno;
608    
609     if (e)
610     {
611     *res = ent;
612     strcpy (ent->d_name, e->d_name);
613     }
614     else
615     *res = 0;
616    
617     X_UNLOCK (readdirlock);
618    
619     errno = errorno;
620     return e ? 0 : -1;
621     }
622     #endif
623    
624     /* sendfile always needs emulation */
625 root 1.9 static ssize_t
626     eio__sendfile (int ofd, int ifd, off_t offset, size_t count, worker *self)
627 root 1.1 {
628     ssize_t res;
629    
630     if (!count)
631     return 0;
632    
633     #if HAVE_SENDFILE
634     # if __linux
635     res = sendfile (ofd, ifd, &offset, count);
636    
637     # elif __freebsd
638     /*
639     * Of course, the freebsd sendfile is a dire hack with no thoughts
640     * wasted on making it similar to other I/O functions.
641     */
642     {
643     off_t sbytes;
644     res = sendfile (ifd, ofd, offset, count, 0, &sbytes, 0);
645    
646     if (res < 0 && sbytes)
647     /* maybe only on EAGAIN: as usual, the manpage leaves you guessing */
648     res = sbytes;
649     }
650    
651     # elif __hpux
652     res = sendfile (ofd, ifd, offset, count, 0, 0);
653    
654     # elif __solaris
655     {
656     struct sendfilevec vec;
657     size_t sbytes;
658    
659     vec.sfv_fd = ifd;
660     vec.sfv_flag = 0;
661     vec.sfv_off = offset;
662     vec.sfv_len = count;
663    
664     res = sendfilev (ofd, &vec, 1, &sbytes);
665    
666     if (res < 0 && sbytes)
667     res = sbytes;
668     }
669    
670     # endif
671     #else
672     res = -1;
673     errno = ENOSYS;
674     #endif
675    
676     if (res < 0
677     && (errno == ENOSYS || errno == EINVAL || errno == ENOTSOCK
678     #if __solaris
679     || errno == EAFNOSUPPORT || errno == EPROTOTYPE
680     #endif
681     )
682     )
683     {
684     /* emulate sendfile. this is a major pain in the ass */
685     dBUF;
686    
687     res = 0;
688    
689     while (count)
690     {
691     ssize_t cnt;
692    
693     cnt = pread (ifd, eio_buf, count > EIO_BUFSIZE ? EIO_BUFSIZE : count, offset);
694    
695     if (cnt <= 0)
696     {
697     if (cnt && !res) res = -1;
698     break;
699     }
700    
701     cnt = write (ofd, eio_buf, cnt);
702    
703     if (cnt <= 0)
704     {
705     if (cnt && !res) res = -1;
706     break;
707     }
708    
709     offset += cnt;
710     res += cnt;
711     count -= cnt;
712     }
713     }
714    
715     return res;
716     }
717    
718     /* read a full directory */
719 root 1.9 static void
720     eio__scandir (eio_req *req, worker *self)
721 root 1.1 {
722     DIR *dirp;
723     union
724     {
725     EIO_STRUCT_DIRENT d;
726     char b [offsetof (EIO_STRUCT_DIRENT, d_name) + NAME_MAX + 1];
727     } *u;
728     EIO_STRUCT_DIRENT *entp;
729     char *name, *names;
730     int memlen = 4096;
731     int memofs = 0;
732     int res = 0;
733    
734     X_LOCK (wrklock);
735     self->dirp = dirp = opendir (req->ptr1);
736     self->dbuf = u = malloc (sizeof (*u));
737     req->flags |= EIO_FLAG_PTR2_FREE;
738     req->ptr2 = names = malloc (memlen);
739     X_UNLOCK (wrklock);
740    
741     if (dirp && u && names)
742     for (;;)
743     {
744     errno = 0;
745     readdir_r (dirp, &u->d, &entp);
746    
747     if (!entp)
748     break;
749    
750     name = entp->d_name;
751    
752     if (name [0] != '.' || (name [1] && (name [1] != '.' || name [2])))
753     {
754     int len = strlen (name) + 1;
755    
756     res++;
757    
758     while (memofs + len > memlen)
759     {
760     memlen *= 2;
761     X_LOCK (wrklock);
762     req->ptr2 = names = realloc (names, memlen);
763     X_UNLOCK (wrklock);
764    
765     if (!names)
766     break;
767     }
768    
    if (!names) /* the realloc above failed; stop scanning */
    break;

769     memcpy (names + memofs, name, len);
770     memofs += len;
771     }
772     }
773    
774     if (errno)
775     res = -1;
776    
777     req->result = res;
778     }
779    
780     /*****************************************************************************/
781    
782 root 1.6 #define ALLOC(len) \
783     if (!req->ptr2) \
784     { \
785 root 1.7 X_LOCK (wrklock); \
786     req->flags |= EIO_FLAG_PTR2_FREE; \
787     X_UNLOCK (wrklock); \
788     req->ptr2 = malloc (len); \
789     if (!req->ptr2) \
790     { \
791     errno = ENOMEM; \
792     req->result = -1; \
793     break; \
794     } \
795 root 1.6 }
796    
797 root 1.1 X_THREAD_PROC (eio_proc)
798     {
799     eio_req *req;
800     struct timespec ts;
801     worker *self = (worker *)thr_arg;
802    
803     /* try to distribute timeouts somewhat randomly */
804     ts.tv_nsec = ((unsigned long)self & 1023UL) * (1000000000UL / 1024UL);
805    
806     for (;;)
807     {
808     ts.tv_sec = time (0) + IDLE_TIMEOUT;
809    
810     X_LOCK (reqlock);
811    
812     for (;;)
813     {
814     self->req = req = reqq_shift (&req_queue);
815    
816     if (req)
817     break;
818    
819     ++idle;
820    
821     if (X_COND_TIMEDWAIT (reqwait, reqlock, ts) == ETIMEDOUT)
822     {
823     if (idle > max_idle)
824     {
825     --idle;
826     X_UNLOCK (reqlock);
827     X_LOCK (wrklock);
828     --started;
829     X_UNLOCK (wrklock);
830     goto quit;
831     }
832    
833     /* we are allowed to idle, so do so without any timeout */
834     X_COND_WAIT (reqwait, reqlock);
835     ts.tv_sec = time (0) + IDLE_TIMEOUT;
836     }
837    
838     --idle;
839     }
840    
841     --nready;
842    
843     X_UNLOCK (reqlock);
844    
845     errno = 0; /* strictly unnecessary */
846    
847     if (!EIO_CANCELLED (req))
848     switch (req->type)
849     {
850 root 1.7 case EIO_READ: ALLOC (req->size);
851     req->result = req->offs >= 0
852 root 1.1 ? pread (req->int1, req->ptr2, req->size, req->offs)
853     : read (req->int1, req->ptr2, req->size); break;
854     case EIO_WRITE: req->result = req->offs >= 0
855     ? pwrite (req->int1, req->ptr2, req->size, req->offs)
856     : write (req->int1, req->ptr2, req->size); break;
857    
858 root 1.9 case EIO_READAHEAD: req->result = readahead (req->int1, req->offs, req->size); break;
859     case EIO_SENDFILE: req->result = eio__sendfile (req->int1, req->int2, req->offs, req->size, self); break;
860 root 1.1
861 root 1.6 case EIO_STAT: ALLOC (sizeof (EIO_STRUCT_STAT));
862     req->result = stat (req->ptr1, (EIO_STRUCT_STAT *)req->ptr2); break;
863     case EIO_LSTAT: ALLOC (sizeof (EIO_STRUCT_STAT));
864     req->result = lstat (req->ptr1, (EIO_STRUCT_STAT *)req->ptr2); break;
865     case EIO_FSTAT: ALLOC (sizeof (EIO_STRUCT_STAT));
866     req->result = fstat (req->int1, (EIO_STRUCT_STAT *)req->ptr2); break;
867 root 1.1
868     case EIO_CHOWN: req->result = chown (req->ptr1, req->int2, req->int3); break;
869     case EIO_FCHOWN: req->result = fchown (req->int1, req->int2, req->int3); break;
870     case EIO_CHMOD: req->result = chmod (req->ptr1, (mode_t)req->int2); break;
871     case EIO_FCHMOD: req->result = fchmod (req->int1, (mode_t)req->int2); break;
872     case EIO_TRUNCATE: req->result = truncate (req->ptr1, req->offs); break;
873     case EIO_FTRUNCATE: req->result = ftruncate (req->int1, req->offs); break;
874    
875     case EIO_OPEN: req->result = open (req->ptr1, req->int1, (mode_t)req->int2); break;
876     case EIO_CLOSE: req->result = close (req->int1); break;
877     case EIO_DUP2: req->result = dup2 (req->int1, req->int2); break;
878     case EIO_UNLINK: req->result = unlink (req->ptr1); break;
879     case EIO_RMDIR: req->result = rmdir (req->ptr1); break;
880     case EIO_MKDIR: req->result = mkdir (req->ptr1, (mode_t)req->int2); break;
881     case EIO_RENAME: req->result = rename (req->ptr1, req->ptr2); break;
882     case EIO_LINK: req->result = link (req->ptr1, req->ptr2); break;
883     case EIO_SYMLINK: req->result = symlink (req->ptr1, req->ptr2); break;
884     case EIO_MKNOD: req->result = mknod (req->ptr1, (mode_t)req->int2, (dev_t)req->offs); break;
885 root 1.7
886 root 1.6 case EIO_READLINK: ALLOC (NAME_MAX);
887     req->result = readlink (req->ptr1, req->ptr2, NAME_MAX); break;
888 root 1.1
889     case EIO_SYNC: req->result = 0; sync (); break;
890     case EIO_FSYNC: req->result = fsync (req->int1); break;
891     case EIO_FDATASYNC: req->result = fdatasync (req->int1); break;
892    
893 root 1.9 case EIO_READDIR: eio__scandir (req, self); break;
894 root 1.1
895     case EIO_BUSY:
896     #ifdef _WIN32
897     Sleep (req->nv1 * 1000.);
898     #else
899     {
900     struct timeval tv;
901    
902     tv.tv_sec = req->nv1;
903     tv.tv_usec = (req->nv1 - tv.tv_sec) * 1000000.;
904    
905     req->result = select (0, 0, 0, 0, &tv);
906     }
907     #endif
908     break;
909    
910     case EIO_UTIME:
911     case EIO_FUTIME:
912     {
913     struct timeval tv[2];
914     struct timeval *times;
915    
916     if (req->nv1 != -1. || req->nv2 != -1.)
917     {
918     tv[0].tv_sec = req->nv1;
919     tv[0].tv_usec = (req->nv1 - tv[0].tv_sec) * 1000000.;
920     tv[1].tv_sec = req->nv2;
921     tv[1].tv_usec = (req->nv2 - tv[1].tv_sec) * 1000000.;
922    
923     times = tv;
924     }
925     else
926     times = 0;
927    
928    
929     req->result = req->type == EIO_FUTIME
930     ? futimes (req->int1, times)
931     : utimes (req->ptr1, times);
932     }
    break;
933    
934     case EIO_GROUP:
935     case EIO_NOP:
936 root 1.9 req->result = 0;
937 root 1.1 break;
938    
939     case EIO_QUIT:
940     goto quit;
941    
942     default:
943     req->result = -1;
944     break;
945     }
946    
947     req->errorno = errno;
948    
949     X_LOCK (reslock);
950    
951     ++npending;
952    
953 root 1.2 if (!reqq_push (&res_queue, req) && want_poll_cb)
954 root 1.1 want_poll_cb ();
955    
956     self->req = 0;
957     worker_clear (self);
958    
959     X_UNLOCK (reslock);
960     }
961    
962     quit:
963     X_LOCK (wrklock);
964     worker_free (self);
965     X_UNLOCK (wrklock);
966    
967     return 0;
968     }
969    
970     /*****************************************************************************/
971    
972 root 1.2 static void eio_atfork_prepare (void)
973 root 1.1 {
974     X_LOCK (wrklock);
975     X_LOCK (reqlock);
976     X_LOCK (reslock);
977     #if !HAVE_PREADWRITE
978     X_LOCK (preadwritelock);
979     #endif
980     #if !HAVE_READDIR_R
981     X_LOCK (readdirlock);
982     #endif
983     }
984    
985 root 1.2 static void eio_atfork_parent (void)
986 root 1.1 {
987     #if !HAVE_READDIR_R
988     X_UNLOCK (readdirlock);
989     #endif
990     #if !HAVE_PREADWRITE
991     X_UNLOCK (preadwritelock);
992     #endif
993     X_UNLOCK (reslock);
994     X_UNLOCK (reqlock);
995     X_UNLOCK (wrklock);
996     }
997    
998 root 1.2 static void eio_atfork_child (void)
999 root 1.1 {
1000     eio_req *prv;
1001    
1002     while ((prv = reqq_shift (&req_queue)))
1003     eio_destroy (prv);
1004    
1005     while ((prv = reqq_shift (&res_queue)))
1006     eio_destroy (prv);
1007    
1008     while (wrk_first.next != &wrk_first)
1009     {
1010     worker *wrk = wrk_first.next;
1011    
1012     if (wrk->req)
1013     eio_destroy (wrk->req);
1014    
1015     worker_clear (wrk);
1016     worker_free (wrk);
1017     }
1018    
1019     started = 0;
1020     idle = 0;
1021     nreqs = 0;
1022     nready = 0;
1023     npending = 0;
1024    
1025 root 1.2 eio_atfork_parent ();
1026 root 1.1 }
1027    
1028     int eio_init (void (*want_poll)(void), void (*done_poll)(void))
1029     {
1030     want_poll_cb = want_poll;
1031     done_poll_cb = done_poll;
1032    
1033     #ifdef _WIN32
1034     X_MUTEX_CHECK (wrklock);
1035     X_MUTEX_CHECK (reslock);
1036     X_MUTEX_CHECK (reqlock);
1037     X_MUTEX_CHECK (reqwait);
1038     X_MUTEX_CHECK (preadwritelock);
1039     X_MUTEX_CHECK (readdirlock);
1040    
1041     X_COND_CHECK (reqwait);
1042     #endif
1043    
1044 root 1.2 X_THREAD_ATFORK (eio_atfork_prepare, eio_atfork_parent, eio_atfork_child);
    return 0;
1045 root 1.1 }
1046    
1047 root 1.6 static void eio_api_destroy (eio_req *req)
1048     {
1049     free (req);
1050     }
1051    
1052     #define REQ(rtype) \
1053     eio_req *req; \
1054     \
1055     req = (eio_req *)calloc (1, sizeof *req); \
1056     if (!req) \
1057     return 0; \
1058     \
1059 root 1.11 req->type = rtype; \
1060     req->pri = pri; \
1061     req->finish = cb; \
1062     req->data = data; \
1063 root 1.6 req->destroy = eio_api_destroy;
1064    
1065     #define SEND eio_submit (req); return req
1066    
1067 root 1.9 #define PATH \
1068     req->flags |= EIO_FLAG_PTR1_FREE; \
1069     req->ptr1 = strdup (path); \
1070     if (!req->ptr1) \
1071     { \
1072     eio_api_destroy (req); \
1073     return 0; \
1074     }
1075    
1076 root 1.10 eio_req *eio_nop (int pri, eio_cb cb, void *data)
1077 root 1.9 {
1078     REQ (EIO_NOP); SEND;
1079     }
1080    
1081 root 1.10 eio_req *eio_busy (double delay, int pri, eio_cb cb, void *data)
1082 root 1.9 {
1083     REQ (EIO_BUSY); req->nv1 = delay; SEND;
1084     }
1085    
1086 root 1.10 eio_req *eio_sync (int pri, eio_cb cb, void *data)
1087 root 1.9 {
1088     REQ (EIO_SYNC); SEND;
1089     }
1090 root 1.6
1091 root 1.10 eio_req *eio_fsync (int fd, int pri, eio_cb cb, void *data)
1092 root 1.6 {
1093 root 1.9 REQ (EIO_FSYNC); req->int1 = fd; SEND;
1094 root 1.6 }
1095    
1096 root 1.10 eio_req *eio_fdatasync (int fd, int pri, eio_cb cb, void *data)
1097 root 1.6 {
1098 root 1.9 REQ (EIO_FDATASYNC); req->int1 = fd; SEND;
1099     }
1100    
1101 root 1.10 eio_req *eio_close (int fd, int pri, eio_cb cb, void *data)
1102 root 1.9 {
1103     REQ (EIO_CLOSE); req->int1 = fd; SEND;
1104 root 1.6 }
1105    
1106 root 1.10 eio_req *eio_readahead (int fd, off_t offset, size_t length, int pri, eio_cb cb, void *data)
1107 root 1.6 {
1108 root 1.9 REQ (EIO_READAHEAD); req->int1 = fd; req->offs = offset; req->size = length; SEND;
1109 root 1.6 }
1110    
1111 root 1.10 eio_req *eio_read (int fd, void *buf, size_t length, off_t offset, int pri, eio_cb cb, void *data)
1112 root 1.6 {
1113 root 1.10 REQ (EIO_READ); req->int1 = fd; req->offs = offset; req->size = length; req->ptr2 = buf; SEND;
1114 root 1.6 }
1115    
1116 root 1.10 eio_req *eio_write (int fd, void *buf, size_t length, off_t offset, int pri, eio_cb cb, void *data)
1117 root 1.6 {
1118 root 1.10 REQ (EIO_WRITE); req->int1 = fd; req->offs = offset; req->size = length; req->ptr2 = buf; SEND;
1119 root 1.6 }
1120    
1121 root 1.10 eio_req *eio_fstat (int fd, int pri, eio_cb cb, void *data)
1122 root 1.6 {
1123 root 1.9 REQ (EIO_FSTAT); req->int1 = fd; SEND;
1124 root 1.6 }
1125    
1126 root 1.10 eio_req *eio_futime (int fd, double atime, double mtime, int pri, eio_cb cb, void *data)
1127 root 1.6 {
1128 root 1.9 REQ (EIO_FUTIME); req->int1 = fd; req->nv1 = atime; req->nv2 = mtime; SEND;
1129 root 1.6 }
1130    
1131 root 1.10 eio_req *eio_ftruncate (int fd, off_t offset, int pri, eio_cb cb, void *data)
1132 root 1.6 {
1133 root 1.9 REQ (EIO_FTRUNCATE); req->int1 = fd; req->offs = offset; SEND;
1134 root 1.6 }
1135    
1136 root 1.10 eio_req *eio_fchmod (int fd, mode_t mode, int pri, eio_cb cb, void *data)
1137 root 1.6 {
1138 root 1.9 REQ (EIO_FCHMOD); req->int1 = fd; req->int2 = (long)mode; SEND;
1139 root 1.6 }
1140    
1141 root 1.10 eio_req *eio_fchown (int fd, uid_t uid, gid_t gid, int pri, eio_cb cb, void *data)
1142 root 1.6 {
1143 root 1.9 REQ (EIO_FCHOWN); req->int1 = fd; req->int2 = (long)uid; req->int3 = (long)gid; SEND;
1144 root 1.6 }
1145    
1146 root 1.10 eio_req *eio_dup2 (int fd, int fd2, int pri, eio_cb cb, void *data)
1147 root 1.6 {
1148 root 1.9 REQ (EIO_DUP2); req->int1 = fd; req->int2 = fd2; SEND;
1149 root 1.6 }
1150    
1151 root 1.10 eio_req *eio_sendfile (int out_fd, int in_fd, off_t in_offset, size_t length, int pri, eio_cb cb, void *data)
1152 root 1.6 {
1153 root 1.9 REQ (EIO_SENDFILE); req->int1 = out_fd; req->int2 = in_fd; req->offs = in_offset; req->size = length; SEND;
1154 root 1.6 }
1155    
1156 root 1.10 eio_req *eio_open (const char *path, int flags, mode_t mode, int pri, eio_cb cb, void *data)
1157 root 1.6 {
1158 root 1.9 REQ (EIO_OPEN); PATH; req->int1 = flags; req->int2 = (long)mode; SEND;
1159 root 1.6 }
1160    
1161 root 1.10 eio_req *eio_utime (const char *path, double atime, double mtime, int pri, eio_cb cb, void *data)
1162 root 1.6 {
1163 root 1.9 REQ (EIO_UTIME); PATH; req->nv1 = atime; req->nv2 = mtime; SEND;
1164 root 1.6 }
1165    
1166 root 1.10 eio_req *eio_truncate (const char *path, off_t offset, int pri, eio_cb cb, void *data)
1167 root 1.6 {
1168 root 1.9 REQ (EIO_TRUNCATE); PATH; req->offs = offset; SEND;
1169 root 1.6 }
1170    
1171 root 1.10 eio_req *eio_chown (const char *path, uid_t uid, gid_t gid, int pri, eio_cb cb, void *data)
1172 root 1.6 {
1173 root 1.9 REQ (EIO_CHOWN); PATH; req->int2 = (long)uid; req->int3 = (long)gid; SEND;
1174 root 1.6 }
1175    
1176 root 1.10 eio_req *eio_chmod (const char *path, mode_t mode, int pri, eio_cb cb, void *data)
1177 root 1.6 {
1178 root 1.9 REQ (EIO_CHMOD); PATH; req->int2 = (long)mode; SEND;
1179 root 1.6 }
1180    
1181 root 1.10 eio_req *eio_mkdir (const char *path, mode_t mode, int pri, eio_cb cb, void *data)
1182 root 1.6 {
1183 root 1.9 REQ (EIO_MKDIR); PATH; req->int2 = (long)mode; SEND;
1184 root 1.6 }
1185    
1186 root 1.9 static eio_req *
1187 root 1.10 eio__1path (int type, const char *path, int pri, eio_cb cb, void *data)
1188 root 1.6 {
1189 root 1.9 REQ (type); PATH; SEND;
1190 root 1.6 }
1191    
1192 root 1.10 eio_req *eio_readlink (const char *path, int pri, eio_cb cb, void *data)
1193 root 1.6 {
1194 root 1.10 return eio__1path (EIO_READLINK, path, pri, cb, data);
1195 root 1.6 }
1196    
1197 root 1.10 eio_req *eio_stat (const char *path, int pri, eio_cb cb, void *data)
1198 root 1.6 {
1199 root 1.10 return eio__1path (EIO_STAT, path, pri, cb, data);
1200 root 1.6 }
1201    
1202 root 1.10 eio_req *eio_lstat (const char *path, int pri, eio_cb cb, void *data)
1203 root 1.6 {
1204 root 1.10 return eio__1path (EIO_LSTAT, path, pri, cb, data);
1205 root 1.6 }
1206    
1207 root 1.10 eio_req *eio_unlink (const char *path, int pri, eio_cb cb, void *data)
1208 root 1.6 {
1209 root 1.10 return eio__1path (EIO_UNLINK, path, pri, cb, data);
1210 root 1.6 }
1211    
1212 root 1.10 eio_req *eio_rmdir (const char *path, int pri, eio_cb cb, void *data)
1213 root 1.6 {
1214 root 1.10 return eio__1path (EIO_RMDIR, path, pri, cb, data);
1215 root 1.6 }
1216    
1217 root 1.10 eio_req *eio_readdir (const char *path, int pri, eio_cb cb, void *data)
1218 root 1.1 {
1219 root 1.10 return eio__1path (EIO_READDIR, path, pri, cb, data);
1220 root 1.1 }
1221    
1222 root 1.10 eio_req *eio_mknod (const char *path, mode_t mode, dev_t dev, int pri, eio_cb cb, void *data)
1223 root 1.1 {
1224 root 1.9 REQ (EIO_MKNOD); PATH; req->int2 = (long)mode; req->offs = (off_t)dev; SEND;
1225 root 1.1 }
1226    
1227 root 1.9 static eio_req *
1228 root 1.10 eio__2path (int type, const char *path, const char *new_path, int pri, eio_cb cb, void *data)
1229 root 1.1 {
1230 root 1.9 REQ (type); PATH;
1231 root 1.1
1232 root 1.9 req->flags |= EIO_FLAG_PTR2_FREE;
1233     req->ptr2 = strdup (new_path);
1234     if (!req->ptr2)
1235     {
1236     eio_api_destroy (req);
1237     return 0;
1238     }
1239 root 1.1
1240 root 1.9 SEND;
1241 root 1.1 }
1242    
1243 root 1.10 eio_req *eio_link (const char *path, const char *new_path, int pri, eio_cb cb, void *data)
1244 root 1.1 {
1245 root 1.10 return eio__2path (EIO_LINK, path, new_path, pri, cb, data);
1246 root 1.1 }
1247    
1248 root 1.10 eio_req *eio_symlink (const char *path, const char *new_path, int pri, eio_cb cb, void *data)
1249 root 1.1 {
1250 root 1.10 return eio__2path (EIO_SYMLINK, path, new_path, pri, cb, data);
1251 root 1.1 }
1252    
1253 root 1.10 eio_req *eio_rename (const char *path, const char *new_path, int pri, eio_cb cb, void *data)
1254 root 1.1 {
1255 root 1.10 return eio__2path (EIO_RENAME, path, new_path, pri, cb, data);
1256     }
1257    
1258     eio_req *eio_grp (eio_cb cb, void *data)
1259     {
1260     const int pri = EIO_PRI_MAX;
1261    
1262     REQ (EIO_GROUP); SEND;
1263 root 1.1 }
1264    
1265 root 1.9 #undef REQ
1266     #undef PATH
1267     #undef SEND
1268 root 1.1
1269 root 1.9 /*****************************************************************************/
1270     /* grp functions */
1271 root 1.1
1272 root 1.2 void eio_grp_feed (eio_req *grp, void (*feed)(eio_req *req), int limit)
1273 root 1.1 {
1274     grp->int2 = limit;
1275     grp->feed = feed;
1276 root 1.2
1277     grp_try_feed (grp);
1278     }
1279    
1280     void eio_grp_limit (eio_req *grp, int limit)
1281     {
1282     grp->int2 = limit;
1283    
1284     grp_try_feed (grp);
1285 root 1.1 }
1286    
1287     void eio_grp_add (eio_req *grp, eio_req *req)
1288     {
1289     assert (("cannot add requests to IO::AIO::GRP after the group finished", grp->int1 != 2));
1290    
1291     ++grp->size;
1292     req->grp = grp;
1293    
1294     req->grp_prev = 0;
1295     req->grp_next = grp->grp_first;
1296    
1297     if (grp->grp_first)
1298     grp->grp_first->grp_prev = req;
1299    
1300     grp->grp_first = req;
1301     }
1302    
1303 root 1.9 /*****************************************************************************/
1304     /* misc garbage */
1305    
1306     ssize_t eio_sendfile_sync (int ofd, int ifd, off_t offset, size_t count)
1307     {
1308     worker wrk;
    ssize_t res;
1309    
1310     wrk.dbuf = 0;
1311    
1312     res = eio__sendfile (ofd, ifd, offset, count, &wrk);
1313    
1314     if (wrk.dbuf)
1315     free (wrk.dbuf);

    return res;
1316     }
1317 root 1.1