/cvs/libeio/eio.c
Revision: 1.13
Committed: Tue May 13 19:34:11 2008 UTC by root
Content type: text/plain
Branch: MAIN
Changes since 1.12: +130 -116 lines
Log Message:
*** empty log message ***

File Contents

# Content
1 #include "eio.h"
2 #include "xthread.h"
3
4 #include <errno.h>
5 #include <stddef.h>
6 #include <stdlib.h>
7 #include <string.h>
9 #include <sys/types.h>
10 #include <sys/stat.h>
11 #include <limits.h>
12 #include <fcntl.h>
13 #include <assert.h>
14
15 #ifndef EIO_FINISH
16 # define EIO_FINISH(req) ((req)->finish && !EIO_CANCELLED (req) ? (req)->finish (req) : 0)
17 #endif
18
19 #ifndef EIO_DESTROY
20 # define EIO_DESTROY(req) do { if ((req)->destroy) (req)->destroy (req); } while (0)
21 #endif
22
23 #ifndef EIO_FEED
24 # define EIO_FEED(req) do { if ((req)->feed ) (req)->feed (req); } while (0)
25 #endif
26
27 #ifdef _WIN32
28
29 /*doh*/
30
31 #else
32
33 # include "config.h"
34 # include <sys/time.h>
35 # include <sys/select.h>
36 # include <unistd.h>
37 # include <utime.h>
38 # include <signal.h>
39 # include <dirent.h>
40
41 # ifndef EIO_STRUCT_DIRENT
42 # define EIO_STRUCT_DIRENT struct dirent
43 # endif
44
45 #endif
46
47 #if HAVE_SENDFILE
48 # if __linux
49 # include <sys/sendfile.h>
50 # elif __freebsd
51 # include <sys/socket.h>
52 # include <sys/uio.h>
53 # elif __hpux
54 # include <sys/socket.h>
55 # elif __solaris /* not yet */
56 # include <sys/sendfile.h>
57 # else
58 # error sendfile support requested but not available
59 # endif
60 #endif
61
62 /* number of seconds after which an idle thread exits */
63 #define IDLE_TIMEOUT 10
64
65 /* used to size struct dirent buffers; AIX doesn't provide NAME_MAX */
66 #ifndef NAME_MAX
67 # define NAME_MAX 4096
68 #endif
69
70 /* buffer size for various temporary buffers */
71 #define EIO_BUFSIZE 65536
72
73 #define dBUF \
74 char *eio_buf; \
75 X_LOCK (etplock); \
76 self->dbuf = eio_buf = malloc (EIO_BUFSIZE); \
77 X_UNLOCK (etplock); \
78 errno = ENOMEM; \
79 if (!eio_buf) \
80 return -1;
81
82 #define EIO_TICKS ((1000000 + 1023) >> 10)
83
84 /*****************************************************************************/
85
86 /* calculate time difference in ~1/EIO_TICKS of a second */
87 static int tvdiff (struct timeval *tv1, struct timeval *tv2)
88 {
89 return (tv2->tv_sec - tv1->tv_sec ) * EIO_TICKS
90 + ((tv2->tv_usec - tv1->tv_usec) >> 10);
91 }
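
EIO_TICKS is chosen so that one tick is roughly a millisecond (1000000/1024 us), which lets tvdiff avoid a division: seconds are scaled by ~977 and microseconds shifted right by 10. A standalone check of the arithmetic (the main function and the sample values are illustrative only, not part of eio.c):

    #include <stdio.h>
    #include <sys/time.h>

    #define EIO_TICKS ((1000000 + 1023) >> 10) /* 977 ticks/second, ~1.024 ms each */

    static int tvdiff (struct timeval *tv1, struct timeval *tv2)
    {
      return (tv2->tv_sec - tv1->tv_sec ) * EIO_TICKS
           + ((tv2->tv_usec - tv1->tv_usec) >> 10);
    }

    int main (void)
    {
      struct timeval a = { 0, 0 }, b = { 2, 500000 };

      /* 2.5s -> 2 * 977 + (500000 >> 10) = 1954 + 488 = 2442 ticks */
      printf ("%d ticks\n", tvdiff (&a, &b));
      return 0;
    }
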
92
93 static unsigned int started, idle, wanted = 4;
94
95 typedef struct etp_pool
96 {
97 void (*want_poll_cb) (void);
98 void (*done_poll_cb) (void);
99
100 unsigned int max_poll_time;
101 unsigned int max_poll_reqs;
102 } etp_pool;
103
104 static volatile unsigned int nreqs, nready, npending;
105 static volatile unsigned int max_idle = 4;
106
107 static mutex_t etplock = X_MUTEX_INIT;
108 static mutex_t reslock = X_MUTEX_INIT;
109 static mutex_t reqlock = X_MUTEX_INIT;
110 static cond_t reqwait = X_COND_INIT;
111
112 typedef struct worker
113 {
114 /* locked by etplock */
115 struct worker *prev, *next;
116
117 thread_t tid;
118
119 /* locked by reslock, reqlock or etplock */
120 eio_req *req; /* currently processed request */
121 void *dbuf;
122 DIR *dirp;
123 } worker;
124
125 static worker wrk_first = { &wrk_first, &wrk_first, 0 }; /* NOT etp */
126
127 /* worker threads management */
128
129 static void worker_clear (worker *wrk)
130 {
131 if (wrk->dirp)
132 {
133 closedir (wrk->dirp);
134 wrk->dirp = 0;
135 }
136
137 if (wrk->dbuf)
138 {
139 free (wrk->dbuf);
140 wrk->dbuf = 0;
141 }
142 }
143
144 static void worker_free (worker *wrk)
145 {
146 wrk->next->prev = wrk->prev;
147 wrk->prev->next = wrk->next;
148
149 free (wrk);
150 }
151
152 unsigned int eio_nreqs (void)
153 {
154 return nreqs;
155 }
156
157 unsigned int eio_nready (void)
158 {
159 unsigned int retval;
160
161 if (WORDACCESS_UNSAFE) X_LOCK (reqlock);
162 retval = nready;
163 if (WORDACCESS_UNSAFE) X_UNLOCK (reqlock);
164
165 return retval;
166 }
167
168 unsigned int eio_npending (void)
169 {
170 unsigned int retval;
171
172 if (WORDACCESS_UNSAFE) X_LOCK (reqlock);
173 retval = npending;
174 if (WORDACCESS_UNSAFE) X_UNLOCK (reqlock);
175
176 return retval;
177 }
178
179 unsigned int eio_nthreads (void)
180 {
181 unsigned int retval;
182
183 if (WORDACCESS_UNSAFE) X_LOCK (reqlock);
184 retval = started;
185 if (WORDACCESS_UNSAFE) X_UNLOCK (reqlock);
186
187 return retval;
188 }
189
190 /*
191 * a somewhat faster data structure might be nice, but
192 * with 8 priorities this actually needs <20 insns
193 * per shift, the most expensive operation.
194 */
195 typedef struct {
196 eio_req *qs[EIO_NUM_PRI], *qe[EIO_NUM_PRI]; /* qstart, qend */
197 int size;
198 } reqq;
199
200 static reqq req_queue;
201 static reqq res_queue;
202
203 static int reqq_push (reqq *q, eio_req *req)
204 {
205 int pri = req->pri;
206 req->next = 0;
207
208 if (q->qe[pri])
209 {
210 q->qe[pri]->next = req;
211 q->qe[pri] = req;
212 }
213 else
214 q->qe[pri] = q->qs[pri] = req;
215
216 return q->size++;
217 }
218
219 static eio_req *reqq_shift (reqq *q)
220 {
221 int pri;
222
223 if (!q->size)
224 return 0;
225
226 --q->size;
227
228 for (pri = EIO_NUM_PRI; pri--; )
229 {
230 eio_req *req = q->qs[pri];
231
232 if (req)
233 {
234 if (!(q->qs[pri] = (eio_req *)req->next))
235 q->qe[pri] = 0;
236
237 return req;
238 }
239 }
240
241 abort ();
242 }
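
The queue above keeps one singly linked FIFO per priority and scans from the highest priority downward on shift, so higher-priority requests always leave first and equal-priority requests keep submission order. A self-contained sketch of the same structure with a stub request type (the names here are illustrative, not part of libeio):

    #include <stdio.h>
    #include <stdlib.h>

    #define NUM_PRI 8

    typedef struct node { struct node *next; int pri; int id; } node;

    typedef struct { node *qs[NUM_PRI], *qe[NUM_PRI]; int size; } queue;

    static void q_push (queue *q, node *n)
    {
      n->next = 0;

      if (q->qe[n->pri])
        {
          q->qe[n->pri]->next = n; /* append to the tail of this priority's FIFO */
          q->qe[n->pri] = n;
        }
      else
        q->qe[n->pri] = q->qs[n->pri] = n;

      ++q->size;
    }

    static node *q_shift (queue *q)
    {
      int pri;

      if (!q->size)
        return 0;

      --q->size;

      for (pri = NUM_PRI; pri--; ) /* highest priority wins */
        if (q->qs[pri])
          {
            node *n = q->qs[pri];

            if (!(q->qs[pri] = n->next))
              q->qe[pri] = 0;

            return n;
          }

      abort (); /* size and per-priority lists out of sync */
    }

    int main (void)
    {
      queue q = { { 0 }, { 0 }, 0 };
      node a = { 0, 0, 1 }, b = { 0, 7, 2 }, c = { 0, 0, 3 };

      q_push (&q, &a); q_push (&q, &b); q_push (&q, &c);

      while (q.size)
        printf ("id %d\n", q_shift (&q)->id); /* prints 2, 1, 3 */

      return 0;
    }
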
243 
/* forward declarations: these mutexes are defined further down, inside the
   respective emulation blocks, but the fork handlers below already use them */
#if !HAVE_PREADWRITE
static mutex_t preadwritelock;
#endif
#if !HAVE_READDIR_R
static mutex_t readdirlock;
#endif

244 static void etp_atfork_prepare (void)
245 {
246 X_LOCK (etplock);
247 X_LOCK (reqlock);
248 X_LOCK (reslock);
249 #if !HAVE_PREADWRITE
250 X_LOCK (preadwritelock);
251 #endif
252 #if !HAVE_READDIR_R
253 X_LOCK (readdirlock);
254 #endif
255 }
256
257 static void etp_atfork_parent (void)
258 {
259 #if !HAVE_READDIR_R
260 X_UNLOCK (readdirlock);
261 #endif
262 #if !HAVE_PREADWRITE
263 X_UNLOCK (preadwritelock);
264 #endif
265 X_UNLOCK (reslock);
266 X_UNLOCK (reqlock);
267 X_UNLOCK (etplock);
268 }
269
270 static void etp_atfork_child (void)
271 {
272 eio_req *prv;
273
274 while ((prv = reqq_shift (&req_queue)))
275 eio_destroy (prv);
276 
277 while ((prv = reqq_shift (&res_queue)))
278 eio_destroy (prv);
279
280 while (wrk_first.next != &wrk_first)
281 {
282 worker *wrk = wrk_first.next;
283
284 if (wrk->req)
285 eio_destroy (wrk->req);
286
287 worker_clear (wrk);
288 worker_free (wrk);
289 }
290
291 started = 0;
292 idle = 0;
293 nreqs = 0;
294 nready = 0;
295 npending = 0;
296
297 etp_atfork_parent ();
298 }
299
300 static void
301 etp_once_init (void)
302 {
303 X_THREAD_ATFORK (etp_atfork_prepare, etp_atfork_parent, etp_atfork_child);
304 }
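
etp_once_init installs the three fork handlers exactly once, so a child process never inherits a locked mutex or stale queued requests. The general pattern, reduced to a single lock (a generic POSIX sketch, not libeio API):

    #include <pthread.h>

    static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;

    static void prepare (void) { pthread_mutex_lock   (&lock); } /* before fork, in parent */
    static void parent  (void) { pthread_mutex_unlock (&lock); } /* after fork, in parent */
    static void child   (void) { pthread_mutex_unlock (&lock); } /* after fork, in child */

    int main (void)
    {
      pthread_atfork (prepare, parent, child);
      /* fork () is now safe with respect to 'lock' */
      return 0;
    }
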
305
306 static int
307 etp_init (etp_pool *etp, void (*want_poll)(void), void (*done_poll)(void))
308 {
309 static pthread_once_t doinit = PTHREAD_ONCE_INIT;
310
311 pthread_once (&doinit, etp_once_init);
312
313 memset (etp, 0, sizeof *etp);
314
315 etp->want_poll_cb = want_poll;
316 etp->done_poll_cb = done_poll;
317 return 0;
}
318
319 static etp_pool etp;
320
321 /*****************************************************************************/
322
323 static void grp_try_feed (eio_req *grp)
324 {
325 while (grp->size < grp->int2 && !EIO_CANCELLED (grp))
326 {
327 int old_len = grp->size;
328
329 EIO_FEED (grp);
330
331 /* stop if no progress has been made */
332 if (old_len == grp->size)
333 {
334 grp->feed = 0;
335 break;
336 }
337 }
338 }
339
340 static int eio_finish (eio_req *req);
341
342 static int grp_dec (eio_req *grp)
343 {
344 --grp->size;
345
346 /* call feeder, if applicable */
347 grp_try_feed (grp);
348
349 /* finish, if done */
350 if (!grp->size && grp->int1)
351 return eio_finish (grp);
352 else
353 return 0;
354 }
355
356 void eio_destroy (eio_req *req)
357 {
358 if ((req)->flags & EIO_FLAG_PTR1_FREE) free (req->ptr1);
359 if ((req)->flags & EIO_FLAG_PTR2_FREE) free (req->ptr2);
360
361 EIO_DESTROY (req);
362 }
363
364 static int eio_finish (eio_req *req)
365 {
366 int res = EIO_FINISH (req);
367
368 if (req->grp)
369 {
370 int res2;
371 eio_req *grp = req->grp;
372
373 /* unlink request */
374 if (req->grp_next) req->grp_next->grp_prev = req->grp_prev;
375 if (req->grp_prev) req->grp_prev->grp_next = req->grp_next;
376
377 if (grp->grp_first == req)
378 grp->grp_first = req->grp_next;
379
380 res2 = grp_dec (grp);
381
382 if (!res && res2)
383 res = res2;
384 }
385
386 eio_destroy (req);
387
388 return res;
389 }
390
391 void eio_grp_cancel (eio_req *grp)
392 {
393 for (grp = grp->grp_first; grp; grp = grp->grp_next)
394 eio_cancel (grp);
395 }
396
397 void eio_cancel (eio_req *req)
398 {
399 X_LOCK (etplock);
400 req->flags |= EIO_FLAG_CANCELLED;
401 X_UNLOCK (etplock);
402
403 eio_grp_cancel (req);
404 }
405
406 X_THREAD_PROC (eio_proc);
407
408 static void start_thread (void)
409 {
410 worker *wrk = calloc (1, sizeof (worker));
411
412 /*TODO*/
413 assert (("unable to allocate worker thread data", wrk));
414
415 X_LOCK (etplock);
416
417 if (thread_create (&wrk->tid, eio_proc, (void *)wrk))
418 {
419 wrk->prev = &wrk_first;
420 wrk->next = wrk_first.next;
421 wrk_first.next->prev = wrk;
422 wrk_first.next = wrk;
423 ++started;
424 }
425 else
426 free (wrk);
427
428 X_UNLOCK (etplock);
429 }
430
431 static void maybe_start_thread (void)
432 {
433 if (eio_nthreads () >= wanted)
434 return;
435
436 /* todo: maybe use idle here, but might be less exact */
437 if (0 <= (int)eio_nthreads () + (int)eio_npending () - (int)eio_nreqs ())
438 return;
439
440 start_thread ();
441 }
442
443 void eio_submit (eio_req *req)
444 {
445 req->pri += EIO_PRI_BIAS;
446
447 if (req->pri < EIO_PRI_MIN + EIO_PRI_BIAS) req->pri = EIO_PRI_MIN + EIO_PRI_BIAS;
448 if (req->pri > EIO_PRI_MAX + EIO_PRI_BIAS) req->pri = EIO_PRI_MAX + EIO_PRI_BIAS;
449
450 ++nreqs;
451
452 X_LOCK (reqlock);
453 ++nready;
454 reqq_push (&req_queue, req);
455 X_COND_SIGNAL (reqwait);
456 X_UNLOCK (reqlock);
457
458 maybe_start_thread ();
459 }
460
461 static void end_thread (void)
462 {
463 eio_req *req = calloc (1, sizeof (eio_req));
464
465 req->type = EIO_QUIT;
466 req->pri = EIO_PRI_MAX + EIO_PRI_BIAS;
467
468 X_LOCK (reqlock);
469 reqq_push (&req_queue, req);
470 X_COND_SIGNAL (reqwait);
471 X_UNLOCK (reqlock);
472
473 X_LOCK (etplock);
474 --started;
475 X_UNLOCK (etplock);
476 }
477
478 void eio_set_max_poll_time (double nseconds)
479 {
480 if (WORDACCESS_UNSAFE) X_LOCK (reqlock);
481 etp.max_poll_time = nseconds * EIO_TICKS; /* tvdiff () reports ticks, so store ticks */
482 if (WORDACCESS_UNSAFE) X_UNLOCK (reqlock);
483 }
484
485 void eio_set_max_poll_reqs (unsigned int maxreqs)
486 {
487 if (WORDACCESS_UNSAFE) X_LOCK (reqlock);
488 etp.max_poll_reqs = maxreqs;
489 if (WORDACCESS_UNSAFE) X_UNLOCK (reqlock);
490 }
491
492 void eio_set_max_idle (unsigned int nthreads)
493 {
494 if (WORDACCESS_UNSAFE) X_LOCK (reqlock);
495 max_idle = nthreads ? nthreads : 1;
496 if (WORDACCESS_UNSAFE) X_UNLOCK (reqlock);
497 }
498
499 void eio_set_min_parallel (unsigned int nthreads)
500 {
501 if (wanted < nthreads)
502 wanted = nthreads;
503 }
504
505 void eio_set_max_parallel (unsigned int nthreads)
506 {
507 if (wanted > nthreads)
508 wanted = nthreads;
509
510 while (started > wanted)
511 end_thread ();
512 }
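
Taken together, these knobs bound both the pool size and the latency of a single eio_poll call. A minimal sketch using only the setters defined above (the values are arbitrary):

    #include "eio.h"

    static void tune_pool (void)
    {
      eio_set_min_parallel  (4);    /* pre-warm at least four worker threads */
      eio_set_max_parallel  (8);    /* never run more than eight */
      eio_set_max_idle      (2);    /* let at most two threads linger idle */
      eio_set_max_poll_time (0.05); /* spend at most ~50ms per eio_poll call */
      eio_set_max_poll_reqs (16);   /* finish at most 16 requests per call */
    }
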
513
514 int eio_poll (void)
515 {
516 int maxreqs = etp.max_poll_reqs;
517 struct timeval tv_start, tv_now;
518 eio_req *req;
519
520 if (etp.max_poll_time)
521 gettimeofday (&tv_start, 0);
522
523 for (;;)
524 {
525 maybe_start_thread ();
526
527 X_LOCK (reslock);
528 req = reqq_shift (&res_queue);
529
530 if (req)
531 {
532 --npending;
533
534 if (!res_queue.size && etp.done_poll_cb)
535 etp.done_poll_cb ();
536 }
537
538 X_UNLOCK (reslock);
539
540 if (!req)
541 return 0;
542
543 --nreqs;
544
545 if (req->type == EIO_GROUP && req->size)
546 {
547 req->int1 = 1; /* mark request as delayed */
548 continue;
549 }
550 else
551 {
552 int res = eio_finish (req);
553 if (res)
554 return res;
555 }
556
557 if (maxreqs && !--maxreqs)
558 break;
559
560 if (etp.max_poll_time)
561 {
562 gettimeofday (&tv_now, 0);
563
564 if (tvdiff (&tv_start, &tv_now) >= etp.max_poll_time)
565 break;
566 }
567 }
568
569 errno = EAGAIN;
570 return -1;
571 }
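
eio_poll returns 0 once the result queue is drained and -1 (with errno set to EAGAIN) when it stopped early because of the limits above, so callers keep invoking it until it returns 0. A minimal sketch of driving it from a select loop through a self-pipe; want_poll runs in a worker thread when results become pending, done_poll when the last one is handled (pipefd, done_cb and so on are illustrative names, not libeio API):

    #include <stdio.h>
    #include <unistd.h>
    #include <sys/select.h>
    #include "eio.h"

    static int pipefd [2];

    static void want_poll (void) { char c = 0; write (pipefd [1], &c, 1); }
    static void done_poll (void) { char c;     read  (pipefd [0], &c, 1); }

    static int done_cb (eio_req *req)
    {
      printf ("request finished, result %d\n", (int)req->result);
      return 0;
    }

    int main (void)
    {
      pipe (pipefd);
      eio_init (want_poll, done_poll);

      eio_nop (0, done_cb, 0); /* queue a trivial request */

      while (eio_nreqs ())
        {
          fd_set rfds;

          FD_ZERO (&rfds);
          FD_SET (pipefd [0], &rfds);
          select (pipefd [0] + 1, &rfds, 0, 0, 0);

          while (eio_poll () == -1)
            ; /* EAGAIN: limits hit, results still pending */
        }

      return 0;
    }
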
572
573 /*****************************************************************************/
574 /* work around various missing functions */
575
576 #if !HAVE_PREADWRITE
577 # define pread eio__pread
578 # define pwrite eio__pwrite
579
580 /*
581 * make our pread/pwrite safe against themselves, but not against
582 * normal read/write by using a mutex. slows down execution a lot,
583 * but that's your problem, not mine.
584 */
585 static mutex_t preadwritelock = X_MUTEX_INIT;
586
587 static ssize_t
588 eio__pread (int fd, void *buf, size_t count, off_t offset)
589 {
590 ssize_t res;
591 off_t ooffset;
592
593 X_LOCK (preadwritelock);
594 ooffset = lseek (fd, 0, SEEK_CUR);
595 lseek (fd, offset, SEEK_SET);
596 res = read (fd, buf, count);
597 lseek (fd, ooffset, SEEK_SET);
598 X_UNLOCK (preadwritelock);
599
600 return res;
601 }
602
603 static ssize_t
604 eio__pwrite (int fd, void *buf, size_t count, off_t offset)
605 {
606 ssize_t res;
607 off_t ooffset;
608
609 X_LOCK (preadwritelock);
610 ooffset = lseek (fd, 0, SEEK_CUR);
611 lseek (fd, offset, SEEK_SET);
612 res = write (fd, buf, count);
613 lseek (fd, ooffset, SEEK_SET); /* restore the original offset, not the write offset */
614 X_UNLOCK (preadwritelock);
615
616 return res;
617 }
618 #endif
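
A native pread/pwrite never touches the descriptor's file offset; the emulation above does move it, but restores it before unlocking, so concurrent eio__pread/eio__pwrite calls stay consistent with each other. For comparison, positional reading with the real syscall (a standalone snippet; the path is just an example):

    #include <fcntl.h>
    #include <stdio.h>
    #include <unistd.h>

    int main (void)
    {
      char buf [16];
      int fd = open ("/etc/passwd", O_RDONLY); /* any readable file */

      if (fd >= 0)
        {
          ssize_t n = pread (fd, buf, sizeof buf, 4); /* read at offset 4 */

          /* with a native pread the reported offset is still 0 */
          printf ("got %d bytes; file offset still %d\n",
                  (int)n, (int)lseek (fd, 0, SEEK_CUR));
          close (fd);
        }

      return 0;
    }
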
619
620 #if !HAVE_FUTIMES
621
622 # define utimes(path,times) eio__utimes (path, times)
623 # define futimes(fd,times) eio__futimes (fd, times)
624
625 static int
626 eio__utimes (const char *filename, const struct timeval times[2])
627 {
628 if (times)
629 {
630 struct utimbuf buf;
631
632 buf.actime = times[0].tv_sec;
633 buf.modtime = times[1].tv_sec;
634
635 return utime (filename, &buf);
636 }
637 else
638 return utime (filename, 0);
639 }
640
641 static int eio__futimes (int fd, const struct timeval tv[2])
642 {
643 errno = ENOSYS;
644 return -1;
645 }
646
647 #endif
648
649 #if !HAVE_FDATASYNC
650 # define fdatasync fsync
651 #endif
652
653 #if !HAVE_READAHEAD
654 # define readahead(fd,offset,count) eio__readahead (fd, offset, count, self)
655
656 static ssize_t
657 eio__readahead (int fd, off_t offset, size_t count, worker *self)
658 {
659 size_t todo = count;
660 dBUF;
661
662 while (todo > 0)
663 {
664 size_t len = todo < EIO_BUFSIZE ? todo : EIO_BUFSIZE;
665
666 pread (fd, eio_buf, len, offset);
667 offset += len;
668 todo -= len;
669 }
670
671 errno = 0;
672 return count;
673 }
674
675 #endif
676
677 #if !HAVE_READDIR_R
678 # define readdir_r eio__readdir_r
679
680 static mutex_t readdirlock = X_MUTEX_INIT;
681
682 static int
683 eio__readdir_r (DIR *dirp, EIO_STRUCT_DIRENT *ent, EIO_STRUCT_DIRENT **res)
684 {
685 EIO_STRUCT_DIRENT *e;
686 int errorno;
687
688 X_LOCK (readdirlock);
689
690 e = readdir (dirp);
691 errorno = errno;
692
693 if (e)
694 {
695 *res = ent;
696 strcpy (ent->d_name, e->d_name);
697 }
698 else
699 *res = 0;
700
701 X_UNLOCK (readdirlock);
702
703 errno = errorno;
704 return e ? 0 : -1;
705 }
706 #endif
707
708 /* sendfile always needs emulation */
709 static ssize_t
710 eio__sendfile (int ofd, int ifd, off_t offset, size_t count, worker *self)
711 {
712 ssize_t res;
713
714 if (!count)
715 return 0;
716
717 #if HAVE_SENDFILE
718 # if __linux
719 res = sendfile (ofd, ifd, &offset, count);
720
721 # elif __freebsd
722 /*
723 * Of course, the freebsd sendfile is a dire hack with no thoughts
724 * wasted on making it similar to other I/O functions.
725 */
726 {
727 off_t sbytes;
728 res = sendfile (ifd, ofd, offset, count, 0, &sbytes, 0);
729
730 if (res < 0 && sbytes)
731 /* maybe only on EAGAIN: as usual, the manpage leaves you guessing */
732 res = sbytes;
733 }
734
735 # elif __hpux
736 res = sendfile (ofd, ifd, offset, count, 0, 0);
737
738 # elif __solaris
739 {
740 struct sendfilevec vec;
741 size_t sbytes;
742
743 vec.sfv_fd = ifd;
744 vec.sfv_flag = 0;
745 vec.sfv_off = offset;
746 vec.sfv_len = count;
747
748 res = sendfilev (ofd, &vec, 1, &sbytes);
749
750 if (res < 0 && sbytes)
751 res = sbytes;
752 }
753
754 # endif
755 #else
756 res = -1;
757 errno = ENOSYS;
758 #endif
759
760 if (res < 0
761 && (errno == ENOSYS || errno == EINVAL || errno == ENOTSOCK
762 #if __solaris
763 || errno == EAFNOSUPPORT || errno == EPROTOTYPE
764 #endif
765 )
766 )
767 {
768 /* emulate sendfile. this is a major pain in the ass */
769 dBUF;
770
771 res = 0;
772
773 while (count)
774 {
775 ssize_t cnt;
776
777 cnt = pread (ifd, eio_buf, count > EIO_BUFSIZE ? EIO_BUFSIZE : count, offset);
778
779 if (cnt <= 0)
780 {
781 if (cnt && !res) res = -1;
782 break;
783 }
784
785 cnt = write (ofd, eio_buf, cnt);
786
787 if (cnt <= 0)
788 {
789 if (cnt && !res) res = -1;
790 break;
791 }
792
793 offset += cnt;
794 res += cnt;
795 count -= cnt;
796 }
797 }
798
799 return res;
800 }
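
Because the pread/write fallback loop always exists, eio__sendfile works on any pair of descriptors, not just the socket/file combinations the native calls accept. A hedged sketch of a file-to-file copy via the synchronous wrapper defined at the end of this file (copy_file is an illustrative name):

    #include <fcntl.h>
    #include <sys/stat.h>
    #include <unistd.h>
    #include "eio.h"

    static int copy_file (const char *from, const char *to)
    {
      struct stat st;
      int res = -1;
      int ifd = open (from, O_RDONLY);
      int ofd = open (to, O_WRONLY | O_CREAT | O_TRUNC, 0644);

      if (ifd >= 0 && ofd >= 0 && fstat (ifd, &st) >= 0)
        res = eio_sendfile_sync (ofd, ifd, 0, st.st_size) == st.st_size ? 0 : -1;

      if (ifd >= 0) close (ifd);
      if (ofd >= 0) close (ofd);

      return res;
    }
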
801
802 /* read a full directory */
803 static void
804 eio__scandir (eio_req *req, worker *self)
805 {
806 DIR *dirp;
807 union
808 {
809 EIO_STRUCT_DIRENT d;
810 char b [offsetof (EIO_STRUCT_DIRENT, d_name) + NAME_MAX + 1];
811 } *u;
812 EIO_STRUCT_DIRENT *entp;
813 char *name, *names;
814 int memlen = 4096;
815 int memofs = 0;
816 int res = 0;
817
818 X_LOCK (etplock);
819 self->dirp = dirp = opendir (req->ptr1);
820 self->dbuf = u = malloc (sizeof (*u));
821 req->flags |= EIO_FLAG_PTR2_FREE;
822 req->ptr2 = names = malloc (memlen);
823 X_UNLOCK (etplock);
824
825 if (dirp && u && names)
826 for (;;)
827 {
828 errno = 0;
829 readdir_r (dirp, &u->d, &entp);
830
831 if (!entp)
832 break;
833
834 name = entp->d_name;
835
836 if (name [0] != '.' || (name [1] && (name [1] != '.' || name [2])))
837 {
838 int len = strlen (name) + 1;
839
840 res++;
841
842 while (memofs + len > memlen)
843 {
844 memlen *= 2;
845 X_LOCK (etplock);
846 req->ptr2 = names = realloc (names, memlen);
847 X_UNLOCK (etplock);
848
849 if (!names)
850 break;
851 }
852 if (!names) break; /* realloc failed; errno is ENOMEM, reported below */
853 memcpy (names + memofs, name, len);
854 memofs += len;
855 }
856 }
857
858 if (errno)
859 res = -1;
860
861 req->result = res;
862 }
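
The buffer handed back through req->ptr2 is a block of NUL-terminated names packed back to back, and req->result is the number of entries (or -1 on error). A sketch of a finish callback that walks it, e.g. for a request queued with eio_readdir (readdir_done is an illustrative name):

    #include <stdio.h>
    #include <string.h>
    #include "eio.h"

    static int readdir_done (eio_req *req)
    {
      if (req->result >= 0)
        {
          char *name = (char *)req->ptr2; /* freed by eio_destroy after this returns */
          int i;

          for (i = 0; i < req->result; ++i)
            {
              printf ("%s\n", name);
              name += strlen (name) + 1; /* skip to the next packed name */
            }
        }

      return 0;
    }
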
863
864 /*****************************************************************************/
865
866 #define ALLOC(len) \
867 if (!req->ptr2) \
868 { \
869 X_LOCK (etplock); \
870 req->flags |= EIO_FLAG_PTR2_FREE; \
871 X_UNLOCK (etplock); \
872 req->ptr2 = malloc (len); \
873 if (!req->ptr2) \
874 { \
875 errno = ENOMEM; \
876 req->result = -1; \
877 break; \
878 } \
879 }
880
881 X_THREAD_PROC (eio_proc)
882 {
883 eio_req *req;
884 struct timespec ts;
885 worker *self = (worker *)thr_arg;
886
887 /* try to distribute timeouts somewhat randomly */
888 ts.tv_nsec = ((unsigned long)self & 1023UL) * (1000000000UL / 1024UL);
889
890 for (;;)
891 {
892 ts.tv_sec = time (0) + IDLE_TIMEOUT;
893
894 X_LOCK (reqlock);
895
896 for (;;)
897 {
898 self->req = req = reqq_shift (&req_queue);
899
900 if (req)
901 break;
902
903 ++idle;
904
905 if (X_COND_TIMEDWAIT (reqwait, reqlock, ts) == ETIMEDOUT)
906 {
907 if (idle > max_idle)
908 {
909 --idle;
910 X_UNLOCK (reqlock);
911 X_LOCK (etplock);
912 --started;
913 X_UNLOCK (etplock);
914 goto quit;
915 }
916
917 /* we are allowed to idle, so do so without any timeout */
918 X_COND_WAIT (reqwait, reqlock);
919 ts.tv_sec = time (0) + IDLE_TIMEOUT;
920 }
921
922 --idle;
923 }
924
925 --nready;
926
927 X_UNLOCK (reqlock);
928
929 errno = 0; /* strictly unnecessary */
930
931 if (!EIO_CANCELLED (req))
932 switch (req->type)
933 {
934 case EIO_READ: ALLOC (req->size);
935 req->result = req->offs >= 0
936 ? pread (req->int1, req->ptr2, req->size, req->offs)
937 : read (req->int1, req->ptr2, req->size); break;
938 case EIO_WRITE: req->result = req->offs >= 0
939 ? pwrite (req->int1, req->ptr2, req->size, req->offs)
940 : write (req->int1, req->ptr2, req->size); break;
941
942 case EIO_READAHEAD: req->result = readahead (req->int1, req->offs, req->size); break;
943 case EIO_SENDFILE: req->result = eio__sendfile (req->int1, req->int2, req->offs, req->size, self); break;
944
945 case EIO_STAT: ALLOC (sizeof (EIO_STRUCT_STAT));
946 req->result = stat (req->ptr1, (EIO_STRUCT_STAT *)req->ptr2); break;
947 case EIO_LSTAT: ALLOC (sizeof (EIO_STRUCT_STAT));
948 req->result = lstat (req->ptr1, (EIO_STRUCT_STAT *)req->ptr2); break;
949 case EIO_FSTAT: ALLOC (sizeof (EIO_STRUCT_STAT));
950 req->result = fstat (req->int1, (EIO_STRUCT_STAT *)req->ptr2); break;
951
952 case EIO_CHOWN: req->result = chown (req->ptr1, req->int2, req->int3); break;
953 case EIO_FCHOWN: req->result = fchown (req->int1, req->int2, req->int3); break;
954 case EIO_CHMOD: req->result = chmod (req->ptr1, (mode_t)req->int2); break;
955 case EIO_FCHMOD: req->result = fchmod (req->int1, (mode_t)req->int2); break;
956 case EIO_TRUNCATE: req->result = truncate (req->ptr1, req->offs); break;
957 case EIO_FTRUNCATE: req->result = ftruncate (req->int1, req->offs); break;
958
959 case EIO_OPEN: req->result = open (req->ptr1, req->int1, (mode_t)req->int2); break;
960 case EIO_CLOSE: req->result = close (req->int1); break;
961 case EIO_DUP2: req->result = dup2 (req->int1, req->int2); break;
962 case EIO_UNLINK: req->result = unlink (req->ptr1); break;
963 case EIO_RMDIR: req->result = rmdir (req->ptr1); break;
964 case EIO_MKDIR: req->result = mkdir (req->ptr1, (mode_t)req->int2); break;
965 case EIO_RENAME: req->result = rename (req->ptr1, req->ptr2); break;
966 case EIO_LINK: req->result = link (req->ptr1, req->ptr2); break;
967 case EIO_SYMLINK: req->result = symlink (req->ptr1, req->ptr2); break;
968 case EIO_MKNOD: req->result = mknod (req->ptr1, (mode_t)req->int2, (dev_t)req->offs); break;
969
970 case EIO_READLINK: ALLOC (NAME_MAX);
971 req->result = readlink (req->ptr1, req->ptr2, NAME_MAX); break;
972
973 case EIO_SYNC: req->result = 0; sync (); break;
974 case EIO_FSYNC: req->result = fsync (req->int1); break;
975 case EIO_FDATASYNC: req->result = fdatasync (req->int1); break;
976
977 case EIO_READDIR: eio__scandir (req, self); break;
978
979 case EIO_BUSY:
980 #ifdef _WIN32
981 Sleep (req->nv1 * 1000.);
982 #else
983 {
984 struct timeval tv;
985
986 tv.tv_sec = req->nv1;
987 tv.tv_usec = (req->nv1 - tv.tv_sec) * 1000000.;
988
989 req->result = select (0, 0, 0, 0, &tv);
990 }
991 #endif
992 break;
993
994 case EIO_UTIME:
995 case EIO_FUTIME:
996 {
997 struct timeval tv[2];
998 struct timeval *times;
999
1000 if (req->nv1 != -1. || req->nv2 != -1.)
1001 {
1002 tv[0].tv_sec = req->nv1;
1003 tv[0].tv_usec = (req->nv1 - tv[0].tv_sec) * 1000000.;
1004 tv[1].tv_sec = req->nv2;
1005 tv[1].tv_usec = (req->nv2 - tv[1].tv_sec) * 1000000.;
1006
1007 times = tv;
1008 }
1009 else
1010 times = 0;
1011
1012
1013 req->result = req->type == EIO_FUTIME
1014 ? futimes (req->int1, times)
1015 : utimes (req->ptr1, times);
1016 }
1017 break;
1018 case EIO_GROUP:
1019 case EIO_NOP:
1020 req->result = 0;
1021 break;
1022
1023 case EIO_QUIT:
1024 goto quit;
1025
1026 default:
1027 req->result = -1;
1028 break;
1029 }
1030
1031 req->errorno = errno;
1032
1033 X_LOCK (reslock);
1034
1035 ++npending;
1036
1037 if (!reqq_push (&res_queue, req) && etp.want_poll_cb)
1038 etp.want_poll_cb ();
1039
1040 self->req = 0;
1041 worker_clear (self);
1042
1043 X_UNLOCK (reslock);
1044 }
1045
1046 quit:
1047 X_LOCK (etplock);
1048 worker_free (self);
1049 X_UNLOCK (etplock);
1050
1051 return 0;
1052 }
1053
1054 /*****************************************************************************/
1055
1056 int eio_init (void (*want_poll)(void), void (*done_poll)(void))
1057 {
1058 etp_init (&etp, want_poll, done_poll);
1059 }
1060
1061 static void eio_api_destroy (eio_req *req)
1062 {
1063 free (req);
1064 }
1065
1066 #define REQ(rtype) \
1067 eio_req *req; \
1068 \
1069 req = (eio_req *)calloc (1, sizeof *req); \
1070 if (!req) \
1071 return 0; \
1072 \
1073 req->type = rtype; \
1074 req->pri = pri; \
1075 req->finish = cb; \
1076 req->data = data; \
1077 req->destroy = eio_api_destroy;
1078
1079 #define SEND eio_submit (req); return req
1080
1081 #define PATH \
1082 req->flags |= EIO_FLAG_PTR1_FREE; \
1083 req->ptr1 = strdup (path); \
1084 if (!req->ptr1) \
1085 { \
1086 eio_api_destroy (req); \
1087 return 0; \
1088 }
1089
1090 #ifndef EIO_NO_WRAPPERS
1091
1092 eio_req *eio_nop (int pri, eio_cb cb, void *data)
1093 {
1094 REQ (EIO_NOP); SEND;
1095 }
1096
1097 eio_req *eio_busy (double delay, int pri, eio_cb cb, void *data)
1098 {
1099 REQ (EIO_BUSY); req->nv1 = delay; SEND;
1100 }
1101
1102 eio_req *eio_sync (int pri, eio_cb cb, void *data)
1103 {
1104 REQ (EIO_SYNC); SEND;
1105 }
1106
1107 eio_req *eio_fsync (int fd, int pri, eio_cb cb, void *data)
1108 {
1109 REQ (EIO_FSYNC); req->int1 = fd; SEND;
1110 }
1111
1112 eio_req *eio_fdatasync (int fd, int pri, eio_cb cb, void *data)
1113 {
1114 REQ (EIO_FDATASYNC); req->int1 = fd; SEND;
1115 }
1116
1117 eio_req *eio_close (int fd, int pri, eio_cb cb, void *data)
1118 {
1119 REQ (EIO_CLOSE); req->int1 = fd; SEND;
1120 }
1121
1122 eio_req *eio_readahead (int fd, off_t offset, size_t length, int pri, eio_cb cb, void *data)
1123 {
1124 REQ (EIO_READAHEAD); req->int1 = fd; req->offs = offset; req->size = length; SEND;
1125 }
1126
1127 eio_req *eio_read (int fd, void *buf, size_t length, off_t offset, int pri, eio_cb cb, void *data)
1128 {
1129 REQ (EIO_READ); req->int1 = fd; req->offs = offset; req->size = length; req->ptr2 = buf; SEND;
1130 }
1131
1132 eio_req *eio_write (int fd, void *buf, size_t length, off_t offset, int pri, eio_cb cb, void *data)
1133 {
1134 REQ (EIO_WRITE); req->int1 = fd; req->offs = offset; req->size = length; req->ptr2 = buf; SEND;
1135 }
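
The read/write wrappers do not copy the buffer: ptr2 points at caller memory, and since EIO_FLAG_PTR2_FREE is not set for it, eio will not free it either. A sketch of queueing a read and releasing the buffer in the callback (queue_read and read_done are illustrative names):

    #include <stdio.h>
    #include <stdlib.h>
    #include <string.h>
    #include "eio.h"

    static int read_done (eio_req *req)
    {
      if (req->result < 0)
        fprintf (stderr, "read failed: %s\n", strerror (req->errorno));
      else
        fwrite (req->ptr2, 1, req->result, stdout);

      free (req->data); /* the buffer, passed through as user data */
      return 0;
    }

    static void queue_read (int fd)
    {
      char *buf = malloc (4096);

      if (buf)
        eio_read (fd, buf, 4096, 0, 0, read_done, buf);
    }
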
1136
1137 eio_req *eio_fstat (int fd, int pri, eio_cb cb, void *data)
1138 {
1139 REQ (EIO_FSTAT); req->int1 = fd; SEND;
1140 }
1141
1142 eio_req *eio_futime (int fd, double atime, double mtime, int pri, eio_cb cb, void *data)
1143 {
1144 REQ (EIO_FUTIME); req->int1 = fd; req->nv1 = atime; req->nv2 = mtime; SEND;
1145 }
1146
1147 eio_req *eio_ftruncate (int fd, off_t offset, int pri, eio_cb cb, void *data)
1148 {
1149 REQ (EIO_FTRUNCATE); req->int1 = fd; req->offs = offset; SEND;
1150 }
1151
1152 eio_req *eio_fchmod (int fd, mode_t mode, int pri, eio_cb cb, void *data)
1153 {
1154 REQ (EIO_FCHMOD); req->int1 = fd; req->int2 = (long)mode; SEND;
1155 }
1156
1157 eio_req *eio_fchown (int fd, uid_t uid, gid_t gid, int pri, eio_cb cb, void *data)
1158 {
1159 REQ (EIO_FCHOWN); req->int1 = fd; req->int2 = (long)uid; req->int3 = (long)gid; SEND;
1160 }
1161
1162 eio_req *eio_dup2 (int fd, int fd2, int pri, eio_cb cb, void *data)
1163 {
1164 REQ (EIO_DUP2); req->int1 = fd; req->int2 = fd2; SEND;
1165 }
1166
1167 eio_req *eio_sendfile (int out_fd, int in_fd, off_t in_offset, size_t length, int pri, eio_cb cb, void *data)
1168 {
1169 REQ (EIO_SENDFILE); req->int1 = out_fd; req->int2 = in_fd; req->offs = in_offset; req->size = length; SEND;
1170 }
1171
1172 eio_req *eio_open (const char *path, int flags, mode_t mode, int pri, eio_cb cb, void *data)
1173 {
1174 REQ (EIO_OPEN); PATH; req->int1 = flags; req->int2 = (long)mode; SEND;
1175 }
1176
1177 eio_req *eio_utime (const char *path, double atime, double mtime, int pri, eio_cb cb, void *data)
1178 {
1179 REQ (EIO_UTIME); PATH; req->nv1 = atime; req->nv2 = mtime; SEND;
1180 }
1181
1182 eio_req *eio_truncate (const char *path, off_t offset, int pri, eio_cb cb, void *data)
1183 {
1184 REQ (EIO_TRUNCATE); PATH; req->offs = offset; SEND;
1185 }
1186
1187 eio_req *eio_chown (const char *path, uid_t uid, gid_t gid, int pri, eio_cb cb, void *data)
1188 {
1189 REQ (EIO_CHOWN); PATH; req->int2 = (long)uid; req->int3 = (long)gid; SEND;
1190 }
1191
1192 eio_req *eio_chmod (const char *path, mode_t mode, int pri, eio_cb cb, void *data)
1193 {
1194 REQ (EIO_CHMOD); PATH; req->int2 = (long)mode; SEND;
1195 }
1196
1197 eio_req *eio_mkdir (const char *path, mode_t mode, int pri, eio_cb cb, void *data)
1198 {
1199 REQ (EIO_MKDIR); PATH; req->int2 = (long)mode; SEND;
1200 }
1201
1202 static eio_req *
1203 eio__1path (int type, const char *path, int pri, eio_cb cb, void *data)
1204 {
1205 REQ (type); PATH; SEND;
1206 }
1207
1208 eio_req *eio_readlink (const char *path, int pri, eio_cb cb, void *data)
1209 {
1210 return eio__1path (EIO_READLINK, path, pri, cb, data);
1211 }
1212
1213 eio_req *eio_stat (const char *path, int pri, eio_cb cb, void *data)
1214 {
1215 return eio__1path (EIO_STAT, path, pri, cb, data);
1216 }
1217
1218 eio_req *eio_lstat (const char *path, int pri, eio_cb cb, void *data)
1219 {
1220 return eio__1path (EIO_LSTAT, path, pri, cb, data);
1221 }
1222
1223 eio_req *eio_unlink (const char *path, int pri, eio_cb cb, void *data)
1224 {
1225 return eio__1path (EIO_UNLINK, path, pri, cb, data);
1226 }
1227
1228 eio_req *eio_rmdir (const char *path, int pri, eio_cb cb, void *data)
1229 {
1230 return eio__1path (EIO_RMDIR, path, pri, cb, data);
1231 }
1232
1233 eio_req *eio_readdir (const char *path, int pri, eio_cb cb, void *data)
1234 {
1235 return eio__1path (EIO_READDIR, path, pri, cb, data);
1236 }
1237
1238 eio_req *eio_mknod (const char *path, mode_t mode, dev_t dev, int pri, eio_cb cb, void *data)
1239 {
1240 REQ (EIO_MKNOD); PATH; req->int2 = (long)mode; req->int3 = (long)dev; SEND;
1241 }
1242
1243 static eio_req *
1244 eio__2path (int type, const char *path, const char *new_path, int pri, eio_cb cb, void *data)
1245 {
1246 REQ (type); PATH;
1247
1248 req->flags |= EIO_FLAG_PTR2_FREE;
1249 req->ptr2 = strdup (new_path);
1250 if (!req->ptr2)
1251 {
1252 eio_api_destroy (req);
1253 return 0;
1254 }
1255
1256 SEND;
1257 }
1258
1259 eio_req *eio_link (const char *path, const char *new_path, int pri, eio_cb cb, void *data)
1260 {
1261 return eio__2path (EIO_LINK, path, new_path, pri, cb, data);
1262 }
1263
1264 eio_req *eio_symlink (const char *path, const char *new_path, int pri, eio_cb cb, void *data)
1265 {
1266 return eio__2path (EIO_SYMLINK, path, new_path, pri, cb, data);
1267 }
1268
1269 eio_req *eio_rename (const char *path, const char *new_path, int pri, eio_cb cb, void *data)
1270 {
1271 return eio__2path (EIO_RENAME, path, new_path, pri, cb, data);
1272 }
1273
1274 #endif
1275
1276 eio_req *eio_grp (eio_cb cb, void *data)
1277 {
1278 const int pri = EIO_PRI_MAX;
1279
1280 REQ (EIO_GROUP); SEND;
1281 }
1282
1283 #undef REQ
1284 #undef PATH
1285 #undef SEND
1286
1287 /*****************************************************************************/
1288 /* grp functions */
1289
1290 void eio_grp_feed (eio_req *grp, void (*feed)(eio_req *req), int limit)
1291 {
1292 grp->int2 = limit;
1293 grp->feed = feed;
1294
1295 grp_try_feed (grp);
1296 }
1297
1298 void eio_grp_limit (eio_req *grp, int limit)
1299 {
1300 grp->int2 = limit;
1301
1302 grp_try_feed (grp);
1303 }
1304
1305 void eio_grp_add (eio_req *grp, eio_req *req)
1306 {
1307 assert (("cannot add requests to IO::AIO::GRP after the group finished", grp->int1 != 2));
1308
1309 ++grp->size;
1310 req->grp = grp;
1311
1312 req->grp_prev = 0;
1313 req->grp_next = grp->grp_first;
1314
1315 if (grp->grp_first)
1316 grp->grp_first->grp_prev = req;
1317
1318 grp->grp_first = req;
1319 }
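
A group finishes when its own request has passed through the pool (int1 is then set) and its size has dropped back to zero, so the group callback runs only after every added child completes. A minimal sketch (grp_done and one_done are illustrative names):

    #include <stdio.h>
    #include "eio.h"

    static int one_done (eio_req *req) { return 0; }

    static int grp_done (eio_req *grp)
    {
      printf ("both stats finished\n");
      return 0;
    }

    static void stat_both (void)
    {
      eio_req *grp = eio_grp (grp_done, 0);

      eio_grp_add (grp, eio_stat ("/etc/passwd", 0, one_done, 0));
      eio_grp_add (grp, eio_stat ("/etc/group",  0, one_done, 0));
    }
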
1320
1321 /*****************************************************************************/
1322 /* misc garbage */
1323
1324 ssize_t eio_sendfile_sync (int ofd, int ifd, off_t offset, size_t count)
1325 {
1326 ssize_t res;
1327 worker wrk;
1328 
1329 wrk.dbuf = 0;
1330 
1331 res = eio__sendfile (ofd, ifd, offset, count, &wrk);
1332 
1333 if (wrk.dbuf)
1334 free (wrk.dbuf);
1335 
1336 return res;
1337 }