/cvs/libeio/eio.c
Revision: 1.4
Committed: Sun May 11 00:10:15 2008 UTC by root
Content type: text/plain
Branch: MAIN
Changes since 1.3: +1 -1 lines
Log Message:
*** empty log message ***

File Contents

# Content
1 #include "eio.h"
2 #include "xthread.h"
3
4 #include <errno.h>
5 #include <stddef.h>
6 #include <stdlib.h>
7 #include <string.h>
8 #include <errno.h>
9 #include <sys/types.h>
10 #include <sys/stat.h>
11 #include <limits.h>
12 #include <fcntl.h>
13 #include <sched.h>
14
15 #ifndef EIO_FINISH
16 # define EIO_FINISH(req) ((req)->finish) && !EIO_CANCELLED (req) ? (req)->finish (req) : 0
17 #endif
18
19 #ifndef EIO_DESTROY
20 # define EIO_DESTROY(req) do { if ((req)->destroy) (req)->destroy (req); } while (0)
21 #endif
22
23 #ifndef EIO_FEED
24 # define EIO_FEED(req) do { if ((req)->feed ) (req)->feed (req); } while (0)
25 #endif
26
27 #ifdef _WIN32
28
29 /*doh*/
30
31 #else
32
33 # include "config.h"
34 # include <sys/time.h>
35 # include <sys/select.h>
36 # include <unistd.h>
37 # include <utime.h>
38 # include <signal.h>
39 # include <dirent.h>
40
41 # ifndef EIO_STRUCT_DIRENT
42 # define EIO_STRUCT_DIRENT struct dirent
43 # endif
44
45 #endif
46
47 # ifndef EIO_STRUCT_STAT
48 # define EIO_STRUCT_STAT struct stat
49 # endif
50
51 #if HAVE_SENDFILE
52 # if __linux
53 # include <sys/sendfile.h>
54 # elif __freebsd
55 # include <sys/socket.h>
56 # include <sys/uio.h>
57 # elif __hpux
58 # include <sys/socket.h>
59 # elif __solaris /* not yet */
60 # include <sys/sendfile.h>
61 # else
62 # error sendfile support requested but not available
63 # endif
64 #endif
65
66 /* number of seconds after which idle threads exit */
67 #define IDLE_TIMEOUT 10
68
69 /* used for struct dirent; AIX doesn't provide NAME_MAX */
70 #ifndef NAME_MAX
71 # define NAME_MAX 4096
72 #endif
73
74 /* buffer size for various temporary buffers */
75 #define EIO_BUFSIZE 65536
76
77 #define dBUF \
78 char *eio_buf; \
79 X_LOCK (wrklock); \
80 self->dbuf = eio_buf = malloc (EIO_BUFSIZE); \
81 X_UNLOCK (wrklock); \
82 if (!eio_buf) \
83 return -1;
84
85 #define EIO_TICKS ((1000000 + 1023) >> 10)
86
87 static void (*want_poll_cb) (void);
88 static void (*done_poll_cb) (void);
89
90 static unsigned int max_poll_time = 0;
91 static unsigned int max_poll_reqs = 0;
92
93 /* calculate the time difference in units of ~1/EIO_TICKS of a second */
94 static int tvdiff (struct timeval *tv1, struct timeval *tv2)
95 {
96 return (tv2->tv_sec - tv1->tv_sec ) * EIO_TICKS
97 + ((tv2->tv_usec - tv1->tv_usec) >> 10);
98 }
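For reference: EIO_TICKS is (1000000 + 1023) >> 10 = 977, so one tick is roughly a millisecond. A full second of tv_sec difference contributes 977 ticks, and a full second of tv_usec difference contributes 1000000 >> 10 = 976 ticks, so the two terms agree and tvdiff () reports elapsed time in ~1/977-second units. eio_poll () below compares max_poll_time directly against this tick count.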
99
100 static unsigned int started, idle, wanted;
101
102 /* worker threads management */
103 static mutex_t wrklock = X_MUTEX_INIT;
104
105 typedef struct worker {
106 /* locked by wrklock */
107 struct worker *prev, *next;
108
109 thread_t tid;
110
111 /* locked by reslock, reqlock or wrklock */
112 eio_req *req; /* currently processed request */
113 void *dbuf;
114 DIR *dirp;
115 } worker;
116
117 static worker wrk_first = { &wrk_first, &wrk_first, 0 };
118
119 static void worker_clear (worker *wrk)
120 {
121 if (wrk->dirp)
122 {
123 closedir (wrk->dirp);
124 wrk->dirp = 0;
125 }
126
127 if (wrk->dbuf)
128 {
129 free (wrk->dbuf);
130 wrk->dbuf = 0;
131 }
132 }
133
134 static void worker_free (worker *wrk)
135 {
136 wrk->next->prev = wrk->prev;
137 wrk->prev->next = wrk->next;
138
139 free (wrk);
140 }
141
142 static volatile unsigned int nreqs, nready, npending;
143 static volatile unsigned int max_idle = 4;
144
145 static mutex_t reslock = X_MUTEX_INIT;
146 static mutex_t reqlock = X_MUTEX_INIT;
147 static cond_t reqwait = X_COND_INIT;
148
149 unsigned int eio_nreqs (void)
150 {
151 return nreqs;
152 }
153
154 unsigned int eio_nready (void)
155 {
156 unsigned int retval;
157
158 if (WORDACCESS_UNSAFE) X_LOCK (reqlock);
159 retval = nready;
160 if (WORDACCESS_UNSAFE) X_UNLOCK (reqlock);
161
162 return retval;
163 }
164
165 unsigned int eio_npending (void)
166 {
167 unsigned int retval;
168
169 if (WORDACCESS_UNSAFE) X_LOCK (reqlock);
170 retval = npending;
171 if (WORDACCESS_UNSAFE) X_UNLOCK (reqlock);
172
173 return retval;
174 }
175
176 unsigned int eio_nthreads (void)
177 {
178 unsigned int retval;
179
180 if (WORDACCESS_UNSAFE) X_LOCK (reqlock);
181 retval = started;
182 if (WORDACCESS_UNSAFE) X_UNLOCK (reqlock);
183
184 return retval;
185 }
186
187 /*
188 * a somewhat faster data structure might be nice, but
189 * with 8 priorities this actually needs <20 insns
190 * per shift, the most expensive operation.
191 */
192 typedef struct {
193 eio_req *qs[EIO_NUM_PRI], *qe[EIO_NUM_PRI]; /* qstart, qend */
194 int size;
195 } reqq;
196
197 static reqq req_queue;
198 static reqq res_queue;
199
200 static int reqq_push (reqq *q, eio_req *req)
201 {
202 int pri = req->pri;
203 req->next = 0;
204
205 if (q->qe[pri])
206 {
207 q->qe[pri]->next = req;
208 q->qe[pri] = req;
209 }
210 else
211 q->qe[pri] = q->qs[pri] = req;
212
213 return q->size++;
214 }
215
216 static eio_req *reqq_shift (reqq *q)
217 {
218 int pri;
219
220 if (!q->size)
221 return 0;
222
223 --q->size;
224
225 for (pri = EIO_NUM_PRI; pri--; )
226 {
227 eio_req *req = q->qs[pri];
228
229 if (req)
230 {
231 if (!(q->qs[pri] = (eio_req *)req->next))
232 q->qe[pri] = 0;
233
234 return req;
235 }
236 }
237
238 abort ();
239 }
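A small worked example of the dispatch order (hypothetical requests r1, r2, r3): pushing r1 at priority 0, r2 at priority 1, then r3 at priority 0 makes reqq_shift () return r2 first (the highest non-empty priority wins), then r1, then r3 (FIFO within a priority). Note also that reqq_push () returns the total size before the push, so a return value of 0 marks the empty-to-non-empty transition of a queue; eio_proc below relies on exactly that to call want_poll_cb when the result queue goes from empty to non-empty.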
240
241 static void grp_try_feed (eio_req *grp)
242 {
243 while (grp->size < grp->int2 && !EIO_CANCELLED (grp))
244 {
245 int old_len = grp->size;
246
247 EIO_FEED (grp);
248
249 /* stop if no progress has been made */
250 if (old_len == grp->size)
251 {
252 grp->feed = 0;
253 break;
254 }
255 }
256 }
257
258 static int eio_finish (eio_req *req);
259
260 static int grp_dec (eio_req *grp)
261 {
262 --grp->size;
263
264 /* call feeder, if applicable */
265 grp_try_feed (grp);
266
267 /* finish, if done */
268 if (!grp->size && grp->int1)
269 return eio_finish (grp);
270 else
271 return 0;
272 }
273
274 void eio_destroy (eio_req *req)
275 {
276 if ((req)->flags & EIO_FLAG_PTR2_FREE)
277 free (req->ptr2);
278
279 EIO_DESTROY (req);
280 }
281
282 static int eio_finish (eio_req *req)
283 {
284 int res = EIO_FINISH (req);
285
286 if (req->grp)
287 {
288 int res2;
289 eio_req *grp = req->grp;
290
291 /* unlink request */
292 if (req->grp_next) req->grp_next->grp_prev = req->grp_prev;
293 if (req->grp_prev) req->grp_prev->grp_next = req->grp_next;
294
295 if (grp->grp_first == req)
296 grp->grp_first = req->grp_next;
297
298 res2 = grp_dec (grp);
299
300 if (!res && res2)
301 res = res2;
302 }
303
304 eio_destroy (req);
305
306 return res;
307 }
308
309 void eio_grp_cancel (eio_req *grp)
310 {
311 for (grp = grp->grp_first; grp; grp = grp->grp_next)
312 eio_cancel (grp);
313 }
314
315 void eio_cancel (eio_req *req)
316 {
317 req->flags |= EIO_FLAG_CANCELLED;
318
319 eio_grp_cancel (req);
320 }
321
322 X_THREAD_PROC (eio_proc);
323
324 static void start_thread (void)
325 {
326 worker *wrk = calloc (1, sizeof (worker));
327
328 if (!wrk)
329 croak ("unable to allocate worker thread data");
330
331 X_LOCK (wrklock);
332
333 if (thread_create (&wrk->tid, eio_proc, (void *)wrk))
334 {
335 wrk->prev = &wrk_first;
336 wrk->next = wrk_first.next;
337 wrk_first.next->prev = wrk;
338 wrk_first.next = wrk;
339 ++started;
340 }
341 else
342 free (wrk);
343
344 X_UNLOCK (wrklock);
345 }
346
347 static void maybe_start_thread (void)
348 {
349 if (eio_nthreads () >= wanted)
350 return;
351
352 /* todo: maybe use idle here, but might be less exact */
353 if (0 <= (int)eio_nthreads () + (int)eio_npending () - (int)eio_nreqs ())
354 return;
355
356 start_thread ();
357 }
358
359 void eio_submit (eio_req *req)
360 {
361 ++nreqs;
362
363 X_LOCK (reqlock);
364 ++nready;
365 reqq_push (&req_queue, req);
366 X_COND_SIGNAL (reqwait);
367 X_UNLOCK (reqlock);
368
369 maybe_start_thread ();
370 }
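The fields a request needs before eio_submit () can be read off the eio_proc switch further down: type selects the operation, int1/ptr2/size/offs carry the operation arguments, pri indexes the priority queue, and finish/destroy are the hooks used by the default EIO_FINISH/EIO_DESTROY macros above. A minimal hand-rolled EIO_READ submission might look like the sketch below; EIO_PRI_DEFAULT is assumed to come from eio.h, and whether eio.h also provides ready-made request constructors is not shown in this file.

  static void my_read_free (eio_req *req)
  {
    free (req); /* the default EIO_DESTROY only calls this hook, it does not free the request */
  }

  static int my_read_done (eio_req *req)
  {
    /* req->result is the byte count (or -1), req->errorno the saved errno */
    return 0;
  }

  static void submit_read (int fd, void *buf, size_t len, off_t offset)
  {
    eio_req *req = calloc (1, sizeof (eio_req));

    if (!req)
      return;

    req->type    = EIO_READ;
    req->int1    = fd;     /* file descriptor */
    req->ptr2    = buf;    /* destination buffer */
    req->size    = len;    /* bytes to read */
    req->offs    = offset; /* a negative offset would mean a plain read () instead of pread () */
    req->pri     = EIO_PRI_DEFAULT + EIO_PRI_BIAS; /* reqq_push indexes qs[] with pri directly */
    req->finish  = my_read_done;
    req->destroy = my_read_free;

    eio_submit (req);
  }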
371
372 static void end_thread (void)
373 {
374 eio_req *req = calloc (1, sizeof (eio_req));
375
376 req->type = EIO_QUIT;
377 req->pri = EIO_PRI_MAX + EIO_PRI_BIAS;
378
379 X_LOCK (reqlock);
380 reqq_push (&req_queue, req);
381 X_COND_SIGNAL (reqwait);
382 X_UNLOCK (reqlock);
383
384 X_LOCK (wrklock);
385 --started;
386 X_UNLOCK (wrklock);
387 }
388
389 void eio_set_max_poll_time (double nseconds)
390 {
391 if (WORDACCESS_UNSAFE) X_LOCK (reqlock);
392 max_poll_time = nseconds * EIO_TICKS; /* convert seconds to the ~1ms ticks returned by tvdiff () */
393 if (WORDACCESS_UNSAFE) X_UNLOCK (reqlock);
394 }
395
396 void eio_set_max_poll_reqs (unsigned int maxreqs)
397 {
398 if (WORDACCESS_UNSAFE) X_LOCK (reqlock);
399 max_poll_reqs = maxreqs;
400 if (WORDACCESS_UNSAFE) X_UNLOCK (reqlock);
401 }
402
403 void eio_set_max_idle (unsigned int nthreads)
404 {
405 if (WORDACCESS_UNSAFE) X_LOCK (reqlock);
406 max_idle = nthreads <= 0 ? 1 : nthreads;
407 if (WORDACCESS_UNSAFE) X_UNLOCK (reqlock);
408 }
409
410 void eio_set_min_parallel (unsigned int nthreads)
411 {
412 if (wanted < nthreads)
413 wanted = nthreads;
414 }
415
416 void eio_set_max_parallel (unsigned int nthreads)
417 {
418 if (wanted > nthreads)
419 wanted = nthreads;
420
421 while (started > wanted)
422 end_thread ();
423 }
424
425 int eio_poll (void)
426 {
427 int maxreqs = max_poll_reqs;
428 struct timeval tv_start, tv_now;
429 eio_req *req;
430
431 if (max_poll_time)
432 gettimeofday (&tv_start, 0);
433
434 for (;;)
435 {
436 maybe_start_thread ();
437
438 X_LOCK (reslock);
439 req = reqq_shift (&res_queue);
440
441 if (req)
442 {
443 --npending;
444
445 if (!res_queue.size && done_poll_cb)
446 done_poll_cb ();
447 }
448
449 X_UNLOCK (reslock);
450
451 if (!req)
452 return 0;
453
454 --nreqs;
455
456 if (req->type == EIO_GROUP && req->size)
457 {
458 req->int1 = 1; /* mark request as delayed */
459 continue;
460 }
461 else
462 {
463 int res = eio_finish (req);
464 if (res)
465 return res;
466 }
467
468 if (maxreqs && !--maxreqs)
469 break;
470
471 if (max_poll_time)
472 {
473 gettimeofday (&tv_now, 0);
474
475 if (tvdiff (&tv_start, &tv_now) >= max_poll_time)
476 break;
477 }
478 }
479
480 errno = EAGAIN;
481 return -1;
482 }
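The return contract above: 0 when the result queue has been drained, -1 with errno set to EAGAIN when max_poll_time or max_poll_reqs cut the poll short, and any other non-zero value is whatever a finish callback returned. A caller that wants to drain everything in one go might therefore loop like this (a sketch, not part of the library):

  static void drain_results (void)
  {
    for (;;)
      {
        int res = eio_poll ();

        if (!res)
          break;                    /* result queue is empty */

        if (res == -1 && errno == EAGAIN)
          continue;                 /* hit a poll limit, keep draining */

        break;                      /* a finish callback returned non-zero */
      }
  }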
483
484 /*****************************************************************************/
485 /* work around various missing functions */
486
487 #if !HAVE_PREADWRITE
488 # define pread aio_pread
489 # define pwrite aio_pwrite
490
491 /*
492 * make our pread/pwrite safe against themselves, but not against
493 * normal read/write by using a mutex. slows down execution a lot,
494 * but that's your problem, not mine.
495 */
496 static mutex_t preadwritelock = X_MUTEX_INIT;
497
498 static ssize_t pread (int fd, void *buf, size_t count, off_t offset)
499 {
500 ssize_t res;
501 off_t ooffset;
502
503 X_LOCK (preadwritelock);
504 ooffset = lseek (fd, 0, SEEK_CUR);
505 lseek (fd, offset, SEEK_SET);
506 res = read (fd, buf, count);
507 lseek (fd, ooffset, SEEK_SET);
508 X_UNLOCK (preadwritelock);
509
510 return res;
511 }
512
513 static ssize_t pwrite (int fd, void *buf, size_t count, off_t offset)
514 {
515 ssize_t res;
516 off_t ooffset;
517
518 X_LOCK (preadwritelock);
519 ooffset = lseek (fd, 0, SEEK_CUR);
520 lseek (fd, offset, SEEK_SET);
521 res = write (fd, buf, count);
522 lseek (fd, ooffset, SEEK_SET); /* restore the original file offset */
523 X_UNLOCK (preadwritelock);
524
525 return res;
526 }
527 #endif
528
529 #ifndef HAVE_FUTIMES
530
531 # define utimes(path,times) aio_utimes (path, times)
532 # define futimes(fd,times) aio_futimes (fd, times)
533
534 static int aio_utimes (const char *filename, const struct timeval times[2])
535 {
536 if (times)
537 {
538 struct utimbuf buf;
539
540 buf.actime = times[0].tv_sec;
541 buf.modtime = times[1].tv_sec;
542
543 return utime (filename, &buf);
544 }
545 else
546 return utime (filename, 0);
547 }
548
549 static int aio_futimes (int fd, const struct timeval tv[2])
550 {
551 errno = ENOSYS;
552 return -1;
553 }
554
555 #endif
556
557 #if !HAVE_FDATASYNC
558 # define fdatasync fsync
559 #endif
560
561 #if !HAVE_READAHEAD
562 # define readahead(fd,offset,count) aio_readahead (fd, offset, count, self)
563
564 static ssize_t aio_readahead (int fd, off_t offset, size_t count, worker *self)
565 {
566 size_t todo = count;
567 dBUF;
568
569 while (todo > 0)
570 {
571 size_t len = todo < EIO_BUFSIZE ? todo : EIO_BUFSIZE;
572
573 pread (fd, eio_buf, len, offset);
574 offset += len;
575 todo -= len;
576 }
577
578 errno = 0;
579 return count;
580 }
581
582 #endif
583
584 #if !HAVE_READDIR_R
585 # define readdir_r aio_readdir_r
586
587 static mutex_t readdirlock = X_MUTEX_INIT;
588
589 static int readdir_r (DIR *dirp, X_DIRENT *ent, X_DIRENT **res)
590 {
591 X_DIRENT *e;
592 int errorno;
593
594 X_LOCK (readdirlock);
595
596 e = readdir (dirp);
597 errorno = errno;
598
599 if (e)
600 {
601 *res = ent;
602 strcpy (ent->d_name, e->d_name);
603 }
604 else
605 *res = 0;
606
607 X_UNLOCK (readdirlock);
608
609 errno = errorno;
610 return e ? 0 : -1;
611 }
612 #endif
613
614 /* sendfile always needs emulation */
615 static ssize_t sendfile_ (int ofd, int ifd, off_t offset, size_t count, worker *self)
616 {
617 ssize_t res;
618
619 if (!count)
620 return 0;
621
622 #if HAVE_SENDFILE
623 # if __linux
624 res = sendfile (ofd, ifd, &offset, count);
625
626 # elif __freebsd
627 /*
628 * Of course, the freebsd sendfile is a dire hack with no thoughts
629 * wasted on making it similar to other I/O functions.
630 */
631 {
632 off_t sbytes;
633 res = sendfile (ifd, ofd, offset, count, 0, &sbytes, 0);
634
635 if (res < 0 && sbytes)
636 /* maybe only on EAGAIN: as usual, the manpage leaves you guessing */
637 res = sbytes;
638 }
639
640 # elif __hpux
641 res = sendfile (ofd, ifd, offset, count, 0, 0);
642
643 # elif __solaris
644 {
645 struct sendfilevec vec;
646 size_t sbytes;
647
648 vec.sfv_fd = ifd;
649 vec.sfv_flag = 0;
650 vec.sfv_off = offset;
651 vec.sfv_len = count;
652
653 res = sendfilev (ofd, &vec, 1, &sbytes);
654
655 if (res < 0 && sbytes)
656 res = sbytes;
657 }
658
659 # endif
660 #else
661 res = -1;
662 errno = ENOSYS;
663 #endif
664
665 if (res < 0
666 && (errno == ENOSYS || errno == EINVAL || errno == ENOTSOCK
667 #if __solaris
668 || errno == EAFNOSUPPORT || errno == EPROTOTYPE
669 #endif
670 )
671 )
672 {
673 /* emulate sendfile. this is a major pain in the ass */
674 dBUF;
675
676 res = 0;
677
678 while (count)
679 {
680 ssize_t cnt;
681
682 cnt = pread (ifd, eio_buf, count > EIO_BUFSIZE ? EIO_BUFSIZE : count, offset);
683
684 if (cnt <= 0)
685 {
686 if (cnt && !res) res = -1;
687 break;
688 }
689
690 cnt = write (ofd, eio_buf, cnt);
691
692 if (cnt <= 0)
693 {
694 if (cnt && !res) res = -1;
695 break;
696 }
697
698 offset += cnt;
699 res += cnt;
700 count -= cnt;
701 }
702 }
703
704 return res;
705 }
706
707 /* read a full directory */
708 static void scandir_ (eio_req *req, worker *self)
709 {
710 DIR *dirp;
711 union
712 {
713 EIO_STRUCT_DIRENT d;
714 char b [offsetof (EIO_STRUCT_DIRENT, d_name) + NAME_MAX + 1];
715 } *u;
716 EIO_STRUCT_DIRENT *entp;
717 char *name, *names;
718 int memlen = 4096;
719 int memofs = 0;
720 int res = 0;
721
722 X_LOCK (wrklock);
723 self->dirp = dirp = opendir (req->ptr1);
724 self->dbuf = u = malloc (sizeof (*u));
725 req->flags |= EIO_FLAG_PTR2_FREE;
726 req->ptr2 = names = malloc (memlen);
727 X_UNLOCK (wrklock);
728
729 if (dirp && u && names)
730 for (;;)
731 {
732 errno = 0;
733 readdir_r (dirp, &u->d, &entp);
734
735 if (!entp)
736 break;
737
738 name = entp->d_name;
739
740 if (name [0] != '.' || (name [1] && (name [1] != '.' || name [2])))
741 {
742 int len = strlen (name) + 1;
743
744 res++;
745
746 while (memofs + len > memlen)
747 {
748 memlen *= 2;
749 X_LOCK (wrklock);
750 req->ptr2 = names = realloc (names, memlen);
751 X_UNLOCK (wrklock);
752
753 if (!names)
754 break;
755 }
756 if (!names) break; /* realloc failed, stop collecting entries */
757 memcpy (names + memofs, name, len);
758 memofs += len;
759 }
760 }
761
762 if (errno)
763 res = -1;
764
765 req->result = res;
766 }
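The names collected above end up as consecutive NUL-terminated strings in req->ptr2 (freed automatically because EIO_FLAG_PTR2_FREE is set), with req->result holding the number of entries, or -1 on error. A finish callback could walk them as in this sketch (printf assumes <stdio.h>, which this file does not include):

  static int my_readdir_done (eio_req *req)
  {
    if (req->result >= 0)
      {
        char *name = (char *)req->ptr2;
        int i;

        for (i = 0; i < req->result; ++i)
          {
            printf ("entry %d: %s\n", i, name);
            name += strlen (name) + 1; /* names are packed back to back */
          }
      }

    return 0;
  }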
767
768 /*****************************************************************************/
769
770 X_THREAD_PROC (eio_proc)
771 {
772 eio_req *req;
773 struct timespec ts;
774 worker *self = (worker *)thr_arg;
775
776 /* try to distribute timeouts somewhat randomly */
777 ts.tv_nsec = ((unsigned long)self & 1023UL) * (1000000000UL / 1024UL);
778
779 for (;;)
780 {
781 ts.tv_sec = time (0) + IDLE_TIMEOUT;
782
783 X_LOCK (reqlock);
784
785 for (;;)
786 {
787 self->req = req = reqq_shift (&req_queue);
788
789 if (req)
790 break;
791
792 ++idle;
793
794 if (X_COND_TIMEDWAIT (reqwait, reqlock, ts) == ETIMEDOUT)
795 {
796 if (idle > max_idle)
797 {
798 --idle;
799 X_UNLOCK (reqlock);
800 X_LOCK (wrklock);
801 --started;
802 X_UNLOCK (wrklock);
803 goto quit;
804 }
805
806 /* we are allowed to idle, so do so without any timeout */
807 X_COND_WAIT (reqwait, reqlock);
808 ts.tv_sec = time (0) + IDLE_TIMEOUT;
809 }
810
811 --idle;
812 }
813
814 --nready;
815
816 X_UNLOCK (reqlock);
817
818 errno = 0; /* strictly unnecessary */
819
820 if (!EIO_CANCELLED (req))
821 switch (req->type)
822 {
823 case EIO_READ: req->result = req->offs >= 0
824 ? pread (req->int1, req->ptr2, req->size, req->offs)
825 : read (req->int1, req->ptr2, req->size); break;
826 case EIO_WRITE: req->result = req->offs >= 0
827 ? pwrite (req->int1, req->ptr2, req->size, req->offs)
828 : write (req->int1, req->ptr2, req->size); break;
829
830 case EIO_READAHEAD: req->result = readahead (req->int1, req->offs, req->size); break;
831 case EIO_SENDFILE: req->result = sendfile_ (req->int1, req->int2, req->offs, req->size, self); break;
832
833 case EIO_STAT: req->result = stat (req->ptr1, (EIO_STRUCT_STAT *)req->ptr2); break;
834 case EIO_LSTAT: req->result = lstat (req->ptr1, (EIO_STRUCT_STAT *)req->ptr2); break;
835 case EIO_FSTAT: req->result = fstat (req->int1, (EIO_STRUCT_STAT *)req->ptr2); break;
836
837 case EIO_CHOWN: req->result = chown (req->ptr1, req->int2, req->int3); break;
838 case EIO_FCHOWN: req->result = fchown (req->int1, req->int2, req->int3); break;
839 case EIO_CHMOD: req->result = chmod (req->ptr1, (mode_t)req->int2); break;
840 case EIO_FCHMOD: req->result = fchmod (req->int1, (mode_t)req->int2); break;
841 case EIO_TRUNCATE: req->result = truncate (req->ptr1, req->offs); break;
842 case EIO_FTRUNCATE: req->result = ftruncate (req->int1, req->offs); break;
843
844 case EIO_OPEN: req->result = open (req->ptr1, req->int1, (mode_t)req->int2); break;
845 case EIO_CLOSE: req->result = close (req->int1); break;
846 case EIO_DUP2: req->result = dup2 (req->int1, req->int2); break;
847 case EIO_UNLINK: req->result = unlink (req->ptr1); break;
848 case EIO_RMDIR: req->result = rmdir (req->ptr1); break;
849 case EIO_MKDIR: req->result = mkdir (req->ptr1, (mode_t)req->int2); break;
850 case EIO_RENAME: req->result = rename (req->ptr1, req->ptr2); break;
851 case EIO_LINK: req->result = link (req->ptr1, req->ptr2); break;
852 case EIO_SYMLINK: req->result = symlink (req->ptr1, req->ptr2); break;
853 case EIO_MKNOD: req->result = mknod (req->ptr1, (mode_t)req->int2, (dev_t)req->offs); break;
854 case EIO_READLINK: req->result = readlink (req->ptr1, req->ptr2, NAME_MAX); break;
855
856 case EIO_SYNC: req->result = 0; sync (); break;
857 case EIO_FSYNC: req->result = fsync (req->int1); break;
858 case EIO_FDATASYNC: req->result = fdatasync (req->int1); break;
859
860 case EIO_READDIR: scandir_ (req, self); break;
861
862 case EIO_BUSY:
863 #ifdef _WIN32
864 Sleep (req->nv1 * 1000.);
865 #else
866 {
867 struct timeval tv;
868
869 tv.tv_sec = req->nv1;
870 tv.tv_usec = (req->nv1 - tv.tv_sec) * 1000000.;
871
872 req->result = select (0, 0, 0, 0, &tv);
873 }
874 #endif
875 break;
876
877 case EIO_UTIME:
878 case EIO_FUTIME:
879 {
880 struct timeval tv[2];
881 struct timeval *times;
882
883 if (req->nv1 != -1. || req->nv2 != -1.)
884 {
885 tv[0].tv_sec = req->nv1;
886 tv[0].tv_usec = (req->nv1 - tv[0].tv_sec) * 1000000.;
887 tv[1].tv_sec = req->nv2;
888 tv[1].tv_usec = (req->nv2 - tv[1].tv_sec) * 1000000.;
889
890 times = tv;
891 }
892 else
893 times = 0;
894
895
896 req->result = req->type == EIO_FUTIME
897 ? futimes (req->int1, times)
898 : utimes (req->ptr1, times);
899 }
900 break;
901 case EIO_GROUP:
902 case EIO_NOP:
903 break;
904
905 case EIO_QUIT:
906 goto quit;
907
908 default:
909 req->result = -1;
910 break;
911 }
912
913 req->errorno = errno;
914
915 X_LOCK (reslock);
916
917 ++npending;
918
919 if (!reqq_push (&res_queue, req) && want_poll_cb)
920 want_poll_cb ();
921
922 self->req = 0;
923 worker_clear (self);
924
925 X_UNLOCK (reslock);
926 }
927
928 quit:
929 X_LOCK (wrklock);
930 worker_free (self);
931 X_UNLOCK (wrklock);
932
933 return 0;
934 }
935
936 /*****************************************************************************/
937
938 static void eio_atfork_prepare (void)
939 {
940 X_LOCK (wrklock);
941 X_LOCK (reqlock);
942 X_LOCK (reslock);
943 #if !HAVE_PREADWRITE
944 X_LOCK (preadwritelock);
945 #endif
946 #if !HAVE_READDIR_R
947 X_LOCK (readdirlock);
948 #endif
949 }
950
951 static void eio_atfork_parent (void)
952 {
953 #if !HAVE_READDIR_R
954 X_UNLOCK (readdirlock);
955 #endif
956 #if !HAVE_PREADWRITE
957 X_UNLOCK (preadwritelock);
958 #endif
959 X_UNLOCK (reslock);
960 X_UNLOCK (reqlock);
961 X_UNLOCK (wrklock);
962 }
963
964 static void eio_atfork_child (void)
965 {
966 eio_req *prv;
967
968 while (prv = reqq_shift (&req_queue))
969 eio_destroy (prv);
970
971 while (prv = reqq_shift (&res_queue))
972 eio_destroy (prv);
973
974 while (wrk_first.next != &wrk_first)
975 {
976 worker *wrk = wrk_first.next;
977
978 if (wrk->req)
979 eio_destroy (wrk->req);
980
981 worker_clear (wrk);
982 worker_free (wrk);
983 }
984
985 started = 0;
986 idle = 0;
987 nreqs = 0;
988 nready = 0;
989 npending = 0;
990
991 eio_atfork_parent ();
992 }
993
994 int eio_init (void (*want_poll)(void), void (*done_poll)(void))
995 {
996 want_poll_cb = want_poll;
997 done_poll_cb = done_poll;
998
999 #ifdef _WIN32
1000 X_MUTEX_CHECK (wrklock);
1001 X_MUTEX_CHECK (reslock);
1002 X_MUTEX_CHECK (reqlock);
1003 X_MUTEX_CHECK (reqwait);
1004 X_MUTEX_CHECK (preadwritelock);
1005 X_MUTEX_CHECK (readdirlock);
1006
1007 X_COND_CHECK (reqwait);
1008 #endif
1009
1010 X_THREAD_ATFORK (eio_atfork_prepare, eio_atfork_parent, eio_atfork_child);
1011 return 0;
1012 }
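eio_init only records the two callbacks and installs the fork handlers; wiring the thread pool into an event loop is the caller's job. The usual pattern is a self-pipe: want_poll (called from a worker thread when the result queue goes non-empty) wakes the loop, and the loop then calls eio_poll () until it stops returning -1/EAGAIN. A sketch under that assumption (wakeup_pipe and the surrounding names are illustrative, not part of libeio):

  #include <unistd.h>
  #include "eio.h"

  static int wakeup_pipe [2]; /* [0] is watched by the event loop, [1] is written below */

  static void want_poll (void) /* worker thread: results became available */
  {
    char dummy = 0;
    write (wakeup_pipe [1], &dummy, 1);
  }

  static void done_poll (void) /* eio_poll: result queue drained again */
  {
    char dummy;
    read (wakeup_pipe [0], &dummy, 1);
  }

  static int setup_eio (void)
  {
    if (pipe (wakeup_pipe))
      return -1;

    return eio_init (want_poll, done_poll);
  }

  /* the event loop then calls eio_poll () whenever wakeup_pipe [0] becomes
     readable, repeating while eio_poll () returns -1 with errno == EAGAIN */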
1013 #if 0
1014
1015 eio_req *eio_fsync (int fd, eio_cb cb);
1016 eio_req *eio_fdatasync (int fd, eio_cb cb);
1017 eio_req *eio_dupclose (int fd, eio_cb cb);
1018 eio_req *eio_readahead (int fd, off_t offset, size_t length, eio_cb cb);
1019 eio_req *eio_read (int fd, off_t offs, size_t length, char *data, eio_cb cb);
1020 eio_req *eio_write (int fd, off_t offs, size_t length, char *data, eio_cb cb);
1021 eio_req *eio_fstat (int fd, eio_cb cb); /* stat buffer=ptr2 allocates dynamically */
1022 eio_req *eio_futime (int fd, double atime, double mtime, eio_cb cb);
1023 eio_req *eio_ftruncate (int fd, off_t offset, eio_cb cb);
1024 eio_req *eio_fchmod (int fd, mode_t mode, eio_cb cb);
1025 eio_req *eio_fchown (int fd, uid_t uid, gid_t gid, eio_cb cb);
1026 eio_req *eio_dup2 (int fd, int fd2, eio_cb cb);
1027 eio_req *eio_sendfile (int out_fd, int in_fd, off_t in_offset, size_t length, eio_cb cb);
1028 eio_req *eio_open (const char *path, int flags, mode_t mode, eio_cb cb);
1029 eio_req *eio_readlink (const char *path, eio_cb cb); /* result=ptr2 allocated dynamically */
1030 eio_req *eio_stat (const char *path, eio_cb cb); /* stat buffer=ptr2 allocates dynamically */
1031 eio_req *eio_lstat (const char *path, eio_cb cb); /* stat buffer=ptr2 allocates dynamically */
1032 eio_req *eio_utime (const char *path, double atime, double mtime, eio_cb cb);
1033 eio_req *eio_truncate (const char *path, off_t offset, eio_cb cb);
1034 eio_req *eio_chmod (const char *path, mode_t mode, eio_cb cb);
1035 eio_req *eio_mkdir (const char *path, mode_t mode, eio_cb cb);
1036 eio_req *eio_chown (const char *path, uid_t uid, gid_t gid, eio_cb cb);
1037 eio_req *eio_unlink (const char *path, eio_cb cb);
1038 eio_req *eio_rmdir (const char *path, eio_cb cb);
1039 eio_req *eio_readdir (const char *path, eio_cb cb); /* result=ptr2 allocated dynamically */
1040 eio_req *eio_mknod (const char *path, mode_t mode, dev_t dev, eio_cb cb);
1041 eio_req *eio_busy (double delay, eio_cb cb); /* ties a thread for this long, simulating busyness */
1042 eio_req *eio_nop (eio_cb cb); /* does nothing except go through the whole process */
1043 void
1044 aio_open (SV8 *pathname, int flags, int mode, SV *callback=&PL_sv_undef)
1045 PROTOTYPE: $$$;$
1046 PPCODE:
1047 {
1048 dREQ;
1049
1050 req->type = EIO_OPEN;
1051 req->sv1 = newSVsv (pathname);
1052 req->ptr1 = SvPVbyte_nolen (req->sv1);
1053 req->int1 = flags;
1054 req->int2 = mode;
1055
1056 EIO_SEND;
1057 }
1058
1059 void
1060 aio_fsync (SV *fh, SV *callback=&PL_sv_undef)
1061 PROTOTYPE: $;$
1062 ALIAS:
1063 aio_fsync = EIO_FSYNC
1064 aio_fdatasync = EIO_FDATASYNC
1065 PPCODE:
1066 {
1067 dREQ;
1068
1069 req->type = ix;
1070 req->sv1 = newSVsv (fh);
1071 req->int1 = PerlIO_fileno (IoIFP (sv_2io (fh)));
1072
1073 EIO_SEND (req);
1074 }
1075
1076 void
1077 aio_close (SV *fh, SV *callback=&PL_sv_undef)
1078 PROTOTYPE: $;$
1079 PPCODE:
1080 {
1081 dREQ;
1082
1083 req->type = EIO_CLOSE;
1084 req->sv1 = newSVsv (fh);
1085 req->int1 = PerlIO_fileno (IoIFP (sv_2io (fh)));
1086
1087 EIO_SEND (req);
1088 }
1089
1090 void
1091 aio_read (SV *fh, SV *offset, SV *length, SV8 *data, IV dataoffset, SV *callback=&PL_sv_undef)
1092 ALIAS:
1093 aio_read = EIO_READ
1094 aio_write = EIO_WRITE
1095 PROTOTYPE: $$$$$;$
1096 PPCODE:
1097 {
1098 STRLEN svlen;
1099 char *svptr = SvPVbyte (data, svlen);
1100 UV len = SvUV (length);
1101
1102 SvUPGRADE (data, SVt_PV);
1103 SvPOK_on (data);
1104
1105 if (dataoffset < 0)
1106 dataoffset += svlen;
1107
1108 if (dataoffset < 0 || dataoffset > svlen)
1109 croak ("dataoffset outside of data scalar");
1110
1111 if (ix == EIO_WRITE)
1112 {
1113 /* write: check length and adjust. */
1114 if (!SvOK (length) || len + dataoffset > svlen)
1115 len = svlen - dataoffset;
1116 }
1117 else
1118 {
1119 /* read: grow scalar as necessary */
1120 svptr = SvGROW (data, len + dataoffset + 1);
1121 }
1122
1123 if (len < 0)
1124 croak ("length must not be negative");
1125
1126 {
1127 dREQ;
1128
1129 req->type = ix;
1130 req->sv1 = newSVsv (fh);
1131 req->int1 = PerlIO_fileno (ix == EIO_READ ? IoIFP (sv_2io (fh))
1132 : IoOFP (sv_2io (fh)));
1133 req->offs = SvOK (offset) ? SvVAL64 (offset) : -1;
1134 req->size = len;
1135 req->sv2 = SvREFCNT_inc (data);
1136 req->ptr2 = (char *)svptr + dataoffset;
1137 req->stroffset = dataoffset;
1138
1139 if (!SvREADONLY (data))
1140 {
1141 SvREADONLY_on (data);
1142 req->flags |= FLAG_SV2_RO_OFF;
1143 }
1144
1145 EIO_SEND;
1146 }
1147 }
1148
1149 void
1150 aio_readlink (SV8 *path, SV *callback=&PL_sv_undef)
1151 PROTOTYPE: $$;$
1152 PPCODE:
1153 {
1154 SV *data;
1155 dREQ;
1156
1157 data = newSV (NAME_MAX);
1158 SvPOK_on (data);
1159
1160 req->type = EIO_READLINK;
1161 req->sv1 = newSVsv (path);
1162 req->ptr1 = SvPVbyte_nolen (req->sv1);
1163 req->sv2 = data;
1164 req->ptr2 = SvPVbyte_nolen (data);
1165
1166 EIO_SEND;
1167 }
1168
1169 void
1170 aio_sendfile (SV *out_fh, SV *in_fh, SV *in_offset, UV length, SV *callback=&PL_sv_undef)
1171 PROTOTYPE: $$$$;$
1172 PPCODE:
1173 {
1174 dREQ;
1175
1176 req->type = EIO_SENDFILE;
1177 req->sv1 = newSVsv (out_fh);
1178 req->int1 = PerlIO_fileno (IoIFP (sv_2io (out_fh)));
1179 req->sv2 = newSVsv (in_fh);
1180 req->int2 = PerlIO_fileno (IoIFP (sv_2io (in_fh)));
1181 req->offs = SvVAL64 (in_offset);
1182 req->size = length;
1183
1184 EIO_SEND;
1185 }
1186
1187 void
1188 aio_readahead (SV *fh, SV *offset, IV length, SV *callback=&PL_sv_undef)
1189 PROTOTYPE: $$$;$
1190 PPCODE:
1191 {
1192 dREQ;
1193
1194 req->type = EIO_READAHEAD;
1195 req->sv1 = newSVsv (fh);
1196 req->int1 = PerlIO_fileno (IoIFP (sv_2io (fh)));
1197 req->offs = SvVAL64 (offset);
1198 req->size = length;
1199
1200 EIO_SEND;
1201 }
1202
1203 void
1204 aio_stat (SV8 *fh_or_path, SV *callback=&PL_sv_undef)
1205 ALIAS:
1206 aio_stat = EIO_STAT
1207 aio_lstat = EIO_LSTAT
1208 PPCODE:
1209 {
1210 dREQ;
1211
1212 req->ptr2 = malloc (sizeof (EIO_STRUCT_STAT));
1213 if (!req->ptr2)
1214 {
1215 req_destroy (req);
1216 croak ("out of memory during aio_stat statdata allocation");
1217 }
1218
1219 req->flags |= FLAG_PTR2_FREE;
1220 req->sv1 = newSVsv (fh_or_path);
1221
1222 if (SvPOK (fh_or_path))
1223 {
1224 req->type = ix;
1225 req->ptr1 = SvPVbyte_nolen (req->sv1);
1226 }
1227 else
1228 {
1229 req->type = EIO_FSTAT;
1230 req->int1 = PerlIO_fileno (IoIFP (sv_2io (fh_or_path)));
1231 }
1232
1233 EIO_SEND;
1234 }
1235
1236 void
1237 aio_utime (SV8 *fh_or_path, SV *atime, SV *mtime, SV *callback=&PL_sv_undef)
1238 PPCODE:
1239 {
1240 dREQ;
1241
1242 req->nv1 = SvOK (atime) ? SvNV (atime) : -1.;
1243 req->nv2 = SvOK (mtime) ? SvNV (mtime) : -1.;
1244 req->sv1 = newSVsv (fh_or_path);
1245
1246 if (SvPOK (fh_or_path))
1247 {
1248 req->type = EIO_UTIME;
1249 req->ptr1 = SvPVbyte_nolen (req->sv1);
1250 }
1251 else
1252 {
1253 req->type = EIO_FUTIME;
1254 req->int1 = PerlIO_fileno (IoIFP (sv_2io (fh_or_path)));
1255 }
1256
1257 EIO_SEND;
1258 }
1259
1260 void
1261 aio_truncate (SV8 *fh_or_path, SV *offset, SV *callback=&PL_sv_undef)
1262 PPCODE:
1263 {
1264 dREQ;
1265
1266 req->sv1 = newSVsv (fh_or_path);
1267 req->offs = SvOK (offset) ? SvVAL64 (offset) : -1;
1268
1269 if (SvPOK (fh_or_path))
1270 {
1271 req->type = EIO_TRUNCATE;
1272 req->ptr1 = SvPVbyte_nolen (req->sv1);
1273 }
1274 else
1275 {
1276 req->type = EIO_FTRUNCATE;
1277 req->int1 = PerlIO_fileno (IoIFP (sv_2io (fh_or_path)));
1278 }
1279
1280 EIO_SEND;
1281 }
1282
1283 void
1284 aio_chmod (SV8 *fh_or_path, int mode, SV *callback=&PL_sv_undef)
1285 ALIAS:
1286 aio_chmod = EIO_CHMOD
1287 aio_fchmod = EIO_FCHMOD
1288 aio_mkdir = EIO_MKDIR
1289 PPCODE:
1290 {
1291 dREQ;
1292
1293 req->type = type;
1294 req->int2 = mode;
1295 req->sv1 = newSVsv (fh_or_path);
1296
1297 if (ix == EIO_FCHMOD)
1298 req->int1 = PerlIO_fileno (IoIFP (sv_2io (fh_or_path)));
1299 else
1300 req->ptr1 = SvPVbyte_nolen (req->sv1);
1301
1302 EIO_SEND;
1303 }
1304
1305 void
1306 aio_chown (SV8 *fh_or_path, SV *uid, SV *gid, SV *callback=&PL_sv_undef)
1307 PPCODE:
1308 {
1309 dREQ;
1310
1311 req->int2 = SvOK (uid) ? SvIV (uid) : -1;
1312 req->int3 = SvOK (gid) ? SvIV (gid) : -1;
1313 req->sv1 = newSVsv (fh_or_path);
1314
1315 if (SvPOK (fh_or_path))
1316 {
1317 req->type = EIO_CHOWN;
1318 req->ptr1 = SvPVbyte_nolen (req->sv1);
1319 }
1320 else
1321 {
1322 req->type = EIO_FCHOWN;
1323 req->int1 = PerlIO_fileno (IoIFP (sv_2io (fh_or_path)));
1324 }
1325
1326 EIO_SEND;
1327 }
1328
1329 void
1330 aio_unlink (SV8 *pathname, SV *callback=&PL_sv_undef)
1331 ALIAS:
1332 aio_unlink = EIO_UNLINK
1333 aio_rmdir = EIO_RMDIR
1334 aio_readdir = EIO_READDIR
1335 PPCODE:
1336 {
1337 dREQ;
1338
1339 req->type = ix;
1340 req->sv1 = newSVsv (pathname);
1341 req->ptr1 = SvPVbyte_nolen (req->sv1);
1342
1343 EIO_SEND;
1344 }
1345
1346 void
1347 aio_link (SV8 *oldpath, SV8 *newpath, SV *callback=&PL_sv_undef)
1348 ALIAS:
1349 aio_link = EIO_LINK
1350 aio_symlink = EIO_SYMLINK
1351 aio_rename = EIO_RENAME
1352 PPCODE:
1353 {
1354 dREQ;
1355
1356 req->type = ix;
1357 req->sv1 = newSVsv (oldpath);
1358 req->ptr1 = SvPVbyte_nolen (req->sv1);
1359 req->sv2 = newSVsv (newpath);
1360 req->ptr2 = SvPVbyte_nolen (req->sv2);
1361
1362 EIO_SEND;
1363 }
1364
1365 void
1366 aio_mknod (SV8 *pathname, int mode, UV dev, SV *callback=&PL_sv_undef)
1367 PPCODE:
1368 {
1369 dREQ;
1370
1371 req->type = EIO_MKNOD;
1372 req->sv1 = newSVsv (pathname);
1373 req->ptr1 = SvPVbyte_nolen (req->sv1);
1374 req->int2 = (mode_t)mode;
1375 req->offs = dev;
1376
1377 EIO_SEND;
1378 }
1379
1380 void
1381 aio_busy (double delay, SV *callback=&PL_sv_undef)
1382 PPCODE:
1383 {
1384 dREQ;
1385
1386 req->type = EIO_BUSY;
1387 req->nv1 = delay < 0. ? 0. : delay;
1388
1389 EIO_SEND;
1390 }
1391
1392 void
1393 aio_group (SV *callback=&PL_sv_undef)
1394 PROTOTYPE: ;$
1395 PPCODE:
1396 {
1397 dREQ;
1398
1399 req->type = EIO_GROUP;
1400
1401 req_send (req);
1402 XPUSHs (req_sv (req, AIO_GRP_KLASS));
1403 }
1404
1405 void
1406 aio_nop (SV *callback=&PL_sv_undef)
1407 ALIAS:
1408 aio_nop = EIO_NOP
1409 aio_sync = EIO_SYNC
1410 PPCODE:
1411 {
1412 dREQ;
1413
1414 #endif
1415
1416 void eio_grp_feed (eio_req *grp, void (*feed)(eio_req *req), int limit)
1417 {
1418 grp->int2 = limit;
1419 grp->feed = feed;
1420
1421 grp_try_feed (grp);
1422 }
1423
1424 void eio_grp_limit (eio_req *grp, int limit)
1425 {
1426 grp->int2 = limit;
1427
1428 grp_try_feed (grp);
1429 }
1430
1431 void eio_grp_add (eio_req *grp, eio_req *req)
1432 {
1433 assert (("cannot add requests to IO::AIO::GRP after the group finished", grp->int1 != 2));
1434
1435 ++grp->size;
1436 req->grp = grp;
1437
1438 req->grp_prev = 0;
1439 req->grp_next = grp->grp_first;
1440
1441 if (grp->grp_first)
1442 grp->grp_first->grp_prev = req;
1443
1444 grp->grp_first = req;
1445 }
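Taken together with grp_try_feed () above, a feeder keeps at most grp->int2 sub-requests outstanding: it is called whenever grp->size drops below that limit, and it is disabled automatically as soon as a call makes no progress. A sketch of the pattern (make_subreq is a hypothetical constructor; creating and submitting the EIO_GROUP request itself is not shown in this file):

  static void my_feeder (eio_req *grp)
  {
    eio_req *sub = make_subreq (); /* hypothetical: build the next sub-request */

    if (!sub)
      return;                      /* no progress, so grp_try_feed clears the feeder */

    eio_grp_add (grp, sub);        /* bumps grp->size and links sub into the group */
    eio_submit (sub);
  }

  /* elsewhere, after creating and submitting the group request grp:
     eio_grp_feed (grp, my_feeder, 4);   keeps up to 4 sub-requests in flight */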
1446
1447