/cvs/libeio/eio.c
Revision: 1.10
Committed: Tue May 13 17:08:15 2008 UTC by root
Content type: text/plain
Branch: MAIN
Changes since 1.9: +52 -45 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.1 #include "eio.h"
2     #include "xthread.h"
3    
4     #include <errno.h>
5     #include <stddef.h>
6     #include <stdlib.h>
7 root 1.3 #include <string.h>
8 root 1.1 #include <errno.h>
9     #include <sys/types.h>
10     #include <sys/stat.h>
11     #include <limits.h>
12     #include <fcntl.h>
13 root 1.8 #include <assert.h>
14 root 1.1
15     #ifndef EIO_FINISH
16     # define EIO_FINISH(req) ((req)->finish) && !EIO_CANCELLED (req) ? (req)->finish (req) : 0
17     #endif
18    
19     #ifndef EIO_DESTROY
20     # define EIO_DESTROY(req) do { if ((req)->destroy) (req)->destroy (req); } while (0)
21     #endif
22    
23     #ifndef EIO_FEED
24     # define EIO_FEED(req) do { if ((req)->feed ) (req)->feed (req); } while (0)
25     #endif
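/* note: EIO_FINISH, EIO_DESTROY and EIO_FEED above are only fallbacks; an
   embedding application can define them before this file is compiled to hook
   request completion, destruction and group feeding, otherwise the defaults
   simply invoke the per-request finish/destroy/feed callbacks. */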
26    
27     #ifdef _WIN32
28    
29     /*doh*/
30    
31     #else
32    
33     # include "config.h"
34     # include <sys/time.h>
35     # include <sys/select.h>
36     # include <unistd.h>
37     # include <utime.h>
38     # include <signal.h>
39 root 1.4 # include <dirent.h>
40 root 1.1
41     # ifndef EIO_STRUCT_DIRENT
42     # define EIO_STRUCT_DIRENT struct dirent
43     # endif
44    
45     #endif
46    
47     #if HAVE_SENDFILE
48     # if __linux
49     # include <sys/sendfile.h>
50     # elif __freebsd
51     # include <sys/socket.h>
52     # include <sys/uio.h>
53     # elif __hpux
54     # include <sys/socket.h>
55     # elif __solaris /* not yet */
56     # include <sys/sendfile.h>
57     # else
58     # error sendfile support requested but not available
59     # endif
60     #endif
61    
62     /* number of seconds after which idle threads exit */
63     #define IDLE_TIMEOUT 10
64    
65     /* used for struct dirent buffers, AIX doesn't provide NAME_MAX */
66     #ifndef NAME_MAX
67     # define NAME_MAX 4096
68     #endif
69    
70     /* buffer size for various temporary buffers */
71     #define EIO_BUFSIZE 65536
72    
73     #define dBUF \
74     char *eio_buf; \
75     X_LOCK (wrklock); \
76     self->dbuf = eio_buf = malloc (EIO_BUFSIZE); \
77     X_UNLOCK (wrklock); \
78 root 1.9 errno = ENOMEM; \
79 root 1.1 if (!eio_buf) \
80     return -1;
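/* dBUF is used at the top of worker-side helpers (they receive a "worker
   *self" argument): it allocates a per-request scratch buffer of EIO_BUFSIZE
   bytes, remembers it in self->dbuf so worker_clear () can release it, and
   makes the enclosing function fail with -1/ENOMEM if malloc fails. */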
81    
82     #define EIO_TICKS ((1000000 + 1023) >> 10)
83    
84     static void (*want_poll_cb) (void);
85     static void (*done_poll_cb) (void);
86    
87     static unsigned int max_poll_time = 0;
88     static unsigned int max_poll_reqs = 0;
89    
90     /* calculate time difference in ~1/EIO_TICKS of a second */
91     static int tvdiff (struct timeval *tv1, struct timeval *tv2)
92     {
93     return (tv2->tv_sec - tv1->tv_sec ) * EIO_TICKS
94     + ((tv2->tv_usec - tv1->tv_usec) >> 10);
95     }
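/* for example, EIO_TICKS == (1000000 + 1023) >> 10 == 977, so one tick is
   roughly a millisecond (1/977 s) and a tvdiff () result of 977 corresponds
   to approximately one second of elapsed time. */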
96    
97 root 1.9 static unsigned int started, idle, wanted = 4;
98 root 1.1
99     /* worker threads management */
100     static mutex_t wrklock = X_MUTEX_INIT;
101    
102 root 1.9 typedef struct worker
103     {
104 root 1.1 /* locked by wrklock */
105     struct worker *prev, *next;
106    
107     thread_t tid;
108    
109     /* locked by reslock, reqlock or wrklock */
110     eio_req *req; /* currently processed request */
111     void *dbuf;
112     DIR *dirp;
113     } worker;
114    
115     static worker wrk_first = { &wrk_first, &wrk_first, 0 };
116    
117     static void worker_clear (worker *wrk)
118     {
119     if (wrk->dirp)
120     {
121     closedir (wrk->dirp);
122     wrk->dirp = 0;
123     }
124    
125     if (wrk->dbuf)
126     {
127     free (wrk->dbuf);
128     wrk->dbuf = 0;
129     }
130     }
131    
132     static void worker_free (worker *wrk)
133     {
134     wrk->next->prev = wrk->prev;
135     wrk->prev->next = wrk->next;
136    
137     free (wrk);
138     }
139    
140     static volatile unsigned int nreqs, nready, npending;
141     static volatile unsigned int max_idle = 4;
142    
143     static mutex_t reslock = X_MUTEX_INIT;
144     static mutex_t reqlock = X_MUTEX_INIT;
145     static cond_t reqwait = X_COND_INIT;
146    
147     unsigned int eio_nreqs (void)
148     {
149     return nreqs;
150     }
151    
152     unsigned int eio_nready (void)
153     {
154     unsigned int retval;
155    
156     if (WORDACCESS_UNSAFE) X_LOCK (reqlock);
157     retval = nready;
158     if (WORDACCESS_UNSAFE) X_UNLOCK (reqlock);
159    
160     return retval;
161     }
162    
163     unsigned int eio_npending (void)
164     {
165     unsigned int retval;
166    
167     if (WORDACCESS_UNSAFE) X_LOCK (reqlock);
168     retval = npending;
169     if (WORDACCESS_UNSAFE) X_UNLOCK (reqlock);
170    
171     return retval;
172     }
173    
174     unsigned int eio_nthreads (void)
175     {
176     unsigned int retval;
177    
178     if (WORDACCESS_UNSAFE) X_LOCK (reqlock);
179     retval = started;
180     if (WORDACCESS_UNSAFE) X_UNLOCK (reqlock);
181    
182     return retval;
183     }
184    
185     /*
186     * a somewhat faster data structure might be nice, but
187     * with 8 priorities this actually needs <20 insns
188     * per shift, the most expensive operation.
189     */
190     typedef struct {
191     eio_req *qs[EIO_NUM_PRI], *qe[EIO_NUM_PRI]; /* qstart, qend */
192     int size;
193     } reqq;
194    
195     static reqq req_queue;
196     static reqq res_queue;
197    
198     static int reqq_push (reqq *q, eio_req *req)
199     {
200     int pri = req->pri;
201     req->next = 0;
202    
203     if (q->qe[pri])
204     {
205     q->qe[pri]->next = req;
206     q->qe[pri] = req;
207     }
208     else
209     q->qe[pri] = q->qs[pri] = req;
210    
211     return q->size++;
212     }
213    
214     static eio_req *reqq_shift (reqq *q)
215     {
216     int pri;
217    
218     if (!q->size)
219     return 0;
220    
221     --q->size;
222    
223     for (pri = EIO_NUM_PRI; pri--; )
224     {
225     eio_req *req = q->qs[pri];
226    
227     if (req)
228     {
229     if (!(q->qs[pri] = (eio_req *)req->next))
230     q->qe[pri] = 0;
231    
232     return req;
233     }
234     }
235    
236     abort ();
237     }
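/* rough sketch of the queue behaviour (illustration only, the requests and
   priorities are made up):

     eio_req a, b, c;
     a.pri = 2; b.pri = 2; c.pri = 5;

     reqq_push (&req_queue, &a);
     reqq_push (&req_queue, &b);
     reqq_push (&req_queue, &c);

     reqq_shift (&req_queue);   => &c, higher priority index first
     reqq_shift (&req_queue);   => &a, FIFO within one priority
     reqq_shift (&req_queue);   => &b
     reqq_shift (&req_queue);   => 0, queue is empty
 */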
238    
239 root 1.2 static void grp_try_feed (eio_req *grp)
240 root 1.1 {
241     while (grp->size < grp->int2 && !EIO_CANCELLED (grp))
242     {
243     int old_len = grp->size;
244    
245     EIO_FEED (grp);
246    
247     /* stop if no progress has been made */
248     if (old_len == grp->size)
249     {
250     grp->feed = 0;
251 root 1.2 break;
252 root 1.1 }
253     }
254     }
255    
256 root 1.2 static int eio_finish (eio_req *req);
257 root 1.1
258     static int grp_dec (eio_req *grp)
259     {
260     --grp->size;
261    
262     /* call feeder, if applicable */
263 root 1.2 grp_try_feed (grp);
264 root 1.1
265     /* finish, if done */
266     if (!grp->size && grp->int1)
267 root 1.2 return eio_finish (grp);
268 root 1.1 else
269     return 0;
270     }
271    
272     void eio_destroy (eio_req *req)
273     {
274 root 1.6 if ((req)->flags & EIO_FLAG_PTR1_FREE) free (req->ptr1);
275     if ((req)->flags & EIO_FLAG_PTR2_FREE) free (req->ptr2);
276 root 1.1
277     EIO_DESTROY (req);
278     }
279    
280 root 1.2 static int eio_finish (eio_req *req)
281 root 1.1 {
282     int res = EIO_FINISH (req);
283    
284     if (req->grp)
285     {
286     int res2;
287     eio_req *grp = req->grp;
288    
289     /* unlink request */
290     if (req->grp_next) req->grp_next->grp_prev = req->grp_prev;
291     if (req->grp_prev) req->grp_prev->grp_next = req->grp_next;
292    
293     if (grp->grp_first == req)
294     grp->grp_first = req->grp_next;
295    
296     res2 = grp_dec (grp);
297    
298     if (!res && res2)
299     res = res2;
300     }
301    
302     eio_destroy (req);
303    
304     return res;
305     }
306    
307     void eio_grp_cancel (eio_req *grp)
308     {
309     for (grp = grp->grp_first; grp; grp = grp->grp_next)
310     eio_cancel (grp);
311     }
312    
313     void eio_cancel (eio_req *req)
314     {
315 root 1.6 X_LOCK (wrklock);
316 root 1.1 req->flags |= EIO_FLAG_CANCELLED;
317 root 1.6 X_UNLOCK (wrklock);
318 root 1.1
319     eio_grp_cancel (req);
320     }
321    
322     X_THREAD_PROC (eio_proc);
323    
324     static void start_thread (void)
325     {
326     worker *wrk = calloc (1, sizeof (worker));
327    
328 root 1.8 /*TODO*/
329 root 1.9 assert (("unable to allocate worker thread data", wrk));
330 root 1.1
331     X_LOCK (wrklock);
332    
333     if (thread_create (&wrk->tid, eio_proc, (void *)wrk))
334     {
335     wrk->prev = &wrk_first;
336     wrk->next = wrk_first.next;
337     wrk_first.next->prev = wrk;
338     wrk_first.next = wrk;
339     ++started;
340     }
341     else
342     free (wrk);
343    
344     X_UNLOCK (wrklock);
345     }
346    
347     static void maybe_start_thread (void)
348     {
349     if (eio_nthreads () >= wanted)
350     return;
351    
352     /* todo: maybe use idle here, but might be less exact */
353     if (0 <= (int)eio_nthreads () + (int)eio_npending () - (int)eio_nreqs ())
354     return;
355    
356     start_thread ();
357     }
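/* in other words: a thread is started only while there are more outstanding
   requests than worker threads plus already-finished results, i.e. while
   eio_nreqs () > eio_nthreads () + eio_npending (), and never beyond the
   "wanted" limit. */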
358    
359     void eio_submit (eio_req *req)
360     {
361     ++nreqs;
362    
363     X_LOCK (reqlock);
364     ++nready;
365     reqq_push (&req_queue, req);
366     X_COND_SIGNAL (reqwait);
367     X_UNLOCK (reqlock);
368    
369     maybe_start_thread ();
370     }
371    
372     static void end_thread (void)
373     {
374     eio_req *req = calloc (1, sizeof (eio_req));
375    
376     req->type = EIO_QUIT;
377     req->pri = EIO_PRI_MAX + EIO_PRI_BIAS;
378    
379     X_LOCK (reqlock);
380     reqq_push (&req_queue, req);
381     X_COND_SIGNAL (reqwait);
382     X_UNLOCK (reqlock);
383    
384     X_LOCK (wrklock);
385     --started;
386     X_UNLOCK (wrklock);
387     }
388    
389     void eio_set_max_poll_time (double nseconds)
390     {
391     if (WORDACCESS_UNSAFE) X_LOCK (reqlock);
392     max_poll_time = nseconds * EIO_TICKS;
393     if (WORDACCESS_UNSAFE) X_UNLOCK (reqlock);
394     }
395    
396     void eio_set_max_poll_reqs (unsigned int maxreqs)
397     {
398     if (WORDACCESS_UNSAFE) X_LOCK (reqlock);
399     max_poll_reqs = maxreqs;
400     if (WORDACCESS_UNSAFE) X_UNLOCK (reqlock);
401     }
402    
403     void eio_set_max_idle (unsigned int nthreads)
404     {
405     if (WORDACCESS_UNSAFE) X_LOCK (reqlock);
406     max_idle = nthreads <= 0 ? 1 : nthreads;
407     if (WORDACCESS_UNSAFE) X_UNLOCK (reqlock);
408     }
409    
410     void eio_set_min_parallel (unsigned int nthreads)
411     {
412     if (wanted < nthreads)
413     wanted = nthreads;
414     }
415    
416     void eio_set_max_parallel (unsigned int nthreads)
417     {
418     if (wanted > nthreads)
419     wanted = nthreads;
420    
421     while (started > wanted)
422     end_thread ();
423     }
424    
425     int eio_poll (void)
426     {
427     int maxreqs = max_poll_reqs;
428     struct timeval tv_start, tv_now;
429     eio_req *req;
430    
431     if (max_poll_time)
432     gettimeofday (&tv_start, 0);
433    
434     for (;;)
435     {
436     maybe_start_thread ();
437    
438     X_LOCK (reslock);
439     req = reqq_shift (&res_queue);
440    
441     if (req)
442     {
443     --npending;
444    
445 root 1.2 if (!res_queue.size && done_poll_cb)
446 root 1.1 done_poll_cb ();
447     }
448    
449     X_UNLOCK (reslock);
450    
451     if (!req)
452     return 0;
453    
454     --nreqs;
455    
456     if (req->type == EIO_GROUP && req->size)
457     {
458     req->int1 = 1; /* mark request as delayed */
459     continue;
460     }
461     else
462     {
463 root 1.2 int res = eio_finish (req);
464 root 1.1 if (res)
465     return res;
466     }
467    
468     if (maxreqs && !--maxreqs)
469     break;
470    
471     if (max_poll_time)
472     {
473     gettimeofday (&tv_now, 0);
474    
475     if (tvdiff (&tv_start, &tv_now) >= max_poll_time)
476     break;
477     }
478     }
479    
480     errno = EAGAIN;
481     return -1;
482     }
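/* minimal usage sketch, not part of libeio itself (callback names are made
   up and the busy-wait is a placeholder; a real program would wake its event
   loop from want_poll, e.g. via a pipe, and call eio_poll () from there):

     static void want_poll (void) { }   .. results are pending, poll soon
     static void done_poll (void) { }   .. all pending results handled

     static int on_stat (eio_req *req)
     {
       if (req->result < 0)
         fprintf (stderr, "stat failed: %s\n", strerror (req->errorno));
       return 0;
     }

     eio_init (want_poll, done_poll);
     eio_stat ("/etc/passwd", 0, on_stat, 0);

     while (eio_nreqs ())
       eio_poll ();   .. may return -1/EAGAIN while more results are pending
 */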
483    
484     /*****************************************************************************/
485     /* work around various missing functions */
486    
487     #if !HAVE_PREADWRITE
488 root 1.9 # define pread eio__pread
489     # define pwrite eio__pwrite
490 root 1.1
491     /*
492     * make our pread/pwrite safe against themselves, but not against
493     * normal read/write by using a mutex. slows down execution a lot,
494     * but that's your problem, not mine.
495     */
496     static mutex_t preadwritelock = X_MUTEX_INIT;
497    
498 root 1.9 static ssize_t
499     eio__pread (int fd, void *buf, size_t count, off_t offset)
500 root 1.1 {
501     ssize_t res;
502     off_t ooffset;
503    
504     X_LOCK (preadwritelock);
505     ooffset = lseek (fd, 0, SEEK_CUR);
506     lseek (fd, offset, SEEK_SET);
507     res = read (fd, buf, count);
508     lseek (fd, ooffset, SEEK_SET);
509     X_UNLOCK (preadwritelock);
510    
511     return res;
512     }
513    
514 root 1.9 static ssize_t
515     eio__pwrite (int fd, void *buf, size_t count, off_t offset)
516 root 1.1 {
517     ssize_t res;
518     off_t ooffset;
519    
520     X_LOCK (preadwritelock);
521     ooffset = lseek (fd, 0, SEEK_CUR);
522     lseek (fd, offset, SEEK_SET);
523     res = write (fd, buf, count);
524     lseek (fd, ooffset, SEEK_SET);
525     X_UNLOCK (preadwritelock);
526    
527     return res;
528     }
529     #endif
530    
531     #ifndef HAVE_FUTIMES
532    
533 root 1.9 # define utimes(path,times) eio__utimes (path, times)
534     # define futimes(fd,times) eio__futimes (fd, times)
535 root 1.1
536 root 1.9 static int
537     eio__utimes (const char *filename, const struct timeval times[2])
538 root 1.1 {
539     if (times)
540     {
541     struct utimbuf buf;
542    
543     buf.actime = times[0].tv_sec;
544     buf.modtime = times[1].tv_sec;
545    
546     return utime (filename, &buf);
547     }
548     else
549     return utime (filename, 0);
550     }
551    
552 root 1.9 static int eio__futimes (int fd, const struct timeval tv[2])
553 root 1.1 {
554     errno = ENOSYS;
555     return -1;
556     }
557    
558     #endif
559    
560     #if !HAVE_FDATASYNC
561     # define fdatasync fsync
562     #endif
563    
564     #if !HAVE_READAHEAD
565 root 1.9 # define readahead(fd,offset,count) eio__readahead (fd, offset, count, self)
566 root 1.1
567 root 1.9 static ssize_t
568     eio__readahead (int fd, off_t offset, size_t count, worker *self)
569 root 1.1 {
570     size_t todo = count;
571     dBUF;
572    
573     while (todo > 0)
574     {
575     size_t len = todo < EIO_BUFSIZE ? todo : EIO_BUFSIZE;
576    
577 root 1.3 pread (fd, eio_buf, len, offset);
578 root 1.1 offset += len;
579     todo -= len;
580     }
581    
582     errno = 0;
583     return count;
584     }
585    
586     #endif
587    
588     #if !HAVE_READDIR_R
589 root 1.9 # define readdir_r eio__readdir_r
590 root 1.1
591     static mutex_t readdirlock = X_MUTEX_INIT;
592    
593 root 1.9 static int
594     eio__readdir_r (DIR *dirp, EIO_STRUCT_DIRENT *ent, EIO_STRUCT_DIRENT **res)
595 root 1.1 {
596 root 1.5 EIO_STRUCT_DIRENT *e;
597 root 1.1 int errorno;
598    
599     X_LOCK (readdirlock);
600    
601     e = readdir (dirp);
602     errorno = errno;
603    
604     if (e)
605     {
606     *res = ent;
607     strcpy (ent->d_name, e->d_name);
608     }
609     else
610     *res = 0;
611    
612     X_UNLOCK (readdirlock);
613    
614     errno = errorno;
615     return e ? 0 : -1;
616     }
617     #endif
618    
619     /* sendfile always needs emulation */
620 root 1.9 static ssize_t
621     eio__sendfile (int ofd, int ifd, off_t offset, size_t count, worker *self)
622 root 1.1 {
623     ssize_t res;
624    
625     if (!count)
626     return 0;
627    
628     #if HAVE_SENDFILE
629     # if __linux
630     res = sendfile (ofd, ifd, &offset, count);
631    
632     # elif __freebsd
633     /*
634     * Of course, the freebsd sendfile is a dire hack with no thoughts
635     * wasted on making it similar to other I/O functions.
636     */
637     {
638     off_t sbytes;
639     res = sendfile (ifd, ofd, offset, count, 0, &sbytes, 0);
640    
641     if (res < 0 && sbytes)
642     /* maybe only on EAGAIN: as usual, the manpage leaves you guessing */
643     res = sbytes;
644     }
645    
646     # elif __hpux
647     res = sendfile (ofd, ifd, offset, count, 0, 0);
648    
649     # elif __solaris
650     {
651     struct sendfilevec vec;
652     size_t sbytes;
653    
654     vec.sfv_fd = ifd;
655     vec.sfv_flag = 0;
656     vec.sfv_off = offset;
657     vec.sfv_len = count;
658    
659     res = sendfilev (ofd, &vec, 1, &sbytes);
660    
661     if (res < 0 && sbytes)
662     res = sbytes;
663     }
664    
665     # endif
666     #else
667     res = -1;
668     errno = ENOSYS;
669     #endif
670    
671     if (res < 0
672     && (errno == ENOSYS || errno == EINVAL || errno == ENOTSOCK
673     #if __solaris
674     || errno == EAFNOSUPPORT || errno == EPROTOTYPE
675     #endif
676     )
677     )
678     {
679     /* emulate sendfile. this is a major pain in the ass */
680     dBUF;
681    
682     res = 0;
683    
684     while (count)
685     {
686     ssize_t cnt;
687    
688     cnt = pread (ifd, eio_buf, count > EIO_BUFSIZE ? EIO_BUFSIZE : count, offset);
689    
690     if (cnt <= 0)
691     {
692     if (cnt && !res) res = -1;
693     break;
694     }
695    
696     cnt = write (ofd, eio_buf, cnt);
697    
698     if (cnt <= 0)
699     {
700     if (cnt && !res) res = -1;
701     break;
702     }
703    
704     offset += cnt;
705     res += cnt;
706     count -= cnt;
707     }
708     }
709    
710     return res;
711     }
712    
713     /* read a full directory */
714 root 1.9 static void
715     eio__scandir (eio_req *req, worker *self)
716 root 1.1 {
717     DIR *dirp;
718     union
719     {
720     EIO_STRUCT_DIRENT d;
721     char b [offsetof (EIO_STRUCT_DIRENT, d_name) + NAME_MAX + 1];
722     } *u;
723     EIO_STRUCT_DIRENT *entp;
724     char *name, *names;
725     int memlen = 4096;
726     int memofs = 0;
727     int res = 0;
728    
729     X_LOCK (wrklock);
730     self->dirp = dirp = opendir (req->ptr1);
731     self->dbuf = u = malloc (sizeof (*u));
732     req->flags |= EIO_FLAG_PTR2_FREE;
733     req->ptr2 = names = malloc (memlen);
734     X_UNLOCK (wrklock);
735    
736     if (dirp && u && names)
737     for (;;)
738     {
739     errno = 0;
740     readdir_r (dirp, &u->d, &entp);
741    
742     if (!entp)
743     break;
744    
745     name = entp->d_name;
746    
747     if (name [0] != '.' || (name [1] && (name [1] != '.' || name [2])))
748     {
749     int len = strlen (name) + 1;
750    
751     res++;
752    
753     while (memofs + len > memlen)
754     {
755     memlen *= 2;
756     X_LOCK (wrklock);
757     req->ptr2 = names = realloc (names, memlen);
758     X_UNLOCK (wrklock);
759    
760     if (!names)
761     break;
762     }
763    
764     memcpy (names + memofs, name, len);
765     memofs += len;
766     }
767     }
768    
769     if (errno)
770     res = -1;
771    
772     req->result = res;
773     }
774    
775     /*****************************************************************************/
776    
777 root 1.6 #define ALLOC(len) \
778     if (!req->ptr2) \
779     { \
780 root 1.7 X_LOCK (wrklock); \
781     req->flags |= EIO_FLAG_PTR2_FREE; \
782     X_UNLOCK (wrklock); \
783     req->ptr2 = malloc (len); \
784     if (!req->ptr2) \
785     { \
786     errno = ENOMEM; \
787     req->result = -1; \
788     break; \
789     } \
790 root 1.6 }
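/* ALLOC is used from the request switch in eio_proc below: for requests that
   did not supply their own buffer it lazily allocates req->ptr2 (marking it
   EIO_FLAG_PTR2_FREE so eio_destroy () frees it) and, if malloc fails, sets
   errno to ENOMEM, marks the request result as -1 and leaves the switch. */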
791    
792 root 1.1 X_THREAD_PROC (eio_proc)
793     {
794     eio_req *req;
795     struct timespec ts;
796     worker *self = (worker *)thr_arg;
797    
798     /* try to distribute timeouts somewhat randomly */
799     ts.tv_nsec = ((unsigned long)self & 1023UL) * (1000000000UL / 1024UL);
800    
801     for (;;)
802     {
803     ts.tv_sec = time (0) + IDLE_TIMEOUT;
804    
805     X_LOCK (reqlock);
806    
807     for (;;)
808     {
809     self->req = req = reqq_shift (&req_queue);
810    
811     if (req)
812     break;
813    
814     ++idle;
815    
816     if (X_COND_TIMEDWAIT (reqwait, reqlock, ts) == ETIMEDOUT)
817     {
818     if (idle > max_idle)
819     {
820     --idle;
821     X_UNLOCK (reqlock);
822     X_LOCK (wrklock);
823     --started;
824     X_UNLOCK (wrklock);
825     goto quit;
826     }
827    
828     /* we are allowed to idle, so do so without any timeout */
829     X_COND_WAIT (reqwait, reqlock);
830     ts.tv_sec = time (0) + IDLE_TIMEOUT;
831     }
832    
833     --idle;
834     }
835    
836     --nready;
837    
838     X_UNLOCK (reqlock);
839    
840     errno = 0; /* strictly unnecessary */
841    
842     if (!EIO_CANCELLED (req))
843     switch (req->type)
844     {
845 root 1.7 case EIO_READ: ALLOC (req->size);
846     req->result = req->offs >= 0
847 root 1.1 ? pread (req->int1, req->ptr2, req->size, req->offs)
848     : read (req->int1, req->ptr2, req->size); break;
849     case EIO_WRITE: req->result = req->offs >= 0
850     ? pwrite (req->int1, req->ptr2, req->size, req->offs)
851     : write (req->int1, req->ptr2, req->size); break;
852    
853 root 1.9 case EIO_READAHEAD: req->result = readahead (req->int1, req->offs, req->size); break;
854     case EIO_SENDFILE: req->result = eio__sendfile (req->int1, req->int2, req->offs, req->size, self); break;
855 root 1.1
856 root 1.6 case EIO_STAT: ALLOC (sizeof (EIO_STRUCT_STAT));
857     req->result = stat (req->ptr1, (EIO_STRUCT_STAT *)req->ptr2); break;
858     case EIO_LSTAT: ALLOC (sizeof (EIO_STRUCT_STAT));
859     req->result = lstat (req->ptr1, (EIO_STRUCT_STAT *)req->ptr2); break;
860     case EIO_FSTAT: ALLOC (sizeof (EIO_STRUCT_STAT));
861     req->result = fstat (req->int1, (EIO_STRUCT_STAT *)req->ptr2); break;
862 root 1.1
863     case EIO_CHOWN: req->result = chown (req->ptr1, req->int2, req->int3); break;
864     case EIO_FCHOWN: req->result = fchown (req->int1, req->int2, req->int3); break;
865     case EIO_CHMOD: req->result = chmod (req->ptr1, (mode_t)req->int2); break;
866     case EIO_FCHMOD: req->result = fchmod (req->int1, (mode_t)req->int2); break;
867     case EIO_TRUNCATE: req->result = truncate (req->ptr1, req->offs); break;
868     case EIO_FTRUNCATE: req->result = ftruncate (req->int1, req->offs); break;
869    
870     case EIO_OPEN: req->result = open (req->ptr1, req->int1, (mode_t)req->int2); break;
871     case EIO_CLOSE: req->result = close (req->int1); break;
872     case EIO_DUP2: req->result = dup2 (req->int1, req->int2); break;
873     case EIO_UNLINK: req->result = unlink (req->ptr1); break;
874     case EIO_RMDIR: req->result = rmdir (req->ptr1); break;
875     case EIO_MKDIR: req->result = mkdir (req->ptr1, (mode_t)req->int2); break;
876     case EIO_RENAME: req->result = rename (req->ptr1, req->ptr2); break;
877     case EIO_LINK: req->result = link (req->ptr1, req->ptr2); break;
878     case EIO_SYMLINK: req->result = symlink (req->ptr1, req->ptr2); break;
879     case EIO_MKNOD: req->result = mknod (req->ptr1, (mode_t)req->int2, (dev_t)req->offs); break;
880 root 1.7
881 root 1.6 case EIO_READLINK: ALLOC (NAME_MAX);
882     req->result = readlink (req->ptr1, req->ptr2, NAME_MAX); break;
883 root 1.1
884     case EIO_SYNC: req->result = 0; sync (); break;
885     case EIO_FSYNC: req->result = fsync (req->int1); break;
886     case EIO_FDATASYNC: req->result = fdatasync (req->int1); break;
887    
888 root 1.9 case EIO_READDIR: eio__scandir (req, self); break;
889 root 1.1
890     case EIO_BUSY:
891     #ifdef _WIN32
892     Sleep (req->nv1 * 1000.);
893     #else
894     {
895     struct timeval tv;
896    
897     tv.tv_sec = req->nv1;
898     tv.tv_usec = (req->nv1 - tv.tv_sec) * 1000000.;
899    
900     req->result = select (0, 0, 0, 0, &tv);
901     }
902     #endif
903     break;
904    
905     case EIO_UTIME:
906     case EIO_FUTIME:
907     {
908     struct timeval tv[2];
909     struct timeval *times;
910    
911     if (req->nv1 != -1. || req->nv2 != -1.)
912     {
913     tv[0].tv_sec = req->nv1;
914     tv[0].tv_usec = (req->nv1 - tv[0].tv_sec) * 1000000.;
915     tv[1].tv_sec = req->nv2;
916     tv[1].tv_usec = (req->nv2 - tv[1].tv_sec) * 1000000.;
917    
918     times = tv;
919     }
920     else
921     times = 0;
922    
923    
924     req->result = req->type == EIO_FUTIME
925     ? futimes (req->int1, times)
926     : utimes (req->ptr1, times);
927     }
928     break;
929     case EIO_GROUP:
930     case EIO_NOP:
931 root 1.9 req->result = 0;
932 root 1.1 break;
933    
934     case EIO_QUIT:
935     goto quit;
936    
937     default:
938     req->result = -1;
939     break;
940     }
941    
942     req->errorno = errno;
943    
944     X_LOCK (reslock);
945    
946     ++npending;
947    
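/* reqq_push () returns the result queue's size before the push, so
   want_poll_cb fires only on the empty-to-non-empty transition and will not
   fire again until eio_poll () has drained the queue and called
   done_poll_cb. */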
948 root 1.2 if (!reqq_push (&res_queue, req) && want_poll_cb)
949 root 1.1 want_poll_cb ();
950    
951     self->req = 0;
952     worker_clear (self);
953    
954     X_UNLOCK (reslock);
955     }
956    
957     quit:
958     X_LOCK (wrklock);
959     worker_free (self);
960     X_UNLOCK (wrklock);
961    
962     return 0;
963     }
964    
965     /*****************************************************************************/
966    
967 root 1.2 static void eio_atfork_prepare (void)
968 root 1.1 {
969     X_LOCK (wrklock);
970     X_LOCK (reqlock);
971     X_LOCK (reslock);
972     #if !HAVE_PREADWRITE
973     X_LOCK (preadwritelock);
974     #endif
975     #if !HAVE_READDIR_R
976     X_LOCK (readdirlock);
977     #endif
978     }
979    
980 root 1.2 static void eio_atfork_parent (void)
981 root 1.1 {
982     #if !HAVE_READDIR_R
983     X_UNLOCK (readdirlock);
984     #endif
985     #if !HAVE_PREADWRITE
986     X_UNLOCK (preadwritelock);
987     #endif
988     X_UNLOCK (reslock);
989     X_UNLOCK (reqlock);
990     X_UNLOCK (wrklock);
991     }
992    
993 root 1.2 static void eio_atfork_child (void)
994 root 1.1 {
995     eio_req *prv;
996    
997     while (prv = reqq_shift (&req_queue))
998     eio_destroy (prv);
999    
1000     while (prv = reqq_shift (&res_queue))
1001     eio_destroy (prv);
1002    
1003     while (wrk_first.next != &wrk_first)
1004     {
1005     worker *wrk = wrk_first.next;
1006    
1007     if (wrk->req)
1008     eio_destroy (wrk->req);
1009    
1010     worker_clear (wrk);
1011     worker_free (wrk);
1012     }
1013    
1014     started = 0;
1015     idle = 0;
1016     nreqs = 0;
1017     nready = 0;
1018     npending = 0;
1019    
1020 root 1.2 eio_atfork_parent ();
1021 root 1.1 }
1022    
1023     int eio_init (void (*want_poll)(void), void (*done_poll)(void))
1024     {
1025     want_poll_cb = want_poll;
1026     done_poll_cb = done_poll;
1027    
1028     #ifdef _WIN32
1029     X_MUTEX_CHECK (wrklock);
1030     X_MUTEX_CHECK (reslock);
1031     X_MUTEX_CHECK (reqlock);
1032     X_MUTEX_CHECK (reqwait);
1033     X_MUTEX_CHECK (preadwritelock);
1034     X_MUTEX_CHECK (readdirlock);
1035    
1036     X_COND_CHECK (reqwait);
1037     #endif
1038    
1039 root 1.2 X_THREAD_ATFORK (eio_atfork_prepare, eio_atfork_parent, eio_atfork_child);
1040 root 1.1 return 0;
1041     }
1042 root 1.6 static void eio_api_destroy (eio_req *req)
1043     {
1044     free (req);
1045     }
1046    
1047     #define REQ(rtype) \
1048     eio_req *req; \
1049     \
1050     req = (eio_req *)calloc (1, sizeof *req); \
1051     if (!req) \
1052     return 0; \
1053     \
1054 root 1.9 req->type = rtype; \
1055 root 1.6 req->pri = pri + EIO_PRI_BIAS; \
1056     req->finish = cb; \
1057     req->data = data; \
1058     req->destroy = eio_api_destroy;
1059     #define SEND eio_submit (req); return req
1060    
1061 root 1.9 #define PATH \
1062     req->flags |= EIO_FLAG_PTR1_FREE; \
1063     req->ptr1 = strdup (path); \
1064     if (!req->ptr1) \
1065     { \
1066     eio_api_destroy (req); \
1067     return 0; \
1068     }
1069    
1070 root 1.10 eio_req *eio_nop (int pri, eio_cb cb, void *data)
1071 root 1.9 {
1072     REQ (EIO_NOP); SEND;
1073     }
1074    
1075 root 1.10 eio_req *eio_busy (double delay, int pri, eio_cb cb, void *data)
1076 root 1.9 {
1077     REQ (EIO_BUSY); req->nv1 = delay; SEND;
1078     }
1079    
1080 root 1.10 eio_req *eio_sync (int pri, eio_cb cb, void *data)
1081 root 1.9 {
1082     REQ (EIO_SYNC); SEND;
1083     }
1084 root 1.6
1085 root 1.10 eio_req *eio_fsync (int fd, int pri, eio_cb cb, void *data)
1086 root 1.6 {
1087 root 1.9 REQ (EIO_FSYNC); req->int1 = fd; SEND;
1088 root 1.6 }
1089    
1090 root 1.10 eio_req *eio_fdatasync (int fd, int pri, eio_cb cb, void *data)
1091 root 1.6 {
1092 root 1.9 REQ (EIO_FDATASYNC); req->int1 = fd; SEND;
1093     }
1094    
1095 root 1.10 eio_req *eio_close (int fd, int pri, eio_cb cb, void *data)
1096 root 1.9 {
1097     REQ (EIO_CLOSE); req->int1 = fd; SEND;
1098 root 1.6 }
1099    
1100 root 1.10 eio_req *eio_readahead (int fd, off_t offset, size_t length, int pri, eio_cb cb, void *data)
1101 root 1.6 {
1102 root 1.9 REQ (EIO_READAHEAD); req->int1 = fd; req->offs = offset; req->size = length; SEND;
1103 root 1.6 }
1104    
1105 root 1.10 eio_req *eio_read (int fd, void *buf, size_t length, off_t offset, int pri, eio_cb cb, void *data)
1106 root 1.6 {
1107 root 1.10 REQ (EIO_READ); req->int1 = fd; req->offs = offset; req->size = length; req->ptr2 = buf; SEND;
1108 root 1.6 }
1109    
1110 root 1.10 eio_req *eio_write (int fd, void *buf, size_t length, off_t offset, int pri, eio_cb cb, void *data)
1111 root 1.6 {
1112 root 1.10 REQ (EIO_WRITE); req->int1 = fd; req->offs = offset; req->size = length; req->ptr2 = buf; SEND;
1113 root 1.6 }
1114    
1115 root 1.10 eio_req *eio_fstat (int fd, int pri, eio_cb cb, void *data)
1116 root 1.6 {
1117 root 1.9 REQ (EIO_FSTAT); req->int1 = fd; SEND;
1118 root 1.6 }
1119    
1120 root 1.10 eio_req *eio_futime (int fd, double atime, double mtime, int pri, eio_cb cb, void *data)
1121 root 1.6 {
1122 root 1.9 REQ (EIO_FUTIME); req->int1 = fd; req->nv1 = atime; req->nv2 = mtime; SEND;
1123 root 1.6 }
1124    
1125 root 1.10 eio_req *eio_ftruncate (int fd, off_t offset, int pri, eio_cb cb, void *data)
1126 root 1.6 {
1127 root 1.9 REQ (EIO_FTRUNCATE); req->int1 = fd; req->offs = offset; SEND;
1128 root 1.6 }
1129    
1130 root 1.10 eio_req *eio_fchmod (int fd, mode_t mode, int pri, eio_cb cb, void *data)
1131 root 1.6 {
1132 root 1.9 REQ (EIO_FCHMOD); req->int1 = fd; req->int2 = (long)mode; SEND;
1133 root 1.6 }
1134    
1135 root 1.10 eio_req *eio_fchown (int fd, uid_t uid, gid_t gid, int pri, eio_cb cb, void *data)
1136 root 1.6 {
1137 root 1.9 REQ (EIO_FCHOWN); req->int1 = fd; req->int2 = (long)uid; req->int3 = (long)gid; SEND;
1138 root 1.6 }
1139    
1140 root 1.10 eio_req *eio_dup2 (int fd, int fd2, int pri, eio_cb cb, void *data)
1141 root 1.6 {
1142 root 1.9 REQ (EIO_DUP2); req->int1 = fd; req->int2 = fd2; SEND;
1143 root 1.6 }
1144    
1145 root 1.10 eio_req *eio_sendfile (int out_fd, int in_fd, off_t in_offset, size_t length, int pri, eio_cb cb, void *data)
1146 root 1.6 {
1147 root 1.9 REQ (EIO_SENDFILE); req->int1 = out_fd; req->int2 = in_fd; req->offs = in_offset; req->size = length; SEND;
1148 root 1.6 }
1149    
1150 root 1.10 eio_req *eio_open (const char *path, int flags, mode_t mode, int pri, eio_cb cb, void *data)
1151 root 1.6 {
1152 root 1.9 REQ (EIO_OPEN); PATH; req->int1 = flags; req->int2 = (long)mode; SEND;
1153 root 1.6 }
1154    
1155 root 1.10 eio_req *eio_utime (const char *path, double atime, double mtime, int pri, eio_cb cb, void *data)
1156 root 1.6 {
1157 root 1.9 REQ (EIO_UTIME); PATH; req->nv1 = atime; req->nv2 = mtime; SEND;
1158 root 1.6 }
1159    
1160 root 1.10 eio_req *eio_truncate (const char *path, off_t offset, int pri, eio_cb cb, void *data)
1161 root 1.6 {
1162 root 1.9 REQ (EIO_TRUNCATE); PATH; req->offs = offset; SEND;
1163 root 1.6 }
1164    
1165 root 1.10 eio_req *eio_chown (const char *path, uid_t uid, gid_t gid, int pri, eio_cb cb, void *data)
1166 root 1.6 {
1167 root 1.9 REQ (EIO_CHOWN); PATH; req->int2 = (long)uid; req->int3 = (long)gid; SEND;
1168 root 1.6 }
1169    
1170 root 1.10 eio_req *eio_chmod (const char *path, mode_t mode, int pri, eio_cb cb, void *data)
1171 root 1.6 {
1172 root 1.9 REQ (EIO_CHMOD); PATH; req->int2 = (long)mode; SEND;
1173 root 1.6 }
1174    
1175 root 1.10 eio_req *eio_mkdir (const char *path, mode_t mode, int pri, eio_cb cb, void *data)
1176 root 1.6 {
1177 root 1.9 REQ (EIO_MKDIR); PATH; req->int2 = (long)mode; SEND;
1178 root 1.6 }
1179    
1180 root 1.9 static eio_req *
1181 root 1.10 eio__1path (int type, const char *path, int pri, eio_cb cb, void *data)
1182 root 1.6 {
1183 root 1.9 REQ (type); PATH; SEND;
1184 root 1.6 }
1185    
1186 root 1.10 eio_req *eio_readlink (const char *path, int pri, eio_cb cb, void *data)
1187 root 1.6 {
1188 root 1.10 return eio__1path (EIO_READLINK, path, pri, cb, data);
1189 root 1.6 }
1190    
1191 root 1.10 eio_req *eio_stat (const char *path, int pri, eio_cb cb, void *data)
1192 root 1.6 {
1193 root 1.10 return eio__1path (EIO_STAT, path, pri, cb, data);
1194 root 1.6 }
1195    
1196 root 1.10 eio_req *eio_lstat (const char *path, int pri, eio_cb cb, void *data)
1197 root 1.6 {
1198 root 1.10 return eio__1path (EIO_LSTAT, path, pri, cb, data);
1199 root 1.6 }
1200    
1201 root 1.10 eio_req *eio_unlink (const char *path, int pri, eio_cb cb, void *data)
1202 root 1.6 {
1203 root 1.10 return eio__1path (EIO_UNLINK, path, pri, cb, data);
1204 root 1.6 }
1205    
1206 root 1.10 eio_req *eio_rmdir (const char *path, int pri, eio_cb cb, void *data)
1207 root 1.6 {
1208 root 1.10 return eio__1path (EIO_RMDIR, path, pri, cb, data);
1209 root 1.6 }
1210    
1211 root 1.10 eio_req *eio_readdir (const char *path, int pri, eio_cb cb, void *data)
1212 root 1.1 {
1213 root 1.10 return eio__1path (EIO_READDIR, path, pri, cb, data);
1214 root 1.1 }
1215    
1216 root 1.10 eio_req *eio_mknod (const char *path, mode_t mode, dev_t dev, int pri, eio_cb cb, void *data)
1217 root 1.1 {
1218 root 1.9 REQ (EIO_MKNOD); PATH; req->int2 = (long)mode; req->offs = (off_t)dev; SEND;
1219 root 1.1 }
1220    
1221 root 1.9 static eio_req *
1222 root 1.10 eio__2path (int type, const char *path, const char *new_path, int pri, eio_cb cb, void *data)
1223 root 1.1 {
1224 root 1.9 REQ (type); PATH;
1225 root 1.1
1226 root 1.9 req->flags |= EIO_FLAG_PTR2_FREE;
1227     req->ptr2 = strdup (new_path);
1228     if (!req->ptr2)
1229     {
1230     eio_api_destroy (req);
1231     return 0;
1232     }
1233 root 1.1
1234 root 1.9 SEND;
1235 root 1.1 }
1236    
1237 root 1.10 eio_req *eio_link (const char *path, const char *new_path, int pri, eio_cb cb, void *data)
1238 root 1.1 {
1239 root 1.10 return eio__2path (EIO_LINK, path, new_path, pri, cb, data);
1240 root 1.1 }
1241    
1242 root 1.10 eio_req *eio_symlink (const char *path, const char *new_path, int pri, eio_cb cb, void *data)
1243 root 1.1 {
1244 root 1.10 return eio__2path (EIO_SYMLINK, path, new_path, pri, cb, data);
1245 root 1.1 }
1246    
1247 root 1.10 eio_req *eio_rename (const char *path, const char *new_path, int pri, eio_cb cb, void *data)
1248 root 1.1 {
1249 root 1.10 return eio__2path (EIO_RENAME, path, new_path, pri, cb, data);
1250     }
1251    
1252     eio_req *eio_grp (eio_cb cb, void *data)
1253     {
1254     const int pri = EIO_PRI_MAX;
1255    
1256     REQ (EIO_GROUP); SEND;
1257 root 1.1 }
1258    
1259 root 1.9 #undef REQ
1260     #undef PATH
1261     #undef SEND
1262 root 1.1
1263 root 1.9 /*****************************************************************************/
1264     /* grp functions */
1265 root 1.1
1266 root 1.2 void eio_grp_feed (eio_req *grp, void (*feed)(eio_req *req), int limit)
1267 root 1.1 {
1268     grp->int2 = limit;
1269     grp->feed = feed;
1270 root 1.2
1271     grp_try_feed (grp);
1272     }
1273    
1274     void eio_grp_limit (eio_req *grp, int limit)
1275     {
1276     grp->int2 = limit;
1277    
1278     grp_try_feed (grp);
1279 root 1.1 }
1280    
1281     void eio_grp_add (eio_req *grp, eio_req *req)
1282     {
1283     assert (("cannot add requests to IO::AIO::GRP after the group finished", grp->int1 != 2));
1284    
1285     ++grp->size;
1286     req->grp = grp;
1287    
1288     req->grp_prev = 0;
1289     req->grp_next = grp->grp_first;
1290    
1291     if (grp->grp_first)
1292     grp->grp_first->grp_prev = req;
1293    
1294     grp->grp_first = req;
1295     }
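/* rough usage sketch for groups (illustration only, the paths, limit and
   callback names are made up):

     eio_req *grp = eio_grp (grp_done, 0);

     eio_grp_add (grp, eio_unlink ("/tmp/a", 0, unlink_done, 0));
     eio_grp_add (grp, eio_unlink ("/tmp/b", 0, unlink_done, 0));

   grp_done runs once the group and all requests added to it have finished.
   alternatively a feeder callback can generate requests on demand, keeping
   at most "limit" of them in the group at any time:

     eio_grp_feed (grp, feed_cb, 4);   .. feed_cb calls eio_grp_add ()
 */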
1296    
1297 root 1.9 /*****************************************************************************/
1298     /* misc garbage */
1299    
1300     ssize_t eio_sendfile_sync (int ofd, int ifd, off_t offset, size_t count)
1301     {
1302     worker wrk;
1303     ssize_t res;
1304    
1305     wrk.dbuf = 0;
1306    
1307     res = eio__sendfile (ofd, ifd, offset, count, &wrk);
1308    
1309     if (wrk.dbuf)
1310     free (wrk.dbuf);
1311    
1312     return res;
1313     }
1314    