ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/libeio/eio.c
Revision: 1.13
Committed: Tue May 13 19:34:11 2008 UTC (16 years ago) by root
Content type: text/plain
Branch: MAIN
Changes since 1.12: +130 -116 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.1 #include "eio.h"
2     #include "xthread.h"
3    
4     #include <errno.h>
5     #include <stddef.h>
6     #include <stdlib.h>
7 root 1.3 #include <string.h>
8 root 1.1 #include <errno.h>
9     #include <sys/types.h>
10     #include <sys/stat.h>
11     #include <limits.h>
12     #include <fcntl.h>
13 root 1.8 #include <assert.h>
14 root 1.1
15     #ifndef EIO_FINISH
16     # define EIO_FINISH(req) ((req)->finish) && !EIO_CANCELLED (req) ? (req)->finish (req) : 0
17     #endif
18    
19     #ifndef EIO_DESTROY
20     # define EIO_DESTROY(req) do { if ((req)->destroy) (req)->destroy (req); } while (0)
21     #endif
22    
23     #ifndef EIO_FEED
24     # define EIO_FEED(req) do { if ((req)->feed ) (req)->feed (req); } while (0)
25     #endif
26    
27     #ifdef _WIN32
28    
29     /*doh*/
30    
31     #else
32    
33     # include "config.h"
34     # include <sys/time.h>
35     # include <sys/select.h>
36     # include <unistd.h>
37     # include <utime.h>
38     # include <signal.h>
39 root 1.4 # include <dirent.h>
40 root 1.1
41     # ifndef EIO_STRUCT_DIRENT
42     # define EIO_STRUCT_DIRENT struct dirent
43     # endif
44    
45     #endif
46    
47     #if HAVE_SENDFILE
48     # if __linux
49     # include <sys/sendfile.h>
50     # elif __freebsd
51     # include <sys/socket.h>
52     # include <sys/uio.h>
53     # elif __hpux
54     # include <sys/socket.h>
55     # elif __solaris /* not yet */
56     # include <sys/sendfile.h>
57     # else
58     # error sendfile support requested but not available
59     # endif
60     #endif
61    
62     /* number of seconds after which an idle thread exits */
63     #define IDLE_TIMEOUT 10
64    
65     /* used for struct dirent, AIX doesn't provide it */
66     #ifndef NAME_MAX
67     # define NAME_MAX 4096
68     #endif
69    
70     /* buffer size for various temporary buffers */
71     #define EIO_BUFSIZE 65536
72    
73     #define dBUF \
74     char *eio_buf; \
75 root 1.13 X_LOCK (etplock); \
76 root 1.1 self->dbuf = eio_buf = malloc (EIO_BUFSIZE); \
77 root 1.13 X_UNLOCK (etplock); \
78 root 1.9 errno = ENOMEM; \
79 root 1.1 if (!eio_buf) \
80     return -1;
81    
#define EIO_TICKS ((1000000 + 1023) >> 10)

/*****************************************************************************/

/* calculate the time difference tv2 - tv1 in ~1/EIO_TICKS of a second */
static int tvdiff (struct timeval *tv1, struct timeval *tv2)
{
  int sec_ticks  = (tv2->tv_sec  - tv1->tv_sec ) * EIO_TICKS;
  int usec_ticks = (tv2->tv_usec - tv1->tv_usec) >> 10;

  return sec_ticks + usec_ticks;
}
92    
/* thread-pool accounting: threads running, threads idling, threads desired */
static unsigned int started, idle, wanted = 4;

/* per-pool state: callbacks that notify the event consumer when results
   become available (want_poll) / have been drained (done_poll), plus the
   per-eio_poll limits */
typedef struct etp_pool
{
  void (*want_poll_cb) (void);
  void (*done_poll_cb) (void);

  unsigned int max_poll_time;
  unsigned int max_poll_reqs;
} etp_pool;

/* request counters: submitted-but-unfinished, queued, finished-unreaped */
static volatile unsigned int nreqs, nready, npending;
/* how many idle threads may linger before timing out (see eio_set_max_idle) */
static volatile unsigned int max_idle = 4;

static mutex_t etplock = X_MUTEX_INIT; /* protects worker list, started, flags */
static mutex_t reslock = X_MUTEX_INIT; /* protects res_queue, npending */
static mutex_t reqlock = X_MUTEX_INIT; /* protects req_queue, nready, reqwait */
static cond_t  reqwait = X_COND_INIT;  /* signalled when a request is queued */

/* one record per pool thread, kept in a doubly-linked ring rooted at wrk_first */
typedef struct worker
{
  /* locked by etplock */
  struct worker *prev, *next;

  thread_t tid;

  /* locked by reslock, reqlock or etplock */
  eio_req *req; /* currently processed request */
  void *dbuf;   /* scratch buffer (dBUF), freed by worker_clear */
  DIR *dirp;    /* open dir handle of an in-flight readdir, ditto */
} worker;

static worker wrk_first = { &wrk_first, &wrk_first, 0 }; /* NOT etp */
126    
127     /* worker threads management */
128 root 1.1
129     static void worker_clear (worker *wrk)
130     {
131     if (wrk->dirp)
132     {
133     closedir (wrk->dirp);
134     wrk->dirp = 0;
135     }
136    
137     if (wrk->dbuf)
138     {
139     free (wrk->dbuf);
140     wrk->dbuf = 0;
141     }
142     }
143    
144     static void worker_free (worker *wrk)
145     {
146     wrk->next->prev = wrk->prev;
147     wrk->prev->next = wrk->next;
148    
149     free (wrk);
150     }
151    
152     unsigned int eio_nreqs (void)
153     {
154     return nreqs;
155     }
156    
157     unsigned int eio_nready (void)
158     {
159     unsigned int retval;
160    
161     if (WORDACCESS_UNSAFE) X_LOCK (reqlock);
162     retval = nready;
163     if (WORDACCESS_UNSAFE) X_UNLOCK (reqlock);
164    
165     return retval;
166     }
167    
168     unsigned int eio_npending (void)
169     {
170     unsigned int retval;
171    
172     if (WORDACCESS_UNSAFE) X_LOCK (reqlock);
173     retval = npending;
174     if (WORDACCESS_UNSAFE) X_UNLOCK (reqlock);
175    
176     return retval;
177     }
178    
179     unsigned int eio_nthreads (void)
180     {
181     unsigned int retval;
182    
183     if (WORDACCESS_UNSAFE) X_LOCK (reqlock);
184     retval = started;
185     if (WORDACCESS_UNSAFE) X_UNLOCK (reqlock);
186    
187     return retval;
188     }
189    
/*
 * a somewhat faster data structure might be nice, but
 * with 8 priorities this actually needs <20 insns
 * per shift, the most expensive operation.
 */
/* one singly-linked FIFO per priority; qs = head, qe = tail,
   size = total number of requests across all priorities */
typedef struct {
  eio_req *qs[EIO_NUM_PRI], *qe[EIO_NUM_PRI]; /* qstart, qend */
  int size;
} reqq;

static reqq req_queue; /* submitted requests, protected by reqlock */
static reqq res_queue; /* finished requests, protected by reslock */
202    
203     static int reqq_push (reqq *q, eio_req *req)
204     {
205     int pri = req->pri;
206     req->next = 0;
207    
208     if (q->qe[pri])
209     {
210     q->qe[pri]->next = req;
211     q->qe[pri] = req;
212     }
213     else
214     q->qe[pri] = q->qs[pri] = req;
215    
216     return q->size++;
217     }
218    
219     static eio_req *reqq_shift (reqq *q)
220     {
221     int pri;
222    
223     if (!q->size)
224     return 0;
225    
226     --q->size;
227    
228     for (pri = EIO_NUM_PRI; pri--; )
229     {
230     eio_req *req = q->qs[pri];
231    
232     if (req)
233     {
234     if (!(q->qs[pri] = (eio_req *)req->next))
235     q->qe[pri] = 0;
236    
237     return req;
238     }
239     }
240    
241     abort ();
242     }
243    
/* fork handler (pre-fork): acquire every pool lock so that no thread holds
   one across fork () and the child inherits them in a consistent state.
   NOTE(review): preadwritelock/readdirlock are defined further down in this
   file - confirm they are in scope here in the complete file. */
static void etp_atfork_prepare (void)
{
  X_LOCK (etplock);
  X_LOCK (reqlock);
  X_LOCK (reslock);
#if !HAVE_PREADWRITE
  X_LOCK (preadwritelock);
#endif
#if !HAVE_READDIR_R
  X_LOCK (readdirlock);
#endif
}

/* fork handler (parent, post-fork): release the locks taken in
   etp_atfork_prepare, in exact reverse order */
static void etp_atfork_parent (void)
{
#if !HAVE_READDIR_R
  X_UNLOCK (readdirlock);
#endif
#if !HAVE_PREADWRITE
  X_UNLOCK (preadwritelock);
#endif
  X_UNLOCK (reslock);
  X_UNLOCK (reqlock);
  X_UNLOCK (etplock);
}
269    
/* fork handler (child, post-fork): no worker threads survive fork (), so
   drop every queued request and worker record, reset all counters, then
   release the locks via the parent handler */
static void etp_atfork_child (void)
{
  eio_req *prv;

  /* discard requests that were queued but never started */
  while (prv = reqq_shift (&req_queue))
    eio_destroy (prv);

  /* discard requests that finished but were never reaped */
  while (prv = reqq_shift (&res_queue))
    eio_destroy (prv);

  /* free every worker record, including the request it was executing */
  while (wrk_first.next != &wrk_first)
    {
      worker *wrk = wrk_first.next;

      if (wrk->req)
        eio_destroy (wrk->req);

      worker_clear (wrk);
      worker_free (wrk);
    }

  started = 0;
  idle = 0;
  nreqs = 0;
  nready = 0;
  npending = 0;

  etp_atfork_parent ();
}

/* executed exactly once per process: register the fork handlers above */
static void
etp_once_init (void)
{
  X_THREAD_ATFORK (etp_atfork_prepare, etp_atfork_parent, etp_atfork_child);
}
305    
306     static int
307     etp_init (etp_pool *etp, void (*want_poll)(void), void (*done_poll)(void))
308     {
309     static pthread_once_t doinit = PTHREAD_ONCE_INIT;
310    
311     pthread_once (&doinit, etp_once_init);
312    
313     memset (etp, 0, sizeof *etp);
314    
315     etp->want_poll_cb = want_poll;
316     etp->done_poll_cb = done_poll;
317     }
318    
319     static etp_pool etp;
320    
321     /*****************************************************************************/
322    
323 root 1.2 static void grp_try_feed (eio_req *grp)
324 root 1.1 {
325     while (grp->size < grp->int2 && !EIO_CANCELLED (grp))
326     {
327     int old_len = grp->size;
328    
329     EIO_FEED (grp);
330    
331     /* stop if no progress has been made */
332     if (old_len == grp->size)
333     {
334     grp->feed = 0;
335 root 1.2 break;
336 root 1.1 }
337     }
338     }
339    
340 root 1.2 static int eio_finish (eio_req *req);
341 root 1.1
342     static int grp_dec (eio_req *grp)
343     {
344     --grp->size;
345    
346     /* call feeder, if applicable */
347 root 1.2 grp_try_feed (grp);
348 root 1.1
349     /* finish, if done */
350     if (!grp->size && grp->int1)
351 root 1.2 return eio_finish (grp);
352 root 1.1 else
353     return 0;
354     }
355    
356     void eio_destroy (eio_req *req)
357     {
358 root 1.6 if ((req)->flags & EIO_FLAG_PTR1_FREE) free (req->ptr1);
359     if ((req)->flags & EIO_FLAG_PTR2_FREE) free (req->ptr2);
360 root 1.1
361     EIO_DESTROY (req);
362     }
363    
/* run the request's finish callback, unlink it from its group (possibly
   finishing the group too via grp_dec), and destroy it.  returns the first
   nonzero callback result, which eio_poll propagates to its caller. */
static int eio_finish (eio_req *req)
{
  int res = EIO_FINISH (req);

  if (req->grp)
    {
      int res2;
      eio_req *grp = req->grp;

      /* unlink request */
      if (req->grp_next) req->grp_next->grp_prev = req->grp_prev;
      if (req->grp_prev) req->grp_prev->grp_next = req->grp_next;

      if (grp->grp_first == req)
        grp->grp_first = req->grp_next;

      res2 = grp_dec (grp);

      /* the group's result is only used when this request produced none */
      if (!res && res2)
        res = res2;
    }

  eio_destroy (req);

  return res;
}
390    
391     void eio_grp_cancel (eio_req *grp)
392     {
393     for (grp = grp->grp_first; grp; grp = grp->grp_next)
394     eio_cancel (grp);
395     }
396    
397     void eio_cancel (eio_req *req)
398     {
399 root 1.13 X_LOCK (etplock);
400 root 1.1 req->flags |= EIO_FLAG_CANCELLED;
401 root 1.13 X_UNLOCK (etplock);
402 root 1.1
403     eio_grp_cancel (req);
404     }
405    
X_THREAD_PROC (eio_proc);

/* spawn one worker thread and link it into the global worker ring */
static void start_thread (void)
{
  worker *wrk = calloc (1, sizeof (worker));

  /*TODO*/
  /* allocation failure is currently fatal (assert), not reported */
  assert (("unable to allocate worker thread data", wrk));

  X_LOCK (etplock);

  /* only link and count the worker if the thread actually started */
  if (thread_create (&wrk->tid, eio_proc, (void *)wrk))
    {
      wrk->prev = &wrk_first;
      wrk->next = wrk_first.next;
      wrk_first.next->prev = wrk;
      wrk_first.next = wrk;
      ++started;
    }
  else
    free (wrk);

  X_UNLOCK (etplock);
}
430    
431     static void maybe_start_thread (void)
432     {
433     if (eio_nthreads () >= wanted)
434     return;
435    
436     /* todo: maybe use idle here, but might be less exact */
437     if (0 <= (int)eio_nthreads () + (int)eio_npending () - (int)eio_nreqs ())
438     return;
439    
440     start_thread ();
441     }
442    
443     void eio_submit (eio_req *req)
444     {
445 root 1.11 req->pri += EIO_PRI_BIAS;
446    
447     if (req->pri < EIO_PRI_MIN + EIO_PRI_BIAS) req->pri = EIO_PRI_MIN + EIO_PRI_BIAS;
448     if (req->pri > EIO_PRI_MAX + EIO_PRI_BIAS) req->pri = EIO_PRI_MAX + EIO_PRI_BIAS;
449    
450 root 1.1 ++nreqs;
451    
452     X_LOCK (reqlock);
453     ++nready;
454     reqq_push (&req_queue, req);
455     X_COND_SIGNAL (reqwait);
456     X_UNLOCK (reqlock);
457    
458     maybe_start_thread ();
459     }
460    
461     static void end_thread (void)
462     {
463     eio_req *req = calloc (1, sizeof (eio_req));
464    
465     req->type = EIO_QUIT;
466     req->pri = EIO_PRI_MAX + EIO_PRI_BIAS;
467    
468     X_LOCK (reqlock);
469     reqq_push (&req_queue, req);
470     X_COND_SIGNAL (reqwait);
471     X_UNLOCK (reqlock);
472    
473 root 1.13 X_LOCK (etplock);
474 root 1.1 --started;
475 root 1.13 X_UNLOCK (etplock);
476 root 1.1 }
477    
478     void eio_set_max_poll_time (double nseconds)
479     {
480     if (WORDACCESS_UNSAFE) X_LOCK (reqlock);
481 root 1.13 etp.max_poll_time = nseconds;
482 root 1.1 if (WORDACCESS_UNSAFE) X_UNLOCK (reqlock);
483     }
484    
485     void eio_set_max_poll_reqs (unsigned int maxreqs)
486     {
487     if (WORDACCESS_UNSAFE) X_LOCK (reqlock);
488 root 1.13 etp.max_poll_reqs = maxreqs;
489 root 1.1 if (WORDACCESS_UNSAFE) X_UNLOCK (reqlock);
490     }
491    
492     void eio_set_max_idle (unsigned int nthreads)
493     {
494     if (WORDACCESS_UNSAFE) X_LOCK (reqlock);
495     max_idle = nthreads <= 0 ? 1 : nthreads;
496     if (WORDACCESS_UNSAFE) X_UNLOCK (reqlock);
497     }
498    
499     void eio_set_min_parallel (unsigned int nthreads)
500     {
501     if (wanted < nthreads)
502     wanted = nthreads;
503     }
504    
505     void eio_set_max_parallel (unsigned int nthreads)
506     {
507     if (wanted > nthreads)
508     wanted = nthreads;
509    
510     while (started > wanted)
511     end_thread ();
512     }
513    
/* reap finished requests and run their callbacks, honouring the configured
   max_poll_reqs/max_poll_time limits.  returns 0 when the result queue is
   drained, -1 with errno = EAGAIN when a limit was hit (call again later),
   or the first nonzero callback result. */
int eio_poll (void)
{
  int maxreqs = etp.max_poll_reqs;
  struct timeval tv_start, tv_now;
  eio_req *req;

  if (etp.max_poll_time)
    gettimeofday (&tv_start, 0);

  for (;;)
    {
      maybe_start_thread ();

      X_LOCK (reslock);
      req = reqq_shift (&res_queue);

      if (req)
        {
          --npending;

          /* notify the consumer that the result queue was emptied */
          if (!res_queue.size && etp.done_poll_cb)
            etp.done_poll_cb ();
        }

      X_UNLOCK (reslock);

      if (!req)
        return 0;

      --nreqs;

      /* a group with outstanding members is not finished yet - requeue it
         implicitly by marking it delayed; grp_dec finishes it later */
      if (req->type == EIO_GROUP && req->size)
        {
          req->int1 = 1; /* mark request as delayed */
          continue;
        }
      else
        {
          int res = eio_finish (req);
          if (res)
            return res;
        }

      if (maxreqs && !--maxreqs)
        break;

      if (etp.max_poll_time)
        {
          gettimeofday (&tv_now, 0);

          if (tvdiff (&tv_start, &tv_now) >= etp.max_poll_time)
            break;
        }
    }

  errno = EAGAIN;
  return -1;
}
572    
573     /*****************************************************************************/
574     /* work around various missing functions */
575    
576     #if !HAVE_PREADWRITE
577 root 1.9 # define pread eio__pread
578     # define pwrite eio__pwrite
579 root 1.1
580     /*
581     * make our pread/pwrite safe against themselves, but not against
582     * normal read/write by using a mutex. slows down execution a lot,
583     * but that's your problem, not mine.
584     */
585     static mutex_t preadwritelock = X_MUTEX_INIT;
586    
587 root 1.9 static ssize_t
588     eio__pread (int fd, void *buf, size_t count, off_t offset)
589 root 1.1 {
590     ssize_t res;
591     off_t ooffset;
592    
593     X_LOCK (preadwritelock);
594     ooffset = lseek (fd, 0, SEEK_CUR);
595     lseek (fd, offset, SEEK_SET);
596     res = read (fd, buf, count);
597     lseek (fd, ooffset, SEEK_SET);
598     X_UNLOCK (preadwritelock);
599    
600     return res;
601     }
602    
603 root 1.9 static ssize_t
604     eio__pwrite (int fd, void *buf, size_t count, off_t offset)
605 root 1.1 {
606     ssize_t res;
607     off_t ooffset;
608    
609     X_LOCK (preadwritelock);
610     ooffset = lseek (fd, 0, SEEK_CUR);
611     lseek (fd, offset, SEEK_SET);
612     res = write (fd, buf, count);
613     lseek (fd, offset, SEEK_SET);
614     X_UNLOCK (preadwritelock);
615    
616     return res;
617     }
618     #endif
619    
620     #ifndef HAVE_FUTIMES
621    
622 root 1.9 # define utimes(path,times) eio__utimes (path, times)
623     # define futimes(fd,times) eio__futimes (fd, times)
624 root 1.1
/* utimes emulation on top of utime (): sub-second precision is dropped */
static int
eio__utimes (const char *filename, const struct timeval times[2])
{
  struct utimbuf buf;

  if (!times)
    return utime (filename, 0);

  buf.actime  = times[0].tv_sec;
  buf.modtime = times[1].tv_sec;

  return utime (filename, &buf);
}
640    
/* futimes cannot be emulated with portable primitives, so this stub
   always fails with ENOSYS; fd and tv are ignored */
static int eio__futimes (int fd, const struct timeval tv[2])
{
  errno = ENOSYS;
  return -1;
}
646    
647     #endif
648    
649     #if !HAVE_FDATASYNC
650     # define fdatasync fsync
651     #endif
652    
653     #if !HAVE_READAHEAD
654 root 1.9 # define readahead(fd,offset,count) eio__readahead (fd, offset, count, self)
655 root 1.1
/* readahead emulation: sequentially read the requested range into a scratch
   buffer to prime the kernel cache; the data itself is discarded.
   always reports success (count) with errno cleared. */
static ssize_t
eio__readahead (int fd, off_t offset, size_t count, worker *self)
{
  size_t todo = count;
  dBUF; /* allocates eio_buf/self->dbuf; returns -1 with ENOMEM on failure */

  while (todo > 0)
    {
      size_t len = todo < EIO_BUFSIZE ? todo : EIO_BUFSIZE;

      /* read errors are deliberately ignored - this is only cache priming */
      pread (fd, eio_buf, len, offset);
      offset += len;
      todo -= len;
    }

  errno = 0;
  return count;
}
674    
675     #endif
676    
677     #if !HAVE_READDIR_R
678 root 1.9 # define readdir_r eio__readdir_r
679 root 1.1
680     static mutex_t readdirlock = X_MUTEX_INIT;
681    
/* readdir_r emulation: serialise readdir () with a global lock and copy the
   entry name into the caller-provided buffer.
   NOTE(review): only d_name is copied - the other fields of *ent are left
   unset; the sole caller (eio__scandir) reads only d_name. */
static int
eio__readdir_r (DIR *dirp, EIO_STRUCT_DIRENT *ent, EIO_STRUCT_DIRENT **res)
{
  EIO_STRUCT_DIRENT *e;
  int errorno;

  X_LOCK (readdirlock);

  e = readdir (dirp);
  errorno = errno; /* preserve readdir's errno across the unlock */

  if (e)
    {
      *res = ent;
      strcpy (ent->d_name, e->d_name);
    }
  else
    *res = 0;

  X_UNLOCK (readdirlock);

  errno = errorno;
  return e ? 0 : -1;
}
706     #endif
707    
/* sendfile always needs emulation */
/* copy count bytes from ifd@offset to ofd, using the platform sendfile
   where available and falling back to a pread/write loop when the native
   call is missing or rejects this fd combination.  returns the number of
   bytes copied, or -1 (with errno set) when nothing could be copied. */
static ssize_t
eio__sendfile (int ofd, int ifd, off_t offset, size_t count, worker *self)
{
  ssize_t res;

  if (!count)
    return 0;

#if HAVE_SENDFILE
# if __linux
  res = sendfile (ofd, ifd, &offset, count);

# elif __freebsd
  /*
   * Of course, the freebsd sendfile is a dire hack with no thoughts
   * wasted on making it similar to other I/O functions.
   */
  {
    off_t sbytes;
    res = sendfile (ifd, ofd, offset, count, 0, &sbytes, 0);

    if (res < 0 && sbytes)
      /* maybe only on EAGAIN: as usual, the manpage leaves you guessing */
      res = sbytes;
  }

# elif __hpux
  res = sendfile (ofd, ifd, offset, count, 0, 0);

# elif __solaris
  {
    struct sendfilevec vec;
    size_t sbytes;

    vec.sfv_fd = ifd;
    vec.sfv_flag = 0;
    vec.sfv_off = offset;
    vec.sfv_len = count;

    res = sendfilev (ofd, &vec, 1, &sbytes);

    if (res < 0 && sbytes)
      res = sbytes;
  }

# endif
#else
  res = -1;
  errno = ENOSYS;
#endif

  /* these errnos indicate the native call cannot handle this fd pair,
     so fall back to emulation */
  if (res < 0
      && (errno == ENOSYS || errno == EINVAL || errno == ENOTSOCK
#if __solaris
          || errno == EAFNOSUPPORT || errno == EPROTOTYPE
#endif
         )
      )
    {
      /* emulate sendfile. this is a major pain in the ass */
      dBUF; /* allocates eio_buf/self->dbuf; returns -1 with ENOMEM on failure */

      res = 0;

      while (count)
        {
          ssize_t cnt;

          cnt = pread (ifd, eio_buf, count > EIO_BUFSIZE ? EIO_BUFSIZE : count, offset);

          if (cnt <= 0)
            {
              /* only report an error if nothing was copied yet */
              if (cnt && !res) res = -1;
              break;
            }

          cnt = write (ofd, eio_buf, cnt);

          if (cnt <= 0)
            {
              if (cnt && !res) res = -1;
              break;
            }

          offset += cnt;
          res += cnt;
          count -= cnt;
        }
    }

  return res;
}
801    
802     /* read a full directory */
803 root 1.9 static void
804     eio__scandir (eio_req *req, worker *self)
805 root 1.1 {
806     DIR *dirp;
807     union
808     {
809     EIO_STRUCT_DIRENT d;
810     char b [offsetof (EIO_STRUCT_DIRENT, d_name) + NAME_MAX + 1];
811     } *u;
812     EIO_STRUCT_DIRENT *entp;
813     char *name, *names;
814     int memlen = 4096;
815     int memofs = 0;
816     int res = 0;
817    
818 root 1.13 X_LOCK (etplock);
819 root 1.1 self->dirp = dirp = opendir (req->ptr1);
820     self->dbuf = u = malloc (sizeof (*u));
821     req->flags |= EIO_FLAG_PTR2_FREE;
822     req->ptr2 = names = malloc (memlen);
823 root 1.13 X_UNLOCK (etplock);
824 root 1.1
825     if (dirp && u && names)
826     for (;;)
827     {
828     errno = 0;
829     readdir_r (dirp, &u->d, &entp);
830    
831     if (!entp)
832     break;
833    
834     name = entp->d_name;
835    
836     if (name [0] != '.' || (name [1] && (name [1] != '.' || name [2])))
837     {
838     int len = strlen (name) + 1;
839    
840     res++;
841    
842     while (memofs + len > memlen)
843     {
844     memlen *= 2;
845 root 1.13 X_LOCK (etplock);
846 root 1.1 req->ptr2 = names = realloc (names, memlen);
847 root 1.13 X_UNLOCK (etplock);
848 root 1.1
849     if (!names)
850     break;
851     }
852    
853     memcpy (names + memofs, name, len);
854     memofs += len;
855     }
856     }
857    
858     if (errno)
859     res = -1;
860    
861     req->result = res;
862     }
863    
864     /*****************************************************************************/
865    
/* allocate req->ptr2 on demand inside the request switch in eio_proc.
   the buffer becomes owned by the request (EIO_FLAG_PTR2_FREE, freed by
   eio_destroy); on ENOMEM the request is failed with result -1 and the
   switch is left via `break' - only usable directly inside that switch. */
#define ALLOC(len)				\
  if (!req->ptr2)				\
    {						\
      X_LOCK (etplock);				\
      req->flags |= EIO_FLAG_PTR2_FREE;		\
      X_UNLOCK (etplock);			\
      req->ptr2 = malloc (len);			\
      if (!req->ptr2)				\
        {					\
          errno = ENOMEM;			\
          req->result = -1;			\
          break;				\
        }					\
    }
880    
881 root 1.1 X_THREAD_PROC (eio_proc)
882     {
883     eio_req *req;
884     struct timespec ts;
885     worker *self = (worker *)thr_arg;
886    
887     /* try to distribute timeouts somewhat randomly */
888     ts.tv_nsec = ((unsigned long)self & 1023UL) * (1000000000UL / 1024UL);
889    
890     for (;;)
891     {
892     ts.tv_sec = time (0) + IDLE_TIMEOUT;
893    
894     X_LOCK (reqlock);
895    
896     for (;;)
897     {
898     self->req = req = reqq_shift (&req_queue);
899    
900     if (req)
901     break;
902    
903     ++idle;
904    
905     if (X_COND_TIMEDWAIT (reqwait, reqlock, ts) == ETIMEDOUT)
906     {
907     if (idle > max_idle)
908     {
909     --idle;
910     X_UNLOCK (reqlock);
911 root 1.13 X_LOCK (etplock);
912 root 1.1 --started;
913 root 1.13 X_UNLOCK (etplock);
914 root 1.1 goto quit;
915     }
916    
917     /* we are allowed to idle, so do so without any timeout */
918     X_COND_WAIT (reqwait, reqlock);
919     ts.tv_sec = time (0) + IDLE_TIMEOUT;
920     }
921    
922     --idle;
923     }
924    
925     --nready;
926    
927     X_UNLOCK (reqlock);
928    
929     errno = 0; /* strictly unnecessary */
930    
931     if (!EIO_CANCELLED (req))
932     switch (req->type)
933     {
934 root 1.7 case EIO_READ: ALLOC (req->size);
935     req->result = req->offs >= 0
936 root 1.1 ? pread (req->int1, req->ptr2, req->size, req->offs)
937     : read (req->int1, req->ptr2, req->size); break;
938     case EIO_WRITE: req->result = req->offs >= 0
939     ? pwrite (req->int1, req->ptr2, req->size, req->offs)
940     : write (req->int1, req->ptr2, req->size); break;
941    
942 root 1.9 case EIO_READAHEAD: req->result = readahead (req->int1, req->offs, req->size); break;
943     case EIO_SENDFILE: req->result = eio__sendfile (req->int1, req->int2, req->offs, req->size, self); break;
944 root 1.1
945 root 1.6 case EIO_STAT: ALLOC (sizeof (EIO_STRUCT_STAT));
946     req->result = stat (req->ptr1, (EIO_STRUCT_STAT *)req->ptr2); break;
947     case EIO_LSTAT: ALLOC (sizeof (EIO_STRUCT_STAT));
948     req->result = lstat (req->ptr1, (EIO_STRUCT_STAT *)req->ptr2); break;
949     case EIO_FSTAT: ALLOC (sizeof (EIO_STRUCT_STAT));
950     req->result = fstat (req->int1, (EIO_STRUCT_STAT *)req->ptr2); break;
951 root 1.1
952     case EIO_CHOWN: req->result = chown (req->ptr1, req->int2, req->int3); break;
953     case EIO_FCHOWN: req->result = fchown (req->int1, req->int2, req->int3); break;
954     case EIO_CHMOD: req->result = chmod (req->ptr1, (mode_t)req->int2); break;
955     case EIO_FCHMOD: req->result = fchmod (req->int1, (mode_t)req->int2); break;
956     case EIO_TRUNCATE: req->result = truncate (req->ptr1, req->offs); break;
957     case EIO_FTRUNCATE: req->result = ftruncate (req->int1, req->offs); break;
958    
959     case EIO_OPEN: req->result = open (req->ptr1, req->int1, (mode_t)req->int2); break;
960     case EIO_CLOSE: req->result = close (req->int1); break;
961     case EIO_DUP2: req->result = dup2 (req->int1, req->int2); break;
962     case EIO_UNLINK: req->result = unlink (req->ptr1); break;
963     case EIO_RMDIR: req->result = rmdir (req->ptr1); break;
964     case EIO_MKDIR: req->result = mkdir (req->ptr1, (mode_t)req->int2); break;
965     case EIO_RENAME: req->result = rename (req->ptr1, req->ptr2); break;
966     case EIO_LINK: req->result = link (req->ptr1, req->ptr2); break;
967     case EIO_SYMLINK: req->result = symlink (req->ptr1, req->ptr2); break;
968     case EIO_MKNOD: req->result = mknod (req->ptr1, (mode_t)req->int2, (dev_t)req->offs); break;
969 root 1.7
970 root 1.6 case EIO_READLINK: ALLOC (NAME_MAX);
971     req->result = readlink (req->ptr1, req->ptr2, NAME_MAX); break;
972 root 1.1
973     case EIO_SYNC: req->result = 0; sync (); break;
974     case EIO_FSYNC: req->result = fsync (req->int1); break;
975     case EIO_FDATASYNC: req->result = fdatasync (req->int1); break;
976    
977 root 1.9 case EIO_READDIR: eio__scandir (req, self); break;
978 root 1.1
979     case EIO_BUSY:
980     #ifdef _WIN32
981     Sleep (req->nv1 * 1000.);
982     #else
983     {
984     struct timeval tv;
985    
986     tv.tv_sec = req->nv1;
987     tv.tv_usec = (req->nv1 - tv.tv_sec) * 1000000.;
988    
989     req->result = select (0, 0, 0, 0, &tv);
990     }
991     #endif
992     break;
993    
994     case EIO_UTIME:
995     case EIO_FUTIME:
996     {
997     struct timeval tv[2];
998     struct timeval *times;
999    
1000     if (req->nv1 != -1. || req->nv2 != -1.)
1001     {
1002     tv[0].tv_sec = req->nv1;
1003     tv[0].tv_usec = (req->nv1 - tv[0].tv_sec) * 1000000.;
1004     tv[1].tv_sec = req->nv2;
1005     tv[1].tv_usec = (req->nv2 - tv[1].tv_sec) * 1000000.;
1006    
1007     times = tv;
1008     }
1009     else
1010     times = 0;
1011    
1012    
1013     req->result = req->type == EIO_FUTIME
1014     ? futimes (req->int1, times)
1015     : utimes (req->ptr1, times);
1016     }
1017    
1018     case EIO_GROUP:
1019     case EIO_NOP:
1020 root 1.9 req->result = 0;
1021 root 1.1 break;
1022    
1023     case EIO_QUIT:
1024     goto quit;
1025    
1026     default:
1027     req->result = -1;
1028     break;
1029     }
1030    
1031     req->errorno = errno;
1032    
1033     X_LOCK (reslock);
1034    
1035     ++npending;
1036    
1037 root 1.13 if (!reqq_push (&res_queue, req) && etp.want_poll_cb)
1038     etp.want_poll_cb ();
1039 root 1.1
1040     self->req = 0;
1041     worker_clear (self);
1042    
1043     X_UNLOCK (reslock);
1044     }
1045    
1046     quit:
1047 root 1.13 X_LOCK (etplock);
1048 root 1.1 worker_free (self);
1049 root 1.13 X_UNLOCK (etplock);
1050 root 1.1
1051     return 0;
1052     }
1053    
1054     /*****************************************************************************/
1055    
1056     int eio_init (void (*want_poll)(void), void (*done_poll)(void))
1057     {
1058 root 1.13 etp_init (&etp, want_poll, done_poll);
1059 root 1.1 }
1060    
/* destroy callback for requests allocated by the wrapper API below
   (installed via REQ; simply frees the heap-allocated request) */
static void eio_api_destroy (eio_req *req)
{
  free (req);
}
1065    
/* allocate and initialise a request for the wrapper functions below;
   returns 0 from the enclosing wrapper on allocation failure.
   expects `pri', `cb' and `data' parameters to be in scope. */
#define REQ(rtype)                                      \
  eio_req *req;                                         \
                                                        \
  req = (eio_req *)calloc (1, sizeof *req);             \
  if (!req)                                             \
    return 0;                                           \
                                                        \
  req->type    = rtype;                                 \
  req->pri     = pri;					\
  req->finish  = cb;					\
  req->data    = data;					\
  req->destroy = eio_api_destroy;

/* submit the request and hand it back to the caller */
#define SEND eio_submit (req); return req

/* duplicate `path' into req->ptr1 (owned by the request); on OOM the
   request is destroyed and the wrapper returns 0 */
#define PATH							\
  req->flags |= EIO_FLAG_PTR1_FREE;				\
  req->ptr1 = strdup (path);					\
  if (!req->ptr1)						\
    {								\
      eio_api_destroy (req);					\
      return 0;							\
    }
1089    
#ifndef EIO_NO_WRAPPERS

/* no-op request: does nothing in the worker, useful for synchronisation */
eio_req *eio_nop (int pri, eio_cb cb, void *data)
{
  REQ (EIO_NOP); SEND;
}

/* occupy a worker thread for `delay' seconds (EIO_BUSY) */
eio_req *eio_busy (double delay, int pri, eio_cb cb, void *data)
{
  REQ (EIO_BUSY); req->nv1 = delay; SEND;
}

/* asynchronous sync () */
eio_req *eio_sync (int pri, eio_cb cb, void *data)
{
  REQ (EIO_SYNC); SEND;
}
1106 root 1.6
/* asynchronous fsync (fd) */
eio_req *eio_fsync (int fd, int pri, eio_cb cb, void *data)
{
  REQ (EIO_FSYNC); req->int1 = fd; SEND;
}

/* asynchronous fdatasync (fd) */
eio_req *eio_fdatasync (int fd, int pri, eio_cb cb, void *data)
{
  REQ (EIO_FDATASYNC); req->int1 = fd; SEND;
}

/* asynchronous close (fd) */
eio_req *eio_close (int fd, int pri, eio_cb cb, void *data)
{
  REQ (EIO_CLOSE); req->int1 = fd; SEND;
}

/* asynchronous readahead (fd, offset, length) */
eio_req *eio_readahead (int fd, off_t offset, size_t length, int pri, eio_cb cb, void *data)
{
  REQ (EIO_READAHEAD); req->int1 = fd; req->offs = offset; req->size = length; SEND;
}

/* asynchronous read into buf; offset < 0 reads at the current file position */
eio_req *eio_read (int fd, void *buf, size_t length, off_t offset, int pri, eio_cb cb, void *data)
{
  REQ (EIO_READ); req->int1 = fd; req->offs = offset; req->size = length; req->ptr2 = buf; SEND;
}

/* asynchronous write from buf; offset < 0 writes at the current file position */
eio_req *eio_write (int fd, void *buf, size_t length, off_t offset, int pri, eio_cb cb, void *data)
{
  REQ (EIO_WRITE); req->int1 = fd; req->offs = offset; req->size = length; req->ptr2 = buf; SEND;
}
1136    
1137 root 1.10 eio_req *eio_fstat (int fd, int pri, eio_cb cb, void *data)
1138 root 1.6 {
1139 root 1.9 REQ (EIO_FSTAT); req->int1 = fd; SEND;
1140 root 1.6 }
1141    
1142 root 1.10 eio_req *eio_futime (int fd, double atime, double mtime, int pri, eio_cb cb, void *data)
1143 root 1.6 {
1144 root 1.9 REQ (EIO_FUTIME); req->int1 = fd; req->nv1 = atime; req->nv2 = mtime; SEND;
1145 root 1.6 }
1146    
1147 root 1.10 eio_req *eio_ftruncate (int fd, off_t offset, int pri, eio_cb cb, void *data)
1148 root 1.6 {
1149 root 1.9 REQ (EIO_FTRUNCATE); req->int1 = fd; req->offs = offset; SEND;
1150 root 1.6 }
1151    
1152 root 1.10 eio_req *eio_fchmod (int fd, mode_t mode, int pri, eio_cb cb, void *data)
1153 root 1.6 {
1154 root 1.9 REQ (EIO_FCHMOD); req->int1 = fd; req->int2 = (long)mode; SEND;
1155 root 1.6 }
1156    
1157 root 1.10 eio_req *eio_fchown (int fd, uid_t uid, gid_t gid, int pri, eio_cb cb, void *data)
1158 root 1.6 {
1159 root 1.9 REQ (EIO_FCHOWN); req->int1 = fd; req->int2 = (long)uid; req->int3 = (long)gid; SEND;
1160 root 1.6 }
1161    
/* queue an EIO_DUP2 request: duplicate fd (int1) onto fd2 (int2) */
eio_req *eio_dup2 (int fd, int fd2, int pri, eio_cb cb, void *data)
{
  REQ (EIO_DUP2); req->int1 = fd; req->int2 = fd2; SEND;
}
1166    
/* queue an EIO_SENDFILE request: copy length bytes from in_fd@in_offset to out_fd */
eio_req *eio_sendfile (int out_fd, int in_fd, off_t in_offset, size_t length, int pri, eio_cb cb, void *data)
{
  REQ (EIO_SENDFILE); req->int1 = out_fd; req->int2 = in_fd; req->offs = in_offset; req->size = length; SEND;
}
1171    
/* queue an EIO_OPEN request: PATH copies the path, flags in int1, mode in int2 */
eio_req *eio_open (const char *path, int flags, mode_t mode, int pri, eio_cb cb, void *data)
{
  REQ (EIO_OPEN); PATH; req->int1 = flags; req->int2 = (long)mode; SEND;
}
1176    
/* queue an EIO_UTIME request: access/modification times as doubles in nv1/nv2 */
eio_req *eio_utime (const char *path, double atime, double mtime, int pri, eio_cb cb, void *data)
{
  REQ (EIO_UTIME); PATH; req->nv1 = atime; req->nv2 = mtime; SEND;
}
1181    
/* queue an EIO_TRUNCATE request: truncate path to offset (offs) */
eio_req *eio_truncate (const char *path, off_t offset, int pri, eio_cb cb, void *data)
{
  REQ (EIO_TRUNCATE); PATH; req->offs = offset; SEND;
}
1186    
/* queue an EIO_CHOWN request: uid in int2, gid in int3 (int1 is unused for path ops) */
eio_req *eio_chown (const char *path, uid_t uid, gid_t gid, int pri, eio_cb cb, void *data)
{
  REQ (EIO_CHOWN); PATH; req->int2 = (long)uid; req->int3 = (long)gid; SEND;
}
1191    
/* queue an EIO_CHMOD request: mode in int2 */
eio_req *eio_chmod (const char *path, mode_t mode, int pri, eio_cb cb, void *data)
{
  REQ (EIO_CHMOD); PATH; req->int2 = (long)mode; SEND;
}
1196    
/* queue an EIO_MKDIR request: directory mode in int2 */
eio_req *eio_mkdir (const char *path, mode_t mode, int pri, eio_cb cb, void *data)
{
  REQ (EIO_MKDIR); PATH; req->int2 = (long)mode; SEND;
}
1201    
/* common helper for all request types that take a single path and no
   other arguments (stat, unlink, rmdir, ...) */
static eio_req *
eio__1path (int type, const char *path, int pri, eio_cb cb, void *data)
{
  REQ (type); PATH; SEND;
}
1207    
/* queue an EIO_READLINK request for path */
eio_req *eio_readlink (const char *path, int pri, eio_cb cb, void *data)
{
  return eio__1path (EIO_READLINK, path, pri, cb, data);
}
1212    
/* queue an EIO_STAT request for path */
eio_req *eio_stat (const char *path, int pri, eio_cb cb, void *data)
{
  return eio__1path (EIO_STAT, path, pri, cb, data);
}
1217    
/* queue an EIO_LSTAT request for path (does not follow symlinks) */
eio_req *eio_lstat (const char *path, int pri, eio_cb cb, void *data)
{
  return eio__1path (EIO_LSTAT, path, pri, cb, data);
}
1222    
/* queue an EIO_UNLINK request for path */
eio_req *eio_unlink (const char *path, int pri, eio_cb cb, void *data)
{
  return eio__1path (EIO_UNLINK, path, pri, cb, data);
}
1227    
/* queue an EIO_RMDIR request for path */
eio_req *eio_rmdir (const char *path, int pri, eio_cb cb, void *data)
{
  return eio__1path (EIO_RMDIR, path, pri, cb, data);
}
1232    
/* queue an EIO_READDIR request for path */
eio_req *eio_readdir (const char *path, int pri, eio_cb cb, void *data)
{
  return eio__1path (EIO_READDIR, path, pri, cb, data);
}
1237    
1238 root 1.10 eio_req *eio_mknod (const char *path, mode_t mode, dev_t dev, int pri, eio_cb cb, void *data)
1239 root 1.1 {
1240 root 1.9 REQ (EIO_MKNOD); PATH; req->int2 = (long)mode; req->int2 = (long)dev; SEND;
1241 root 1.1 }
1242    
/* common helper for request types that take two paths (link, symlink,
   rename): the first path is handled by PATH, the second is strdup'd
   into ptr2 and marked for freeing together with the request.  returns
   0 if the copy could not be allocated. */
static eio_req *
eio__2path (int type, const char *path, const char *new_path, int pri, eio_cb cb, void *data)
{
  REQ (type); PATH;

  req->flags |= EIO_FLAG_PTR2_FREE;
  req->ptr2 = strdup (new_path);
  if (!req->ptr2)
    {
      eio_api_destroy (req);
      return 0;
    }

  SEND;
}
1258    
/* queue an EIO_LINK request: hard-link path to new_path */
eio_req *eio_link (const char *path, const char *new_path, int pri, eio_cb cb, void *data)
{
  return eio__2path (EIO_LINK, path, new_path, pri, cb, data);
}
1263    
/* queue an EIO_SYMLINK request: create new_path as a symlink to path */
eio_req *eio_symlink (const char *path, const char *new_path, int pri, eio_cb cb, void *data)
{
  return eio__2path (EIO_SYMLINK, path, new_path, pri, cb, data);
}
1268    
/* queue an EIO_RENAME request: rename path to new_path */
eio_req *eio_rename (const char *path, const char *new_path, int pri, eio_cb cb, void *data)
{
  return eio__2path (EIO_RENAME, path, new_path, pri, cb, data);
}
1273    
1274 root 1.12 #endif
1275    
/* create an EIO_GROUP request that other requests can be added to
   with eio_grp_add */
eio_req *eio_grp (eio_cb cb, void *data)
{
  /* groups are submitted at maximum priority so they start accepting
     members as soon as possible */
  const int pri = EIO_PRI_MAX;

  REQ (EIO_GROUP); SEND;
}
1282    
1283 root 1.9 #undef REQ
1284     #undef PATH
1285     #undef SEND
1286 root 1.1
1287 root 1.9 /*****************************************************************************/
1288     /* grp functions */
1289 root 1.1
/* install a feeder callback on a group: feed is called to add more
   requests whenever the group has fewer than limit (int2) pending
   members; feeding is attempted immediately */
void eio_grp_feed (eio_req *grp, void (*feed)(eio_req *req), int limit)
{
  grp->int2 = limit;
  grp->feed = feed;

  grp_try_feed (grp);
}
1297    
/* change a group's feed limit (int2) and retry feeding, which may
   immediately add more requests if the new limit allows it */
void eio_grp_limit (eio_req *grp, int limit)
{
  grp->int2 = limit;

  grp_try_feed (grp);
}
1304    
/* add req as a member of group grp by pushing it onto the front of the
   group's doubly-linked member list; grp->size counts members.
   must not be called once the group has finished (int1 == 2). */
void eio_grp_add (eio_req *grp, eio_req *req)
{
  assert (("cannot add requests to IO::AIO::GRP after the group finished", grp->int1 != 2));

  ++grp->size;
  req->grp = grp;

  /* link at head: empty prev, old head becomes next */
  req->grp_prev = 0;
  req->grp_next = grp->grp_first;

  if (grp->grp_first)
    grp->grp_first->grp_prev = req;

  grp->grp_first = req;
}
1320    
1321 root 1.9 /*****************************************************************************/
1322     /* misc garbage */
1323    
1324     ssize_t eio_sendfile_sync (int ofd, int ifd, off_t offset, size_t count)
1325     {
1326     worker wrk;
1327    
1328     wrk.dbuf = 0;
1329    
1330     eio__sendfile (ofd, ifd, offset, count, &wrk);
1331    
1332     if (wrk.dbuf)
1333     free (wrk.dbuf);
1334     }
1335 root 1.1