ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/libeio/eio.c
Revision: 1.12
Committed: Tue May 13 18:54:52 2008 UTC (16 years ago) by root
Content type: text/plain
Branch: MAIN
Changes since 1.11: +4 -0 lines
Log Message:
*** empty log message ***

File Contents

# Content
1 #include "eio.h"
2 #include "xthread.h"
3
4 #include <errno.h>
5 #include <stddef.h>
6 #include <stdlib.h>
7 #include <string.h>
8 #include <errno.h>
9 #include <sys/types.h>
10 #include <sys/stat.h>
11 #include <limits.h>
12 #include <fcntl.h>
13 #include <assert.h>
14
/* Finish hook: run the request's finish callback unless the request
 * was cancelled.  Evaluates to the callback's return value, or 0.
 * The expansion is fully parenthesised so it is safe inside larger
 * expressions such as "res = EIO_FINISH (req);" (the original left
 * the ?: unparenthesised). */
#ifndef EIO_FINISH
# define EIO_FINISH(req)  (((req)->finish) && !EIO_CANCELLED (req) ? (req)->finish (req) : 0)
#endif

/* Destroy hook: invoked exactly once when a request is destroyed. */
#ifndef EIO_DESTROY
# define EIO_DESTROY(req) do { if ((req)->destroy) (req)->destroy (req); } while (0)
#endif

/* Feeder hook for group requests: lets the group generate members on demand. */
#ifndef EIO_FEED
# define EIO_FEED(req)    do { if ((req)->feed   ) (req)->feed   (req); } while (0)
#endif
26
27 #ifdef _WIN32
28
29 /*doh*/
30
31 #else
32
33 # include "config.h"
34 # include <sys/time.h>
35 # include <sys/select.h>
36 # include <unistd.h>
37 # include <utime.h>
38 # include <signal.h>
39 # include <dirent.h>
40
41 # ifndef EIO_STRUCT_DIRENT
42 # define EIO_STRUCT_DIRENT struct dirent
43 # endif
44
45 #endif
46
47 #if HAVE_SENDFILE
48 # if __linux
49 # include <sys/sendfile.h>
50 # elif __freebsd
51 # include <sys/socket.h>
52 # include <sys/uio.h>
53 # elif __hpux
54 # include <sys/socket.h>
55 # elif __solaris /* not yet */
56 # include <sys/sendfile.h>
57 # else
58 # error sendfile support requested but not available
59 # endif
60 #endif
61
/* number of seconds after which an idle thread exits */
#define IDLE_TIMEOUT 10

/* used for struct dirent, AIX doesn't provide it */
#ifndef NAME_MAX
# define NAME_MAX 4096
#endif

/* buffer size for various temporary buffers */
#define EIO_BUFSIZE 65536

/* declare and allocate a per-worker scratch buffer; the worker struct
   remembers it (dbuf) so it can be freed even if the thread dies */
#define dBUF					\
  char *eio_buf;				\
  X_LOCK (wrklock);				\
  self->dbuf = eio_buf = malloc (EIO_BUFSIZE);	\
  X_UNLOCK (wrklock);				\
  errno = ENOMEM;				\
  if (!eio_buf)					\
    return -1;

/* the time unit used for poll accounting: ~1/1024 of a second */
#define EIO_TICKS ((1000000 + 1023) >> 10)

/* called when the result queue becomes non-empty / runs empty */
static void (*want_poll_cb) (void);
static void (*done_poll_cb) (void);

/* limits for one eio_poll invocation; 0 means unlimited */
static unsigned int max_poll_time = 0;
static unsigned int max_poll_reqs = 0;

/* time difference tv2 - tv1, expressed in units of ~1/EIO_TICKS seconds */
static int tvdiff (struct timeval *tv1, struct timeval *tv2)
{
  int secs  = tv2->tv_sec  - tv1->tv_sec;
  int usecs = tv2->tv_usec - tv1->tv_usec;

  return secs * EIO_TICKS + (usecs >> 10);
}
96
/* thread-pool counters; wanted is the target number of workers */
static unsigned int started, idle, wanted = 4;

/* worker threads management */
static mutex_t wrklock = X_MUTEX_INIT;

/* per-thread state; also a node of the circular doubly-linked worker
   list headed by wrk_first */
typedef struct worker
{
  /* locked by wrklock */
  struct worker *prev, *next;

  thread_t tid;

  /* locked by reslock, reqlock or wrklock */
  eio_req *req; /* currently processed request */
  void *dbuf;   /* scratch buffer owned by this thread (see dBUF) */
  DIR *dirp;    /* directory handle owned by this thread (see eio__scandir) */
} worker;

/* list head; an empty list points at itself */
static worker wrk_first = { &wrk_first, &wrk_first, 0 };
116
117 static void worker_clear (worker *wrk)
118 {
119 if (wrk->dirp)
120 {
121 closedir (wrk->dirp);
122 wrk->dirp = 0;
123 }
124
125 if (wrk->dbuf)
126 {
127 free (wrk->dbuf);
128 wrk->dbuf = 0;
129 }
130 }
131
132 static void worker_free (worker *wrk)
133 {
134 wrk->next->prev = wrk->prev;
135 wrk->prev->next = wrk->next;
136
137 free (wrk);
138 }
139
/* request counters shared between threads: volatile plus the
   WORDACCESS_UNSAFE locking below stand in for real atomics */
static volatile unsigned int nreqs, nready, npending;
/* how many idle workers may linger before surplus ones exit */
static volatile unsigned int max_idle = 4;

static mutex_t reslock = X_MUTEX_INIT;
static mutex_t reqlock = X_MUTEX_INIT;
static cond_t reqwait = X_COND_INIT;

/* number of requests in flight (submitted, not yet reaped by eio_poll) */
unsigned int eio_nreqs (void)
{
  return nreqs;
}

/* number of requests waiting for a worker thread */
unsigned int eio_nready (void)
{
  unsigned int retval;

  if (WORDACCESS_UNSAFE) X_LOCK (reqlock);
  retval = nready;
  if (WORDACCESS_UNSAFE) X_UNLOCK (reqlock);

  return retval;
}

/* number of finished requests waiting for eio_poll */
unsigned int eio_npending (void)
{
  unsigned int retval;

  if (WORDACCESS_UNSAFE) X_LOCK (reqlock);
  retval = npending;
  if (WORDACCESS_UNSAFE) X_UNLOCK (reqlock);

  return retval;
}

/* number of worker threads currently started.
   NOTE(review): started is described above as wrklock-protected but
   is read here under reqlock — confirm which lock guards it */
unsigned int eio_nthreads (void)
{
  unsigned int retval;

  if (WORDACCESS_UNSAFE) X_LOCK (reqlock);
  retval = started;
  if (WORDACCESS_UNSAFE) X_UNLOCK (reqlock);

  return retval;
}
184
/*
 * a somewhat faster data structure might be nice, but
 * with 8 priorities this actually needs <20 insns
 * per shift, the most expensive operation.
 */
/* one FIFO per priority; qs/qe are head and tail pointers */
typedef struct {
  eio_req *qs[EIO_NUM_PRI], *qe[EIO_NUM_PRI]; /* qstart, qend */
  int size;
} reqq;

/* requests waiting for a worker, and results waiting for eio_poll */
static reqq req_queue;
static reqq res_queue;
197
198 static int reqq_push (reqq *q, eio_req *req)
199 {
200 int pri = req->pri;
201 req->next = 0;
202
203 if (q->qe[pri])
204 {
205 q->qe[pri]->next = req;
206 q->qe[pri] = req;
207 }
208 else
209 q->qe[pri] = q->qs[pri] = req;
210
211 return q->size++;
212 }
213
214 static eio_req *reqq_shift (reqq *q)
215 {
216 int pri;
217
218 if (!q->size)
219 return 0;
220
221 --q->size;
222
223 for (pri = EIO_NUM_PRI; pri--; )
224 {
225 eio_req *req = q->qs[pri];
226
227 if (req)
228 {
229 if (!(q->qs[pri] = (eio_req *)req->next))
230 q->qe[pri] = 0;
231
232 return req;
233 }
234 }
235
236 abort ();
237 }
238
239 static void grp_try_feed (eio_req *grp)
240 {
241 while (grp->size < grp->int2 && !EIO_CANCELLED (grp))
242 {
243 int old_len = grp->size;
244
245 EIO_FEED (grp);
246
247 /* stop if no progress has been made */
248 if (old_len == grp->size)
249 {
250 grp->feed = 0;
251 break;
252 }
253 }
254 }
255
static int eio_finish (eio_req *req);

/* account for one finished group member: refill via the feeder, and
 * finish the group itself once it is empty AND already delayed
 * (int1 is set by eio_poll when the group became ready early).
 * returns the group's finish-callback result, or 0. */
static int grp_dec (eio_req *grp)
{
  --grp->size;

  /* call feeder, if applicable */
  grp_try_feed (grp);

  /* finish, if done */
  if (!grp->size && grp->int1)
    return eio_finish (grp);
  else
    return 0;
}
271
272 void eio_destroy (eio_req *req)
273 {
274 if ((req)->flags & EIO_FLAG_PTR1_FREE) free (req->ptr1);
275 if ((req)->flags & EIO_FLAG_PTR2_FREE) free (req->ptr2);
276
277 EIO_DESTROY (req);
278 }
279
/* run the finish callback, unlink the request from its group (if it
 * belongs to one) and destroy it.  returns the finish callback's
 * result; if that was zero, a nonzero result from finishing the
 * enclosing group wins. */
static int eio_finish (eio_req *req)
{
  int res = EIO_FINISH (req);

  if (req->grp)
    {
      int res2;
      eio_req *grp = req->grp;

      /* unlink request */
      if (req->grp_next) req->grp_next->grp_prev = req->grp_prev;
      if (req->grp_prev) req->grp_prev->grp_next = req->grp_next;

      if (grp->grp_first == req)
        grp->grp_first = req->grp_next;

      /* may finish (and destroy) the group itself */
      res2 = grp_dec (grp);

      if (!res && res2)
        res = res2;
    }

  eio_destroy (req);

  return res;
}
306
/* cancel all members of a group; recurses into nested groups via
   eio_cancel below */
void eio_grp_cancel (eio_req *grp)
{
  for (grp = grp->grp_first; grp; grp = grp->grp_next)
    eio_cancel (grp);
}

/* mark a request as cancelled (workers and EIO_FINISH check the flag)
   and cancel any group members it has */
void eio_cancel (eio_req *req)
{
  X_LOCK (wrklock);
  req->flags |= EIO_FLAG_CANCELLED;
  X_UNLOCK (wrklock);

  eio_grp_cancel (req);
}
321
X_THREAD_PROC (eio_proc);

/* allocate a worker, start its thread and link it into the list */
static void start_thread (void)
{
  worker *wrk = calloc (1, sizeof (worker));

  /*TODO*/
  /* NOTE(review): this assert compiles away under NDEBUG, after which
     a NULL wrk would be dereferenced below — consider a hard check */
  assert (("unable to allocate worker thread data", wrk));

  X_LOCK (wrklock);

  /* presumably thread_create returns nonzero on success (xthread.h
     convention) — confirm */
  if (thread_create (&wrk->tid, eio_proc, (void *)wrk))
    {
      wrk->prev = &wrk_first;
      wrk->next = wrk_first.next;
      wrk_first.next->prev = wrk;
      wrk_first.next = wrk;
      ++started;
    }
  else
    free (wrk);

  X_UNLOCK (wrklock);
}
346
347 static void maybe_start_thread (void)
348 {
349 if (eio_nthreads () >= wanted)
350 return;
351
352 /* todo: maybe use idle here, but might be less exact */
353 if (0 <= (int)eio_nthreads () + (int)eio_npending () - (int)eio_nreqs ())
354 return;
355
356 start_thread ();
357 }
358
/* queue a request for execution: clamp its priority into the valid
   biased range, push it onto the request queue and wake one worker */
void eio_submit (eio_req *req)
{
  req->pri += EIO_PRI_BIAS;

  if (req->pri < EIO_PRI_MIN + EIO_PRI_BIAS) req->pri = EIO_PRI_MIN + EIO_PRI_BIAS;
  if (req->pri > EIO_PRI_MAX + EIO_PRI_BIAS) req->pri = EIO_PRI_MAX + EIO_PRI_BIAS;

  /* NOTE(review): nreqs is incremented outside reqlock — presumably
     relying on the WORDACCESS assumption; confirm intended */
  ++nreqs;

  X_LOCK (reqlock);
  ++nready;
  reqq_push (&req_queue, req);
  X_COND_SIGNAL (reqwait);
  X_UNLOCK (reqlock);

  maybe_start_thread ();
}
376
377 static void end_thread (void)
378 {
379 eio_req *req = calloc (1, sizeof (eio_req));
380
381 req->type = EIO_QUIT;
382 req->pri = EIO_PRI_MAX + EIO_PRI_BIAS;
383
384 X_LOCK (reqlock);
385 reqq_push (&req_queue, req);
386 X_COND_SIGNAL (reqwait);
387 X_UNLOCK (reqlock);
388
389 X_LOCK (wrklock);
390 --started;
391 X_UNLOCK (wrklock);
392 }
393
394 void eio_set_max_poll_time (double nseconds)
395 {
396 if (WORDACCESS_UNSAFE) X_LOCK (reqlock);
397 max_poll_time = nseconds;
398 if (WORDACCESS_UNSAFE) X_UNLOCK (reqlock);
399 }
400
/* limit how many finished requests one eio_poll call may handle;
   0 means unlimited */
void eio_set_max_poll_reqs (unsigned int maxreqs)
{
  if (WORDACCESS_UNSAFE) X_LOCK (reqlock);
  max_poll_reqs = maxreqs;
  if (WORDACCESS_UNSAFE) X_UNLOCK (reqlock);
}

/* set how many idle workers may linger before surplus ones exit;
   forced to at least 1 */
void eio_set_max_idle (unsigned int nthreads)
{
  if (WORDACCESS_UNSAFE) X_LOCK (reqlock);
  max_idle = nthreads <= 0 ? 1 : nthreads;
  if (WORDACCESS_UNSAFE) X_UNLOCK (reqlock);
}
414
/* raise the target worker count; threads are started lazily on demand */
void eio_set_min_parallel (unsigned int nthreads)
{
  if (wanted < nthreads)
    wanted = nthreads;
}

/* lower the target worker count and actively terminate surplus workers */
void eio_set_max_parallel (unsigned int nthreads)
{
  if (wanted > nthreads)
    wanted = nthreads;

  /* NOTE(review): started is read here without wrklock — presumably a
     benign race since end_thread only requests termination; confirm */
  while (started > wanted)
    end_thread ();
}
429
/* reap finished requests: run their callbacks and destroy them.
 * returns 0 once the result queue is drained, -1 with errno = EAGAIN
 * when the max_poll_reqs/max_poll_time budget is exhausted, or the
 * first nonzero value returned by a finish callback. */
int eio_poll (void)
{
  int maxreqs = max_poll_reqs;
  struct timeval tv_start, tv_now;
  eio_req *req;

  if (max_poll_time)
    gettimeofday (&tv_start, 0);

  for (;;)
    {
      maybe_start_thread ();

      X_LOCK (reslock);
      req = reqq_shift (&res_queue);

      if (req)
        {
          --npending;

          /* result queue just ran empty: tell the caller to stop polling */
          if (!res_queue.size && done_poll_cb)
            done_poll_cb ();
        }

      X_UNLOCK (reslock);

      if (!req)
        return 0;

      --nreqs;

      /* a group with outstanding members cannot finish yet: mark it
         delayed (int1) so the last grp_dec finishes it instead */
      if (req->type == EIO_GROUP && req->size)
        {
          req->int1 = 1; /* mark request as delayed */
          continue;
        }
      else
        {
          int res = eio_finish (req);
          if (res)
            return res;
        }

      if (maxreqs && !--maxreqs)
        break;

      if (max_poll_time)
        {
          gettimeofday (&tv_now, 0);

          /* stop once the allotted poll time has been used up */
          if (tvdiff (&tv_start, &tv_now) >= max_poll_time)
            break;
        }
    }

  errno = EAGAIN;
  return -1;
}
488
/*****************************************************************************/
/* work around various missing functions */

#if !HAVE_PREADWRITE
/* route all pread/pwrite calls in this file through the emulations below */
# define pread eio__pread
# define pwrite eio__pwrite

/*
 * make our pread/pwrite safe against themselves, but not against
 * normal read/write by using a mutex. slows down execution a lot,
 * but that's your problem, not mine.
 */
static mutex_t preadwritelock = X_MUTEX_INIT;
/* emulate pread: save the file offset, seek, read, restore the offset.
 * serialised by preadwritelock against the pwrite emulation below. */
static ssize_t
eio__pread (int fd, void *buf, size_t count, off_t offset)
{
  ssize_t res;
  off_t ooffset;

  X_LOCK (preadwritelock);
  ooffset = lseek (fd, 0, SEEK_CUR);   /* remember the caller's offset */
  lseek (fd, offset, SEEK_SET);
  res = read (fd, buf, count);
  lseek (fd, ooffset, SEEK_SET);       /* and put it back */
  X_UNLOCK (preadwritelock);

  return res;
}
518
519 static ssize_t
520 eio__pwrite (int fd, void *buf, size_t count, off_t offset)
521 {
522 ssize_t res;
523 off_t ooffset;
524
525 X_LOCK (preadwritelock);
526 ooffset = lseek (fd, 0, SEEK_CUR);
527 lseek (fd, offset, SEEK_SET);
528 res = write (fd, buf, count);
529 lseek (fd, offset, SEEK_SET);
530 X_UNLOCK (preadwritelock);
531
532 return res;
533 }
534 #endif
535
#ifndef HAVE_FUTIMES

/* NOTE(review): the other feature tests in this file use
   "#if !HAVE_x"; this one uses #ifndef, which differs when config.h
   defines HAVE_FUTIMES to 0 — confirm intended */

# define utimes(path,times) eio__utimes (path, times)
# define futimes(fd,times) eio__futimes (fd, times)

/* emulate utimes with the second-resolution utime call */
static int
eio__utimes (const char *filename, const struct timeval times[2])
{
  if (times)
    {
      struct utimbuf buf;

      buf.actime = times[0].tv_sec;
      buf.modtime = times[1].tv_sec;

      return utime (filename, &buf);
    }
  else
    return utime (filename, 0); /* NULL times = set to current time */
}

/* no portable futimes fallback exists: always fail with ENOSYS */
static int eio__futimes (int fd, const struct timeval tv[2])
{
  errno = ENOSYS;
  return -1;
}

#endif
564
#if !HAVE_FDATASYNC
# define fdatasync fsync
#endif

#if !HAVE_READAHEAD
# define readahead(fd,offset,count) eio__readahead (fd, offset, count, self)

/* emulate readahead by reading the whole range into a throwaway
 * scratch buffer; read errors are deliberately ignored, as this is
 * only a cache-warming hint.  always "succeeds", returning count. */
static ssize_t
eio__readahead (int fd, off_t offset, size_t count, worker *self)
{
  size_t todo = count;
  dBUF; /* may return -1/ENOMEM from this function */

  while (todo > 0)
    {
      size_t len = todo < EIO_BUFSIZE ? todo : EIO_BUFSIZE;

      /* result intentionally unchecked: failure only means no caching */
      pread (fd, eio_buf, len, offset);
      offset += len;
      todo -= len;
    }

  errno = 0;
  return count;
}

#endif
592
#if !HAVE_READDIR_R
# define readdir_r eio__readdir_r

static mutex_t readdirlock = X_MUTEX_INIT;

/* emulate readdir_r by serialising readdir and copying the entry name
 * into the caller's buffer.  assumes ent has room for NAME_MAX+1 name
 * bytes (see the union used in eio__scandir).  returns 0 and sets
 * *res = ent on success, -1 and *res = 0 at end/error. */
static int
eio__readdir_r (DIR *dirp, EIO_STRUCT_DIRENT *ent, EIO_STRUCT_DIRENT **res)
{
  EIO_STRUCT_DIRENT *e;
  int errorno;

  X_LOCK (readdirlock);

  e = readdir (dirp);
  errorno = errno; /* preserve readdir's errno across the unlock */

  if (e)
    {
      *res = ent;
      /* NOTE(review): only d_name is copied, not the other dirent
         fields — callers must not rely on them */
      strcpy (ent->d_name, e->d_name);
    }
  else
    *res = 0;

  X_UNLOCK (readdirlock);

  errno = errorno;
  return e ? 0 : -1;
}
#endif
623
/* sendfile always needs emulation */
/* copy count bytes from ifd at offset to ofd's current file position,
 * using the platform sendfile where available and falling back to a
 * pread/write copy loop when the descriptors are unsuitable.
 * returns the number of bytes copied, or -1 with errno set. */
static ssize_t
eio__sendfile (int ofd, int ifd, off_t offset, size_t count, worker *self)
{
  ssize_t res;

  if (!count)
    return 0;

#if HAVE_SENDFILE
# if __linux
  res = sendfile (ofd, ifd, &offset, count);

# elif __freebsd
  /*
   * Of course, the freebsd sendfile is a dire hack with no thoughts
   * wasted on making it similar to other I/O functions.
   */
  {
    /* NOTE(review): sbytes is uninitialised; presumably the kernel
       always writes it before the res < 0 check matters — confirm */
    off_t sbytes;
    res = sendfile (ifd, ofd, offset, count, 0, &sbytes, 0);

    if (res < 0 && sbytes)
      /* maybe only on EAGAIN: as usual, the manpage leaves you guessing */
      res = sbytes;
  }

# elif __hpux
  res = sendfile (ofd, ifd, offset, count, 0, 0);

# elif __solaris
  {
    struct sendfilevec vec;
    size_t sbytes;

    vec.sfv_fd = ifd;
    vec.sfv_flag = 0;
    vec.sfv_off = offset;
    vec.sfv_len = count;

    res = sendfilev (ofd, &vec, 1, &sbytes);

    if (res < 0 && sbytes)
      res = sbytes;
  }

# endif
#else
  res = -1;
  errno = ENOSYS;
#endif

  /* fall back to a manual copy when sendfile is missing or cannot be
     used with this combination of descriptors */
  if (res < 0
      && (errno == ENOSYS || errno == EINVAL || errno == ENOTSOCK
#if __solaris
          || errno == EAFNOSUPPORT || errno == EPROTOTYPE
#endif
         )
      )
    {
      /* emulate sendfile. this is a major pain in the ass */
      dBUF; /* may return -1/ENOMEM from this function */

      res = 0;

      while (count)
        {
          ssize_t cnt;

          cnt = pread (ifd, eio_buf, count > EIO_BUFSIZE ? EIO_BUFSIZE : count, offset);

          if (cnt <= 0)
            {
              /* error (not EOF) before anything was copied: report -1 */
              if (cnt && !res) res = -1;
              break;
            }

          cnt = write (ofd, eio_buf, cnt);

          if (cnt <= 0)
            {
              if (cnt && !res) res = -1;
              break;
            }

          offset += cnt;
          res += cnt;
          count -= cnt;
        }
    }

  return res;
}
717
718 /* read a full directory */
719 static void
720 eio__scandir (eio_req *req, worker *self)
721 {
722 DIR *dirp;
723 union
724 {
725 EIO_STRUCT_DIRENT d;
726 char b [offsetof (EIO_STRUCT_DIRENT, d_name) + NAME_MAX + 1];
727 } *u;
728 EIO_STRUCT_DIRENT *entp;
729 char *name, *names;
730 int memlen = 4096;
731 int memofs = 0;
732 int res = 0;
733
734 X_LOCK (wrklock);
735 self->dirp = dirp = opendir (req->ptr1);
736 self->dbuf = u = malloc (sizeof (*u));
737 req->flags |= EIO_FLAG_PTR2_FREE;
738 req->ptr2 = names = malloc (memlen);
739 X_UNLOCK (wrklock);
740
741 if (dirp && u && names)
742 for (;;)
743 {
744 errno = 0;
745 readdir_r (dirp, &u->d, &entp);
746
747 if (!entp)
748 break;
749
750 name = entp->d_name;
751
752 if (name [0] != '.' || (name [1] && (name [1] != '.' || name [2])))
753 {
754 int len = strlen (name) + 1;
755
756 res++;
757
758 while (memofs + len > memlen)
759 {
760 memlen *= 2;
761 X_LOCK (wrklock);
762 req->ptr2 = names = realloc (names, memlen);
763 X_UNLOCK (wrklock);
764
765 if (!names)
766 break;
767 }
768
769 memcpy (names + memofs, name, len);
770 memofs += len;
771 }
772 }
773
774 if (errno)
775 res = -1;
776
777 req->result = res;
778 }
779
/*****************************************************************************/

/* ensure req->ptr2 points at a request-owned result buffer of at
   least len bytes; on allocation failure sets req->result = -1 /
   errno = ENOMEM and breaks out of the enclosing switch — only usable
   inside eio_proc's request switch */
#define ALLOC(len)				\
  if (!req->ptr2)				\
    {						\
      X_LOCK (wrklock);				\
      req->flags |= EIO_FLAG_PTR2_FREE;		\
      X_UNLOCK (wrklock);			\
      req->ptr2 = malloc (len);			\
      if (!req->ptr2)				\
        {					\
          errno = ENOMEM;			\
          req->result = -1;			\
          break;				\
        }					\
    }
796
797 X_THREAD_PROC (eio_proc)
798 {
799 eio_req *req;
800 struct timespec ts;
801 worker *self = (worker *)thr_arg;
802
803 /* try to distribute timeouts somewhat randomly */
804 ts.tv_nsec = ((unsigned long)self & 1023UL) * (1000000000UL / 1024UL);
805
806 for (;;)
807 {
808 ts.tv_sec = time (0) + IDLE_TIMEOUT;
809
810 X_LOCK (reqlock);
811
812 for (;;)
813 {
814 self->req = req = reqq_shift (&req_queue);
815
816 if (req)
817 break;
818
819 ++idle;
820
821 if (X_COND_TIMEDWAIT (reqwait, reqlock, ts) == ETIMEDOUT)
822 {
823 if (idle > max_idle)
824 {
825 --idle;
826 X_UNLOCK (reqlock);
827 X_LOCK (wrklock);
828 --started;
829 X_UNLOCK (wrklock);
830 goto quit;
831 }
832
833 /* we are allowed to idle, so do so without any timeout */
834 X_COND_WAIT (reqwait, reqlock);
835 ts.tv_sec = time (0) + IDLE_TIMEOUT;
836 }
837
838 --idle;
839 }
840
841 --nready;
842
843 X_UNLOCK (reqlock);
844
845 errno = 0; /* strictly unnecessary */
846
847 if (!EIO_CANCELLED (req))
848 switch (req->type)
849 {
850 case EIO_READ: ALLOC (req->size);
851 req->result = req->offs >= 0
852 ? pread (req->int1, req->ptr2, req->size, req->offs)
853 : read (req->int1, req->ptr2, req->size); break;
854 case EIO_WRITE: req->result = req->offs >= 0
855 ? pwrite (req->int1, req->ptr2, req->size, req->offs)
856 : write (req->int1, req->ptr2, req->size); break;
857
858 case EIO_READAHEAD: req->result = readahead (req->int1, req->offs, req->size); break;
859 case EIO_SENDFILE: req->result = eio__sendfile (req->int1, req->int2, req->offs, req->size, self); break;
860
861 case EIO_STAT: ALLOC (sizeof (EIO_STRUCT_STAT));
862 req->result = stat (req->ptr1, (EIO_STRUCT_STAT *)req->ptr2); break;
863 case EIO_LSTAT: ALLOC (sizeof (EIO_STRUCT_STAT));
864 req->result = lstat (req->ptr1, (EIO_STRUCT_STAT *)req->ptr2); break;
865 case EIO_FSTAT: ALLOC (sizeof (EIO_STRUCT_STAT));
866 req->result = fstat (req->int1, (EIO_STRUCT_STAT *)req->ptr2); break;
867
868 case EIO_CHOWN: req->result = chown (req->ptr1, req->int2, req->int3); break;
869 case EIO_FCHOWN: req->result = fchown (req->int1, req->int2, req->int3); break;
870 case EIO_CHMOD: req->result = chmod (req->ptr1, (mode_t)req->int2); break;
871 case EIO_FCHMOD: req->result = fchmod (req->int1, (mode_t)req->int2); break;
872 case EIO_TRUNCATE: req->result = truncate (req->ptr1, req->offs); break;
873 case EIO_FTRUNCATE: req->result = ftruncate (req->int1, req->offs); break;
874
875 case EIO_OPEN: req->result = open (req->ptr1, req->int1, (mode_t)req->int2); break;
876 case EIO_CLOSE: req->result = close (req->int1); break;
877 case EIO_DUP2: req->result = dup2 (req->int1, req->int2); break;
878 case EIO_UNLINK: req->result = unlink (req->ptr1); break;
879 case EIO_RMDIR: req->result = rmdir (req->ptr1); break;
880 case EIO_MKDIR: req->result = mkdir (req->ptr1, (mode_t)req->int2); break;
881 case EIO_RENAME: req->result = rename (req->ptr1, req->ptr2); break;
882 case EIO_LINK: req->result = link (req->ptr1, req->ptr2); break;
883 case EIO_SYMLINK: req->result = symlink (req->ptr1, req->ptr2); break;
884 case EIO_MKNOD: req->result = mknod (req->ptr1, (mode_t)req->int2, (dev_t)req->offs); break;
885
886 case EIO_READLINK: ALLOC (NAME_MAX);
887 req->result = readlink (req->ptr1, req->ptr2, NAME_MAX); break;
888
889 case EIO_SYNC: req->result = 0; sync (); break;
890 case EIO_FSYNC: req->result = fsync (req->int1); break;
891 case EIO_FDATASYNC: req->result = fdatasync (req->int1); break;
892
893 case EIO_READDIR: eio__scandir (req, self); break;
894
895 case EIO_BUSY:
896 #ifdef _WIN32
897 Sleep (req->nv1 * 1000.);
898 #else
899 {
900 struct timeval tv;
901
902 tv.tv_sec = req->nv1;
903 tv.tv_usec = (req->nv1 - tv.tv_sec) * 1000000.;
904
905 req->result = select (0, 0, 0, 0, &tv);
906 }
907 #endif
908 break;
909
910 case EIO_UTIME:
911 case EIO_FUTIME:
912 {
913 struct timeval tv[2];
914 struct timeval *times;
915
916 if (req->nv1 != -1. || req->nv2 != -1.)
917 {
918 tv[0].tv_sec = req->nv1;
919 tv[0].tv_usec = (req->nv1 - tv[0].tv_sec) * 1000000.;
920 tv[1].tv_sec = req->nv2;
921 tv[1].tv_usec = (req->nv2 - tv[1].tv_sec) * 1000000.;
922
923 times = tv;
924 }
925 else
926 times = 0;
927
928
929 req->result = req->type == EIO_FUTIME
930 ? futimes (req->int1, times)
931 : utimes (req->ptr1, times);
932 }
933
934 case EIO_GROUP:
935 case EIO_NOP:
936 req->result = 0;
937 break;
938
939 case EIO_QUIT:
940 goto quit;
941
942 default:
943 req->result = -1;
944 break;
945 }
946
947 req->errorno = errno;
948
949 X_LOCK (reslock);
950
951 ++npending;
952
953 if (!reqq_push (&res_queue, req) && want_poll_cb)
954 want_poll_cb ();
955
956 self->req = 0;
957 worker_clear (self);
958
959 X_UNLOCK (reslock);
960 }
961
962 quit:
963 X_LOCK (wrklock);
964 worker_free (self);
965 X_UNLOCK (wrklock);
966
967 return 0;
968 }
969
/*****************************************************************************/

/* fork handler: take every libeio lock before fork so the child
   inherits them in a known state and can release them safely */
static void eio_atfork_prepare (void)
{
  X_LOCK (wrklock);
  X_LOCK (reqlock);
  X_LOCK (reslock);
#if !HAVE_PREADWRITE
  X_LOCK (preadwritelock);
#endif
#if !HAVE_READDIR_R
  X_LOCK (readdirlock);
#endif
}
984
/* fork handler: release the locks taken in eio_atfork_prepare, in
   reverse acquisition order */
static void eio_atfork_parent (void)
{
#if !HAVE_READDIR_R
  X_UNLOCK (readdirlock);
#endif
#if !HAVE_PREADWRITE
  X_UNLOCK (preadwritelock);
#endif
  X_UNLOCK (reslock);
  X_UNLOCK (reqlock);
  X_UNLOCK (wrklock);
}
997
998 static void eio_atfork_child (void)
999 {
1000 eio_req *prv;
1001
1002 while (prv = reqq_shift (&req_queue))
1003 eio_destroy (prv);
1004
1005 while (prv = reqq_shift (&res_queue))
1006 eio_destroy (prv);
1007
1008 while (wrk_first.next != &wrk_first)
1009 {
1010 worker *wrk = wrk_first.next;
1011
1012 if (wrk->req)
1013 eio_destroy (wrk->req);
1014
1015 worker_clear (wrk);
1016 worker_free (wrk);
1017 }
1018
1019 started = 0;
1020 idle = 0;
1021 nreqs = 0;
1022 nready = 0;
1023 npending = 0;
1024
1025 eio_atfork_parent ();
1026 }
1027
1028 int eio_init (void (*want_poll)(void), void (*done_poll)(void))
1029 {
1030 want_poll_cb = want_poll;
1031 done_poll_cb = done_poll;
1032
1033 #ifdef _WIN32
1034 X_MUTEX_CHECK (wrklock);
1035 X_MUTEX_CHECK (reslock);
1036 X_MUTEX_CHECK (reqlock);
1037 X_MUTEX_CHECK (reqwait);
1038 X_MUTEX_CHECK (preadwritelock);
1039 X_MUTEX_CHECK (readdirlock);
1040
1041 X_COND_CHECK (reqwait);
1042 #endif
1043
1044 X_THREAD_ATFORK (eio_atfork_prepare, eio_atfork_parent, eio_atfork_child);
1045 }
1046
/* destroy hook for requests allocated by the wrapper functions below */
static void eio_api_destroy (eio_req *req)
{
  free (req);
}

/* allocate and pre-fill a request; makes the enclosing wrapper return
   0 on allocation failure.  expects pri/cb/data to be in scope. */
#define REQ(rtype)                                            \
  eio_req *req;                                               \
                                                              \
  req = (eio_req *)calloc (1, sizeof *req);                   \
  if (!req)                                                   \
    return 0;                                                 \
                                                              \
  req->type    = rtype;                                       \
  req->pri     = pri;					      \
  req->finish  = cb;					      \
  req->data    = data;					      \
  req->destroy = eio_api_destroy;

/* submit the request and hand it back to the caller */
#define SEND eio_submit (req); return req

/* copy the path argument into ptr1, owned (and later freed) by the request */
#define PATH							\
  req->flags |= EIO_FLAG_PTR1_FREE;				\
  req->ptr1 = strdup (path);					\
  if (!req->ptr1)						\
    {								\
      eio_api_destroy (req);					\
      return 0;							\
    }
1075
#ifndef EIO_NO_WRAPPERS

/* thin request constructors: allocate a request via REQ, fill in the
   type-specific fields (int1 = fd, offs/size = range, nv1/nv2 =
   times) and submit it.  each returns the request, or 0 when
   allocation failed. */

eio_req *eio_nop (int pri, eio_cb cb, void *data)
{
  REQ (EIO_NOP); SEND;
}

eio_req *eio_busy (double delay, int pri, eio_cb cb, void *data)
{
  REQ (EIO_BUSY); req->nv1 = delay; SEND;
}

eio_req *eio_sync (int pri, eio_cb cb, void *data)
{
  REQ (EIO_SYNC); SEND;
}

eio_req *eio_fsync (int fd, int pri, eio_cb cb, void *data)
{
  REQ (EIO_FSYNC); req->int1 = fd; SEND;
}

eio_req *eio_fdatasync (int fd, int pri, eio_cb cb, void *data)
{
  REQ (EIO_FDATASYNC); req->int1 = fd; SEND;
}

eio_req *eio_close (int fd, int pri, eio_cb cb, void *data)
{
  REQ (EIO_CLOSE); req->int1 = fd; SEND;
}

eio_req *eio_readahead (int fd, off_t offset, size_t length, int pri, eio_cb cb, void *data)
{
  REQ (EIO_READAHEAD); req->int1 = fd; req->offs = offset; req->size = length; SEND;
}

/* buf is owned by the caller and must stay valid until the request finishes */
eio_req *eio_read (int fd, void *buf, size_t length, off_t offset, int pri, eio_cb cb, void *data)
{
  REQ (EIO_READ); req->int1 = fd; req->offs = offset; req->size = length; req->ptr2 = buf; SEND;
}

eio_req *eio_write (int fd, void *buf, size_t length, off_t offset, int pri, eio_cb cb, void *data)
{
  REQ (EIO_WRITE); req->int1 = fd; req->offs = offset; req->size = length; req->ptr2 = buf; SEND;
}

eio_req *eio_fstat (int fd, int pri, eio_cb cb, void *data)
{
  REQ (EIO_FSTAT); req->int1 = fd; SEND;
}

eio_req *eio_futime (int fd, double atime, double mtime, int pri, eio_cb cb, void *data)
{
  REQ (EIO_FUTIME); req->int1 = fd; req->nv1 = atime; req->nv2 = mtime; SEND;
}

eio_req *eio_ftruncate (int fd, off_t offset, int pri, eio_cb cb, void *data)
{
  REQ (EIO_FTRUNCATE); req->int1 = fd; req->offs = offset; SEND;
}

eio_req *eio_fchmod (int fd, mode_t mode, int pri, eio_cb cb, void *data)
{
  REQ (EIO_FCHMOD); req->int1 = fd; req->int2 = (long)mode; SEND;
}

eio_req *eio_fchown (int fd, uid_t uid, gid_t gid, int pri, eio_cb cb, void *data)
{
  REQ (EIO_FCHOWN); req->int1 = fd; req->int2 = (long)uid; req->int3 = (long)gid; SEND;
}

eio_req *eio_dup2 (int fd, int fd2, int pri, eio_cb cb, void *data)
{
  REQ (EIO_DUP2); req->int1 = fd; req->int2 = fd2; SEND;
}

eio_req *eio_sendfile (int out_fd, int in_fd, off_t in_offset, size_t length, int pri, eio_cb cb, void *data)
{
  REQ (EIO_SENDFILE); req->int1 = out_fd; req->int2 = in_fd; req->offs = in_offset; req->size = length; SEND;
}
1157
/* path-taking request constructors: PATH copies the path into the
   request (ptr1), so the caller's string need not outlive the call */

eio_req *eio_open (const char *path, int flags, mode_t mode, int pri, eio_cb cb, void *data)
{
  REQ (EIO_OPEN); PATH; req->int1 = flags; req->int2 = (long)mode; SEND;
}

eio_req *eio_utime (const char *path, double atime, double mtime, int pri, eio_cb cb, void *data)
{
  REQ (EIO_UTIME); PATH; req->nv1 = atime; req->nv2 = mtime; SEND;
}

eio_req *eio_truncate (const char *path, off_t offset, int pri, eio_cb cb, void *data)
{
  REQ (EIO_TRUNCATE); PATH; req->offs = offset; SEND;
}

eio_req *eio_chown (const char *path, uid_t uid, gid_t gid, int pri, eio_cb cb, void *data)
{
  REQ (EIO_CHOWN); PATH; req->int2 = (long)uid; req->int3 = (long)gid; SEND;
}

eio_req *eio_chmod (const char *path, mode_t mode, int pri, eio_cb cb, void *data)
{
  REQ (EIO_CHMOD); PATH; req->int2 = (long)mode; SEND;
}

eio_req *eio_mkdir (const char *path, mode_t mode, int pri, eio_cb cb, void *data)
{
  REQ (EIO_MKDIR); PATH; req->int2 = (long)mode; SEND;
}

/* shared helper for requests that take only a path */
static eio_req *
eio__1path (int type, const char *path, int pri, eio_cb cb, void *data)
{
  REQ (type); PATH; SEND;
}

eio_req *eio_readlink (const char *path, int pri, eio_cb cb, void *data)
{
  return eio__1path (EIO_READLINK, path, pri, cb, data);
}

eio_req *eio_stat (const char *path, int pri, eio_cb cb, void *data)
{
  return eio__1path (EIO_STAT, path, pri, cb, data);
}

eio_req *eio_lstat (const char *path, int pri, eio_cb cb, void *data)
{
  return eio__1path (EIO_LSTAT, path, pri, cb, data);
}

eio_req *eio_unlink (const char *path, int pri, eio_cb cb, void *data)
{
  return eio__1path (EIO_UNLINK, path, pri, cb, data);
}

eio_req *eio_rmdir (const char *path, int pri, eio_cb cb, void *data)
{
  return eio__1path (EIO_RMDIR, path, pri, cb, data);
}

eio_req *eio_readdir (const char *path, int pri, eio_cb cb, void *data)
{
  return eio__1path (EIO_READDIR, path, pri, cb, data);
}
1223
1224 eio_req *eio_mknod (const char *path, mode_t mode, dev_t dev, int pri, eio_cb cb, void *data)
1225 {
1226 REQ (EIO_MKNOD); PATH; req->int2 = (long)mode; req->int2 = (long)dev; SEND;
1227 }
1228
/* shared helper for requests taking two paths; the copy of new_path
   is stored in ptr2 and freed with the request */
static eio_req *
eio__2path (int type, const char *path, const char *new_path, int pri, eio_cb cb, void *data)
{
  REQ (type); PATH;

  req->flags |= EIO_FLAG_PTR2_FREE;
  req->ptr2 = strdup (new_path);
  if (!req->ptr2)
    {
      eio_api_destroy (req);
      return 0;
    }

  SEND;
}

eio_req *eio_link (const char *path, const char *new_path, int pri, eio_cb cb, void *data)
{
  return eio__2path (EIO_LINK, path, new_path, pri, cb, data);
}

eio_req *eio_symlink (const char *path, const char *new_path, int pri, eio_cb cb, void *data)
{
  return eio__2path (EIO_SYMLINK, path, new_path, pri, cb, data);
}

eio_req *eio_rename (const char *path, const char *new_path, int pri, eio_cb cb, void *data)
{
  return eio__2path (EIO_RENAME, path, new_path, pri, cb, data);
}

#endif
1261
/* create a group request; groups run at maximum priority so that
   their members are not starved by the group itself */
eio_req *eio_grp (eio_cb cb, void *data)
{
  const int pri = EIO_PRI_MAX;

  REQ (EIO_GROUP); SEND;
}

#undef REQ
#undef PATH
#undef SEND

/*****************************************************************************/
/* grp functions */

/* install a feeder callback and member limit (int2), then prime it */
void eio_grp_feed (eio_req *grp, void (*feed)(eio_req *req), int limit)
{
  grp->int2 = limit;
  grp->feed = feed;

  grp_try_feed (grp);
}

/* change the member limit and give the feeder a chance to run */
void eio_grp_limit (eio_req *grp, int limit)
{
  grp->int2 = limit;

  grp_try_feed (grp);
}
1290
/* link req to the front of grp's member list and bump the group size;
   int1 == 2 marks a group that has already finished */
void eio_grp_add (eio_req *grp, eio_req *req)
{
  assert (("cannot add requests to IO::AIO::GRP after the group finished", grp->int1 != 2));

  ++grp->size;
  req->grp = grp;

  req->grp_prev = 0;
  req->grp_next = grp->grp_first;

  if (grp->grp_first)
    grp->grp_first->grp_prev = req;

  grp->grp_first = req;
}
1306
1307 /*****************************************************************************/
1308 /* misc garbage */
1309
1310 ssize_t eio_sendfile_sync (int ofd, int ifd, off_t offset, size_t count)
1311 {
1312 worker wrk;
1313
1314 wrk.dbuf = 0;
1315
1316 eio__sendfile (ofd, ifd, offset, count, &wrk);
1317
1318 if (wrk.dbuf)
1319 free (wrk.dbuf);
1320 }
1321