ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/libeio/eio.c
Revision: 1.1
Committed: Sat May 10 17:16:39 2008 UTC (16 years ago) by root
Content type: text/plain
Branch: MAIN
Log Message:
initial check-in

File Contents

# Content
1 #include "eio.h"
2 #include "xthread.h"
3
4 #include <errno.h>
5
6 #include "EXTERN.h"
7 #include "perl.h"
8 #include "XSUB.h"
9
10 #include <stddef.h>
11 #include <stdlib.h>
12 #include <errno.h>
13 #include <sys/types.h>
14 #include <sys/stat.h>
15 #include <limits.h>
16 #include <fcntl.h>
17 #include <sched.h>
18
/*
 * Request hooks.  Each one can be overridden by defining the macro
 * before this point.
 */

/*
 * EIO_FINISH: call the request's finish callback and yield its result;
 * yields 0 without calling it when there is no callback or the request
 * was cancelled.  The expansion is fully parenthesized so it can be used
 * inside larger expressions.
 */
#ifndef EIO_FINISH
# define EIO_FINISH(req) (((req)->finish) && !EIO_CANCELLED (req) ? (req)->finish (req) : 0)
#endif

/* EIO_DESTROY: call the request's destroy callback, if one is set */
#ifndef EIO_DESTROY
# define EIO_DESTROY(req) do { if ((req)->destroy) (req)->destroy (req); } while (0)
#endif

/* EIO_FEED: call the group's feed callback, if one is set */
#ifndef EIO_FEED
# define EIO_FEED(req) do { if ((req)->feed) (req)->feed (req); } while (0)
#endif
30
31 #ifdef _WIN32
32
33 /*doh*/
34
35 #else
36
37 # include "config.h"
38 # include <sys/time.h>
39 # include <sys/select.h>
40 # include <unistd.h>
41 # include <utime.h>
42 # include <signal.h>
43
44 # ifndef EIO_STRUCT_DIRENT
45 # define EIO_STRUCT_DIRENT struct dirent
46 # endif
47
48 #endif
49
50 # ifndef EIO_STRUCT_STAT
51 # define EIO_STRUCT_STAT struct stat
52 # endif
53
54 #if HAVE_SENDFILE
55 # if __linux
56 # include <sys/sendfile.h>
57 # elif __freebsd
58 # include <sys/socket.h>
59 # include <sys/uio.h>
60 # elif __hpux
61 # include <sys/socket.h>
62 # elif __solaris /* not yet */
63 # include <sys/sendfile.h>
64 # else
65 # error sendfile support requested but not available
66 # endif
67 #endif
68
69 /* number of seconds after which an idle thread exits */
70 #define IDLE_TIMEOUT 10
71
72 /* used for struct dirent, AIX doesn't provide it */
73 #ifndef NAME_MAX
74 # define NAME_MAX 4096
75 #endif
76
77 /* buffer size for various temporary buffers */
78 #define EIO_BUFSIZE 65536
79
/*
 * dBUF: declare a local eio_buf and point both it and self->dbuf at a
 * freshly malloc'ed buffer of EIO_BUFSIZE bytes (allocated while holding
 * wrklock).  Makes the enclosing function return -1 when the allocation
 * fails.  Needs a "worker *self" in scope; worker_clear frees self->dbuf.
 */
#define dBUF \
  char *eio_buf = 0; \
  X_LOCK (wrklock); \
  eio_buf = malloc (EIO_BUFSIZE); \
  self->dbuf = eio_buf; \
  X_UNLOCK (wrklock); \
  if (eio_buf == 0) \
    return -1;
87
88 #define EIO_TICKS ((1000000 + 1023) >> 10)
89
90 static void (*want_poll_cb) (void);
91 static void (*done_poll_cb) (void);
92
93 static unsigned int max_poll_time = 0;
94 static unsigned int max_poll_reqs = 0;
95
96 /* calculcate time difference in ~1/EIO_TICKS of a second */
97 static int tvdiff (struct timeval *tv1, struct timeval *tv2)
98 {
99 return (tv2->tv_sec - tv1->tv_sec ) * EIO_TICKS
100 + ((tv2->tv_usec - tv1->tv_usec) >> 10);
101 }
102
103 static unsigned int started, idle, wanted;
104
105 /* worker threads management */
106 static mutex_t wrklock = X_MUTEX_INIT;
107
108 typedef struct worker {
109 /* locked by wrklock */
110 struct worker *prev, *next;
111
112 thread_t tid;
113
114 /* locked by reslock, reqlock or wrklock */
115 eio_req *req; /* currently processed request */
116 void *dbuf;
117 DIR *dirp;
118 } worker;
119
120 static worker wrk_first = { &wrk_first, &wrk_first, 0 };
121
122 static void worker_clear (worker *wrk)
123 {
124 if (wrk->dirp)
125 {
126 closedir (wrk->dirp);
127 wrk->dirp = 0;
128 }
129
130 if (wrk->dbuf)
131 {
132 free (wrk->dbuf);
133 wrk->dbuf = 0;
134 }
135 }
136
137 static void worker_free (worker *wrk)
138 {
139 wrk->next->prev = wrk->prev;
140 wrk->prev->next = wrk->next;
141
142 free (wrk);
143 }
144
145 static volatile unsigned int nreqs, nready, npending;
146 static volatile unsigned int max_idle = 4;
147
148 static mutex_t reslock = X_MUTEX_INIT;
149 static mutex_t reqlock = X_MUTEX_INIT;
150 static cond_t reqwait = X_COND_INIT;
151
152 unsigned int eio_nreqs (void)
153 {
154 return nreqs;
155 }
156
157 unsigned int eio_nready (void)
158 {
159 unsigned int retval;
160
161 if (WORDACCESS_UNSAFE) X_LOCK (reqlock);
162 retval = nready;
163 if (WORDACCESS_UNSAFE) X_UNLOCK (reqlock);
164
165 return retval;
166 }
167
168 unsigned int eio_npending (void)
169 {
170 unsigned int retval;
171
172 if (WORDACCESS_UNSAFE) X_LOCK (reqlock);
173 retval = npending;
174 if (WORDACCESS_UNSAFE) X_UNLOCK (reqlock);
175
176 return retval;
177 }
178
179 unsigned int eio_nthreads (void)
180 {
181 unsigned int retval;
182
183 if (WORDACCESS_UNSAFE) X_LOCK (reqlock);
184 retval = started;
185 if (WORDACCESS_UNSAFE) X_UNLOCK (reqlock);
186
187 return retval;
188 }
189
190 /*
191 * a somewhat faster data structure might be nice, but
192 * with 8 priorities this actually needs <20 insns
193 * per shift, the most expensive operation.
194 */
195 typedef struct {
196 eio_req *qs[EIO_NUM_PRI], *qe[EIO_NUM_PRI]; /* qstart, qend */
197 int size;
198 } reqq;
199
200 static reqq req_queue;
201 static reqq res_queue;
202
203 static int reqq_push (reqq *q, eio_req *req)
204 {
205 int pri = req->pri;
206 req->next = 0;
207
208 if (q->qe[pri])
209 {
210 q->qe[pri]->next = req;
211 q->qe[pri] = req;
212 }
213 else
214 q->qe[pri] = q->qs[pri] = req;
215
216 return q->size++;
217 }
218
219 static eio_req *reqq_shift (reqq *q)
220 {
221 int pri;
222
223 if (!q->size)
224 return 0;
225
226 --q->size;
227
228 for (pri = EIO_NUM_PRI; pri--; )
229 {
230 eio_req *req = q->qs[pri];
231
232 if (req)
233 {
234 if (!(q->qs[pri] = (eio_req *)req->next))
235 q->qe[pri] = 0;
236
237 return req;
238 }
239 }
240
241 abort ();
242 }
243
244 static void grp_feed (eio_req *grp)
245 {
246 while (grp->size < grp->int2 && !EIO_CANCELLED (grp))
247 {
248 int old_len = grp->size;
249
250 EIO_FEED (grp);
251
252 /* stop if no progress has been made */
253 if (old_len == grp->size)
254 {
255 grp->feed = 0;
256 grp->int2 = 0;
257 }
258 }
259 }
260
261 static int eio_invoke (eio_req *req);
262
263 static int grp_dec (eio_req *grp)
264 {
265 --grp->size;
266
267 /* call feeder, if applicable */
268 grp_feed (grp);
269
270 /* finish, if done */
271 if (!grp->size && grp->int1)
272 return eio_invoke (grp);
273 else
274 return 0;
275 }
276
277 void eio_destroy (eio_req *req)
278 {
279 if ((req)->flags & EIO_FLAG_PTR2_FREE)
280 free (req->ptr2);
281
282 EIO_DESTROY (req);
283 }
284
285 static int eio_invoke (eio_req *req)
286 {
287 int res = EIO_FINISH (req);
288
289 if (req->grp)
290 {
291 int res2;
292 eio_req *grp = req->grp;
293
294 /* unlink request */
295 if (req->grp_next) req->grp_next->grp_prev = req->grp_prev;
296 if (req->grp_prev) req->grp_prev->grp_next = req->grp_next;
297
298 if (grp->grp_first == req)
299 grp->grp_first = req->grp_next;
300
301 res2 = grp_dec (grp);
302
303 if (!res && res2)
304 res = res2;
305 }
306
307 eio_destroy (req);
308
309 return res;
310 }
311
312 void eio_grp_cancel (eio_req *grp)
313 {
314 for (grp = grp->grp_first; grp; grp = grp->grp_next)
315 eio_cancel (grp);
316 }
317
318 void eio_cancel (eio_req *req)
319 {
320 req->flags |= EIO_FLAG_CANCELLED;
321
322 eio_grp_cancel (req);
323 }
324
325 X_THREAD_PROC (eio_proc);
326
327 static void start_thread (void)
328 {
329 worker *wrk = calloc (1, sizeof (worker));
330
331 if (!wrk)
332 croak ("unable to allocate worker thread data");
333
334 X_LOCK (wrklock);
335
336 if (thread_create (&wrk->tid, eio_proc, (void *)wrk))
337 {
338 wrk->prev = &wrk_first;
339 wrk->next = wrk_first.next;
340 wrk_first.next->prev = wrk;
341 wrk_first.next = wrk;
342 ++started;
343 }
344 else
345 free (wrk);
346
347 X_UNLOCK (wrklock);
348 }
349
350 static void maybe_start_thread (void)
351 {
352 if (eio_nthreads () >= wanted)
353 return;
354
355 /* todo: maybe use idle here, but might be less exact */
356 if (0 <= (int)eio_nthreads () + (int)eio_npending () - (int)eio_nreqs ())
357 return;
358
359 start_thread ();
360 }
361
362 void eio_submit (eio_req *req)
363 {
364 ++nreqs;
365
366 X_LOCK (reqlock);
367 ++nready;
368 reqq_push (&req_queue, req);
369 X_COND_SIGNAL (reqwait);
370 X_UNLOCK (reqlock);
371
372 maybe_start_thread ();
373 }
374
375 static void end_thread (void)
376 {
377 eio_req *req = calloc (1, sizeof (eio_req));
378
379 req->type = EIO_QUIT;
380 req->pri = EIO_PRI_MAX + EIO_PRI_BIAS;
381
382 X_LOCK (reqlock);
383 reqq_push (&req_queue, req);
384 X_COND_SIGNAL (reqwait);
385 X_UNLOCK (reqlock);
386
387 X_LOCK (wrklock);
388 --started;
389 X_UNLOCK (wrklock);
390 }
391
392 void eio_set_max_poll_time (double nseconds)
393 {
394 if (WORDACCESS_UNSAFE) X_LOCK (reqlock);
395 max_poll_time = nseconds;
396 if (WORDACCESS_UNSAFE) X_UNLOCK (reqlock);
397 }
398
399 void eio_set_max_poll_reqs (unsigned int maxreqs)
400 {
401 if (WORDACCESS_UNSAFE) X_LOCK (reqlock);
402 max_poll_reqs = maxreqs;
403 if (WORDACCESS_UNSAFE) X_UNLOCK (reqlock);
404 }
405
406 void eio_set_max_idle (unsigned int nthreads)
407 {
408 if (WORDACCESS_UNSAFE) X_LOCK (reqlock);
409 max_idle = nthreads <= 0 ? 1 : nthreads;
410 if (WORDACCESS_UNSAFE) X_UNLOCK (reqlock);
411 }
412
413 void eio_set_min_parallel (unsigned int nthreads)
414 {
415 if (wanted < nthreads)
416 wanted = nthreads;
417 }
418
419 void eio_set_max_parallel (unsigned int nthreads)
420 {
421 if (wanted > nthreads)
422 wanted = nthreads;
423
424 while (started > wanted)
425 end_thread ();
426 }
427
428 int eio_poll (void)
429 {
430 int maxreqs = max_poll_reqs;
431 struct timeval tv_start, tv_now;
432 eio_req *req;
433
434 if (max_poll_time)
435 gettimeofday (&tv_start, 0);
436
437 for (;;)
438 {
439 maybe_start_thread ();
440
441 X_LOCK (reslock);
442 req = reqq_shift (&res_queue);
443
444 if (req)
445 {
446 --npending;
447
448 if (!res_queue.size)
449 done_poll_cb ();
450 }
451
452 X_UNLOCK (reslock);
453
454 if (!req)
455 return 0;
456
457 --nreqs;
458
459 if (req->type == EIO_GROUP && req->size)
460 {
461 req->int1 = 1; /* mark request as delayed */
462 continue;
463 }
464 else
465 {
466 int res = eio_invoke (req);
467 if (res)
468 return res;
469 }
470
471 if (maxreqs && !--maxreqs)
472 break;
473
474 if (max_poll_time)
475 {
476 gettimeofday (&tv_now, 0);
477
478 if (tvdiff (&tv_start, &tv_now) >= max_poll_time)
479 break;
480 }
481 }
482
483 errno = EAGAIN;
484 return -1;
485 }
486
487 /*****************************************************************************/
488 /* work around various missing functions */
489
490 #if !HAVE_PREADWRITE
491 # define pread aio_pread
492 # define pwrite aio_pwrite
493
494 /*
495 * make our pread/pwrite safe against themselves, but not against
496 * normal read/write by using a mutex. slows down execution a lot,
497 * but that's your problem, not mine.
498 */
499 static mutex_t preadwritelock = X_MUTEX_INIT;
500
501 static ssize_t pread (int fd, void *buf, size_t count, off_t offset)
502 {
503 ssize_t res;
504 off_t ooffset;
505
506 X_LOCK (preadwritelock);
507 ooffset = lseek (fd, 0, SEEK_CUR);
508 lseek (fd, offset, SEEK_SET);
509 res = read (fd, buf, count);
510 lseek (fd, ooffset, SEEK_SET);
511 X_UNLOCK (preadwritelock);
512
513 return res;
514 }
515
516 static ssize_t pwrite (int fd, void *buf, size_t count, off_t offset)
517 {
518 ssize_t res;
519 off_t ooffset;
520
521 X_LOCK (preadwritelock);
522 ooffset = lseek (fd, 0, SEEK_CUR);
523 lseek (fd, offset, SEEK_SET);
524 res = write (fd, buf, count);
525 lseek (fd, offset, SEEK_SET);
526 X_UNLOCK (preadwritelock);
527
528 return res;
529 }
530 #endif
531
532 #ifndef HAVE_FUTIMES
533
534 # define utimes(path,times) aio_utimes (path, times)
535 # define futimes(fd,times) aio_futimes (fd, times)
536
/*
 * utimes emulation on top of utime (): only the tv_sec parts of the two
 * timevals are used.  a null "times" is passed through as a null buffer.
 * returns the result of utime ().
 */
static int aio_utimes (const char *filename, const struct timeval times[2])
{
  struct utimbuf buf;

  if (!times)
    return utime (filename, 0);

  buf.actime = times[0].tv_sec;
  buf.modtime = times[1].tv_sec;

  return utime (filename, &buf);
}
551
/* futimes stand-in: always fails with errno set to ENOSYS */
static int aio_futimes (int fd, const struct timeval tv[2])
{
  int failure = -1;

  (void)fd;
  (void)tv;

  errno = ENOSYS;

  return failure;
}
557
558 #endif
559
560 #if !HAVE_FDATASYNC
561 # define fdatasync fsync
562 #endif
563
564 #if !HAVE_READAHEAD
565 # define readahead(fd,offset,count) aio_readahead (fd, offset, count, self)
566
567 static ssize_t aio_readahead (int fd, off_t offset, size_t count, worker *self)
568 {
569 size_t todo = count;
570 dBUF;
571
572 while (todo > 0)
573 {
574 size_t len = todo < EIO_BUFSIZE ? todo : EIO_BUFSIZE;
575
576 pread (fd, aio_buf, len, offset);
577 offset += len;
578 todo -= len;
579 }
580
581 errno = 0;
582 return count;
583 }
584
585 #endif
586
587 #if !HAVE_READDIR_R
588 # define readdir_r aio_readdir_r
589
590 static mutex_t readdirlock = X_MUTEX_INIT;
591
592 static int readdir_r (DIR *dirp, X_DIRENT *ent, X_DIRENT **res)
593 {
594 X_DIRENT *e;
595 int errorno;
596
597 X_LOCK (readdirlock);
598
599 e = readdir (dirp);
600 errorno = errno;
601
602 if (e)
603 {
604 *res = ent;
605 strcpy (ent->d_name, e->d_name);
606 }
607 else
608 *res = 0;
609
610 X_UNLOCK (readdirlock);
611
612 errno = errorno;
613 return e ? 0 : -1;
614 }
615 #endif
616
617 /* sendfile always needs emulation */
618 static ssize_t sendfile_ (int ofd, int ifd, off_t offset, size_t count, worker *self)
619 {
620 ssize_t res;
621
622 if (!count)
623 return 0;
624
625 #if HAVE_SENDFILE
626 # if __linux
627 res = sendfile (ofd, ifd, &offset, count);
628
629 # elif __freebsd
630 /*
631 * Of course, the freebsd sendfile is a dire hack with no thoughts
632 * wasted on making it similar to other I/O functions.
633 */
634 {
635 off_t sbytes;
636 res = sendfile (ifd, ofd, offset, count, 0, &sbytes, 0);
637
638 if (res < 0 && sbytes)
639 /* maybe only on EAGAIN: as usual, the manpage leaves you guessing */
640 res = sbytes;
641 }
642
643 # elif __hpux
644 res = sendfile (ofd, ifd, offset, count, 0, 0);
645
646 # elif __solaris
647 {
648 struct sendfilevec vec;
649 size_t sbytes;
650
651 vec.sfv_fd = ifd;
652 vec.sfv_flag = 0;
653 vec.sfv_off = offset;
654 vec.sfv_len = count;
655
656 res = sendfilev (ofd, &vec, 1, &sbytes);
657
658 if (res < 0 && sbytes)
659 res = sbytes;
660 }
661
662 # endif
663 #else
664 res = -1;
665 errno = ENOSYS;
666 #endif
667
668 if (res < 0
669 && (errno == ENOSYS || errno == EINVAL || errno == ENOTSOCK
670 #if __solaris
671 || errno == EAFNOSUPPORT || errno == EPROTOTYPE
672 #endif
673 )
674 )
675 {
676 /* emulate sendfile. this is a major pain in the ass */
677 dBUF;
678
679 res = 0;
680
681 while (count)
682 {
683 ssize_t cnt;
684
685 cnt = pread (ifd, eio_buf, count > EIO_BUFSIZE ? EIO_BUFSIZE : count, offset);
686
687 if (cnt <= 0)
688 {
689 if (cnt && !res) res = -1;
690 break;
691 }
692
693 cnt = write (ofd, eio_buf, cnt);
694
695 if (cnt <= 0)
696 {
697 if (cnt && !res) res = -1;
698 break;
699 }
700
701 offset += cnt;
702 res += cnt;
703 count -= cnt;
704 }
705 }
706
707 return res;
708 }
709
710 /* read a full directory */
711 static void scandir_ (eio_req *req, worker *self)
712 {
713 DIR *dirp;
714 union
715 {
716 EIO_STRUCT_DIRENT d;
717 char b [offsetof (EIO_STRUCT_DIRENT, d_name) + NAME_MAX + 1];
718 } *u;
719 EIO_STRUCT_DIRENT *entp;
720 char *name, *names;
721 int memlen = 4096;
722 int memofs = 0;
723 int res = 0;
724
725 X_LOCK (wrklock);
726 self->dirp = dirp = opendir (req->ptr1);
727 self->dbuf = u = malloc (sizeof (*u));
728 req->flags |= EIO_FLAG_PTR2_FREE;
729 req->ptr2 = names = malloc (memlen);
730 X_UNLOCK (wrklock);
731
732 if (dirp && u && names)
733 for (;;)
734 {
735 errno = 0;
736 readdir_r (dirp, &u->d, &entp);
737
738 if (!entp)
739 break;
740
741 name = entp->d_name;
742
743 if (name [0] != '.' || (name [1] && (name [1] != '.' || name [2])))
744 {
745 int len = strlen (name) + 1;
746
747 res++;
748
749 while (memofs + len > memlen)
750 {
751 memlen *= 2;
752 X_LOCK (wrklock);
753 req->ptr2 = names = realloc (names, memlen);
754 X_UNLOCK (wrklock);
755
756 if (!names)
757 break;
758 }
759
760 memcpy (names + memofs, name, len);
761 memofs += len;
762 }
763 }
764
765 if (errno)
766 res = -1;
767
768 req->result = res;
769 }
770
771 /*****************************************************************************/
772
773 X_THREAD_PROC (eio_proc)
774 {
775 eio_req *req;
776 struct timespec ts;
777 worker *self = (worker *)thr_arg;
778
779 /* try to distribute timeouts somewhat randomly */
780 ts.tv_nsec = ((unsigned long)self & 1023UL) * (1000000000UL / 1024UL);
781
782 for (;;)
783 {
784 ts.tv_sec = time (0) + IDLE_TIMEOUT;
785
786 X_LOCK (reqlock);
787
788 for (;;)
789 {
790 self->req = req = reqq_shift (&req_queue);
791
792 if (req)
793 break;
794
795 ++idle;
796
797 if (X_COND_TIMEDWAIT (reqwait, reqlock, ts) == ETIMEDOUT)
798 {
799 if (idle > max_idle)
800 {
801 --idle;
802 X_UNLOCK (reqlock);
803 X_LOCK (wrklock);
804 --started;
805 X_UNLOCK (wrklock);
806 goto quit;
807 }
808
809 /* we are allowed to idle, so do so without any timeout */
810 X_COND_WAIT (reqwait, reqlock);
811 ts.tv_sec = time (0) + IDLE_TIMEOUT;
812 }
813
814 --idle;
815 }
816
817 --nready;
818
819 X_UNLOCK (reqlock);
820
821 errno = 0; /* strictly unnecessary */
822
823 if (!EIO_CANCELLED (req))
824 switch (req->type)
825 {
826 case EIO_READ: req->result = req->offs >= 0
827 ? pread (req->int1, req->ptr2, req->size, req->offs)
828 : read (req->int1, req->ptr2, req->size); break;
829 case EIO_WRITE: req->result = req->offs >= 0
830 ? pwrite (req->int1, req->ptr2, req->size, req->offs)
831 : write (req->int1, req->ptr2, req->size); break;
832
833 case EIO_READAHEAD: req->result = readahead (req->int1, req->offs, req->size); break;
834 case EIO_SENDFILE: req->result = sendfile_ (req->int1, req->int2, req->offs, req->size, self); break;
835
836 case EIO_STAT: req->result = stat (req->ptr1, (EIO_STRUCT_STAT *)req->ptr2); break;
837 case EIO_LSTAT: req->result = lstat (req->ptr1, (EIO_STRUCT_STAT *)req->ptr2); break;
838 case EIO_FSTAT: req->result = fstat (req->int1, (EIO_STRUCT_STAT *)req->ptr2); break;
839
840 case EIO_CHOWN: req->result = chown (req->ptr1, req->int2, req->int3); break;
841 case EIO_FCHOWN: req->result = fchown (req->int1, req->int2, req->int3); break;
842 case EIO_CHMOD: req->result = chmod (req->ptr1, (mode_t)req->int2); break;
843 case EIO_FCHMOD: req->result = fchmod (req->int1, (mode_t)req->int2); break;
844 case EIO_TRUNCATE: req->result = truncate (req->ptr1, req->offs); break;
845 case EIO_FTRUNCATE: req->result = ftruncate (req->int1, req->offs); break;
846
847 case EIO_OPEN: req->result = open (req->ptr1, req->int1, (mode_t)req->int2); break;
848 case EIO_CLOSE: req->result = close (req->int1); break;
849 case EIO_DUP2: req->result = dup2 (req->int1, req->int2); break;
850 case EIO_UNLINK: req->result = unlink (req->ptr1); break;
851 case EIO_RMDIR: req->result = rmdir (req->ptr1); break;
852 case EIO_MKDIR: req->result = mkdir (req->ptr1, (mode_t)req->int2); break;
853 case EIO_RENAME: req->result = rename (req->ptr1, req->ptr2); break;
854 case EIO_LINK: req->result = link (req->ptr1, req->ptr2); break;
855 case EIO_SYMLINK: req->result = symlink (req->ptr1, req->ptr2); break;
856 case EIO_MKNOD: req->result = mknod (req->ptr1, (mode_t)req->int2, (dev_t)req->offs); break;
857 case EIO_READLINK: req->result = readlink (req->ptr1, req->ptr2, NAME_MAX); break;
858
859 case EIO_SYNC: req->result = 0; sync (); break;
860 case EIO_FSYNC: req->result = fsync (req->int1); break;
861 case EIO_FDATASYNC: req->result = fdatasync (req->int1); break;
862
863 case EIO_READDIR: scandir_ (req, self); break;
864
865 case EIO_BUSY:
866 #ifdef _WIN32
867 Sleep (req->nv1 * 1000.);
868 #else
869 {
870 struct timeval tv;
871
872 tv.tv_sec = req->nv1;
873 tv.tv_usec = (req->nv1 - tv.tv_sec) * 1000000.;
874
875 req->result = select (0, 0, 0, 0, &tv);
876 }
877 #endif
878 break;
879
880 case EIO_UTIME:
881 case EIO_FUTIME:
882 {
883 struct timeval tv[2];
884 struct timeval *times;
885
886 if (req->nv1 != -1. || req->nv2 != -1.)
887 {
888 tv[0].tv_sec = req->nv1;
889 tv[0].tv_usec = (req->nv1 - tv[0].tv_sec) * 1000000.;
890 tv[1].tv_sec = req->nv2;
891 tv[1].tv_usec = (req->nv2 - tv[1].tv_sec) * 1000000.;
892
893 times = tv;
894 }
895 else
896 times = 0;
897
898
899 req->result = req->type == EIO_FUTIME
900 ? futimes (req->int1, times)
901 : utimes (req->ptr1, times);
902 }
903
904 case EIO_GROUP:
905 case EIO_NOP:
906 break;
907
908 case EIO_QUIT:
909 goto quit;
910
911 default:
912 req->result = -1;
913 break;
914 }
915
916 req->errorno = errno;
917
918 X_LOCK (reslock);
919
920 ++npending;
921
922 if (!reqq_push (&res_queue, req))
923 want_poll_cb ();
924
925 self->req = 0;
926 worker_clear (self);
927
928 X_UNLOCK (reslock);
929 }
930
931 quit:
932 X_LOCK (wrklock);
933 worker_free (self);
934 X_UNLOCK (wrklock);
935
936 return 0;
937 }
938
939 /*****************************************************************************/
940
941 static void atfork_prepare (void)
942 {
943 X_LOCK (wrklock);
944 X_LOCK (reqlock);
945 X_LOCK (reslock);
946 #if !HAVE_PREADWRITE
947 X_LOCK (preadwritelock);
948 #endif
949 #if !HAVE_READDIR_R
950 X_LOCK (readdirlock);
951 #endif
952 }
953
954 static void atfork_parent (void)
955 {
956 #if !HAVE_READDIR_R
957 X_UNLOCK (readdirlock);
958 #endif
959 #if !HAVE_PREADWRITE
960 X_UNLOCK (preadwritelock);
961 #endif
962 X_UNLOCK (reslock);
963 X_UNLOCK (reqlock);
964 X_UNLOCK (wrklock);
965 }
966
967 static void atfork_child (void)
968 {
969 eio_req *prv;
970
971 while (prv = reqq_shift (&req_queue))
972 eio_destroy (prv);
973
974 while (prv = reqq_shift (&res_queue))
975 eio_destroy (prv);
976
977 while (wrk_first.next != &wrk_first)
978 {
979 worker *wrk = wrk_first.next;
980
981 if (wrk->req)
982 eio_destroy (wrk->req);
983
984 worker_clear (wrk);
985 worker_free (wrk);
986 }
987
988 started = 0;
989 idle = 0;
990 nreqs = 0;
991 nready = 0;
992 npending = 0;
993
994 atfork_parent ();
995 }
996
997 int eio_init (void (*want_poll)(void), void (*done_poll)(void))
998 {
999 want_poll_cb = want_poll;
1000 done_poll_cb = done_poll;
1001
1002 #ifdef _WIN32
1003 X_MUTEX_CHECK (wrklock);
1004 X_MUTEX_CHECK (reslock);
1005 X_MUTEX_CHECK (reqlock);
1006 X_MUTEX_CHECK (reqwait);
1007 X_MUTEX_CHECK (preadwritelock);
1008 X_MUTEX_CHECK (readdirlock);
1009
1010 X_COND_CHECK (reqwait);
1011 #endif
1012
1013 X_THREAD_ATFORK (atfork_prepare, atfork_parent, atfork_child);
1014 }
1015
1016 #if 0
1017
1018 eio_req *eio_fsync (int fd, eio_cb cb);
1019 eio_req *eio_fdatasync (int fd, eio_cb cb);
1020 eio_req *eio_dupclose (int fd, eio_cb cb);
1021 eio_req *eio_readahead (int fd, off_t offset, size_t length, eio_cb cb);
1022 eio_req *eio_read (int fd, off_t offs, size_t length, char *data, eio_cb cb);
1023 eio_req *eio_write (int fd, off_t offs, size_t length, char *data, eio_cb cb);
1024 eio_req *eio_fstat (int fd, eio_cb cb); /* stat buffer=ptr2 allocates dynamically */
1025 eio_req *eio_futime (int fd, double atime, double mtime, eio_cb cb);
1026 eio_req *eio_ftruncate (int fd, off_t offset, eio_cb cb);
1027 eio_req *eio_fchmod (int fd, mode_t mode, eio_cb cb);
1028 eio_req *eio_fchown (int fd, uid_t uid, gid_t gid, eio_cb cb);
1029 eio_req *eio_dup2 (int fd, int fd2, eio_cb cb);
1030 eio_req *eio_sendfile (int out_fd, int in_fd, off_t in_offset, size_t length, eio_cb cb);
1031 eio_req *eio_open (const char *path, int flags, mode_t mode, eio_cb cb);
1032 eio_req *eio_readlink (const char *path, eio_cb cb); /* result=ptr2 allocated dynamically */
1033 eio_req *eio_stat (const char *path, eio_cb cb); /* stat buffer=ptr2 allocates dynamically */
1034 eio_req *eio_lstat (const char *path, eio_cb cb); /* stat buffer=ptr2 allocates dynamically */
1035 eio_req *eio_utime (const char *path, double atime, double mtime, eio_cb cb);
1036 eio_req *eio_truncate (const char *path, off_t offset, eio_cb cb);
1037 eio_req *eio_chmod (const char *path, mode_t mode, eio_cb cb);
1038 eio_req *eio_mkdir (const char *path, mode_t mode, eio_cb cb);
1039 eio_req *eio_chown (const char *path, uid_t uid, gid_t gid, eio_cb cb);
1040 eio_req *eio_unlink (const char *path, eio_cb cb);
1041 eio_req *eio_rmdir (const char *path, eio_cb cb);
1042 eio_req *eio_readdir (const char *path, eio_cb cb); /* result=ptr2 allocated dynamically */
1043 eio_req *eio_mknod (const char *path, mode_t mode, dev_t dev, eio_cb cb);
1044 eio_req *eio_busy (double delay, eio_cb cb); /* ties a thread for this long, simulating busyness */
1045 eio_req *eio_nop (eio_cb cb); /* does nothing except go through the whole process */
1046 void
1047 aio_open (SV8 *pathname, int flags, int mode, SV *callback=&PL_sv_undef)
1048 PROTOTYPE: $$$;$
1049 PPCODE:
1050 {
1051 dREQ;
1052
1053 req->type = EIO_OPEN;
1054 req->sv1 = newSVsv (pathname);
1055 req->ptr1 = SvPVbyte_nolen (req->sv1);
1056 req->int1 = flags;
1057 req->int2 = mode;
1058
1059 EIO_SEND;
1060 }
1061
1062 void
1063 aio_fsync (SV *fh, SV *callback=&PL_sv_undef)
1064 PROTOTYPE: $;$
1065 ALIAS:
1066 aio_fsync = EIO_FSYNC
1067 aio_fdatasync = EIO_FDATASYNC
1068 PPCODE:
1069 {
1070 dREQ;
1071
1072 req->type = ix;
1073 req->sv1 = newSVsv (fh);
1074 req->int1 = PerlIO_fileno (IoIFP (sv_2io (fh)));
1075
1076 EIO_SEND (req);
1077 }
1078
1079 void
1080 aio_close (SV *fh, SV *callback=&PL_sv_undef)
1081 PROTOTYPE: $;$
1082 PPCODE:
1083 {
1084 dREQ;
1085
1086 req->type = EIO_CLOSE;
1087 req->sv1 = newSVsv (fh);
1088 req->int1 = PerlIO_fileno (IoIFP (sv_2io (fh)));
1089
1090 EIO_SEND (req);
1091 }
1092
1093 void
1094 aio_read (SV *fh, SV *offset, SV *length, SV8 *data, IV dataoffset, SV *callback=&PL_sv_undef)
1095 ALIAS:
1096 aio_read = EIO_READ
1097 aio_write = EIO_WRITE
1098 PROTOTYPE: $$$$$;$
1099 PPCODE:
1100 {
1101 STRLEN svlen;
1102 char *svptr = SvPVbyte (data, svlen);
1103 UV len = SvUV (length);
1104
1105 SvUPGRADE (data, SVt_PV);
1106 SvPOK_on (data);
1107
1108 if (dataoffset < 0)
1109 dataoffset += svlen;
1110
1111 if (dataoffset < 0 || dataoffset > svlen)
1112 croak ("dataoffset outside of data scalar");
1113
1114 if (ix == EIO_WRITE)
1115 {
1116 /* write: check length and adjust. */
1117 if (!SvOK (length) || len + dataoffset > svlen)
1118 len = svlen - dataoffset;
1119 }
1120 else
1121 {
1122 /* read: grow scalar as necessary */
1123 svptr = SvGROW (data, len + dataoffset + 1);
1124 }
1125
1126 if (len < 0)
1127 croak ("length must not be negative");
1128
1129 {
1130 dREQ;
1131
1132 req->type = ix;
1133 req->sv1 = newSVsv (fh);
1134 req->int1 = PerlIO_fileno (ix == EIO_READ ? IoIFP (sv_2io (fh))
1135 : IoOFP (sv_2io (fh)));
1136 req->offs = SvOK (offset) ? SvVAL64 (offset) : -1;
1137 req->size = len;
1138 req->sv2 = SvREFCNT_inc (data);
1139 req->ptr2 = (char *)svptr + dataoffset;
1140 req->stroffset = dataoffset;
1141
1142 if (!SvREADONLY (data))
1143 {
1144 SvREADONLY_on (data);
1145 req->flags |= FLAG_SV2_RO_OFF;
1146 }
1147
1148 EIO_SEND;
1149 }
1150 }
1151
1152 void
1153 aio_readlink (SV8 *path, SV *callback=&PL_sv_undef)
1154 PROTOTYPE: $$;$
1155 PPCODE:
1156 {
1157 SV *data;
1158 dREQ;
1159
1160 data = newSV (NAME_MAX);
1161 SvPOK_on (data);
1162
1163 req->type = EIO_READLINK;
1164 req->sv1 = newSVsv (path);
1165 req->ptr1 = SvPVbyte_nolen (req->sv1);
1166 req->sv2 = data;
1167 req->ptr2 = SvPVbyte_nolen (data);
1168
1169 EIO_SEND;
1170 }
1171
1172 void
1173 aio_sendfile (SV *out_fh, SV *in_fh, SV *in_offset, UV length, SV *callback=&PL_sv_undef)
1174 PROTOTYPE: $$$$;$
1175 PPCODE:
1176 {
1177 dREQ;
1178
1179 req->type = EIO_SENDFILE;
1180 req->sv1 = newSVsv (out_fh);
1181 req->int1 = PerlIO_fileno (IoIFP (sv_2io (out_fh)));
1182 req->sv2 = newSVsv (in_fh);
1183 req->int2 = PerlIO_fileno (IoIFP (sv_2io (in_fh)));
1184 req->offs = SvVAL64 (in_offset);
1185 req->size = length;
1186
1187 EIO_SEND;
1188 }
1189
1190 void
1191 aio_readahead (SV *fh, SV *offset, IV length, SV *callback=&PL_sv_undef)
1192 PROTOTYPE: $$$;$
1193 PPCODE:
1194 {
1195 dREQ;
1196
1197 req->type = EIO_READAHEAD;
1198 req->sv1 = newSVsv (fh);
1199 req->int1 = PerlIO_fileno (IoIFP (sv_2io (fh)));
1200 req->offs = SvVAL64 (offset);
1201 req->size = length;
1202
1203 EIO_SEND;
1204 }
1205
1206 void
1207 aio_stat (SV8 *fh_or_path, SV *callback=&PL_sv_undef)
1208 ALIAS:
1209 aio_stat = EIO_STAT
1210 aio_lstat = EIO_LSTAT
1211 PPCODE:
1212 {
1213 dREQ;
1214
1215 req->ptr2 = malloc (sizeof (EIO_STRUCT_STAT));
1216 if (!req->ptr2)
1217 {
1218 req_destroy (req);
1219 croak ("out of memory during aio_stat statdata allocation");
1220 }
1221
1222 req->flags |= FLAG_PTR2_FREE;
1223 req->sv1 = newSVsv (fh_or_path);
1224
1225 if (SvPOK (fh_or_path))
1226 {
1227 req->type = ix;
1228 req->ptr1 = SvPVbyte_nolen (req->sv1);
1229 }
1230 else
1231 {
1232 req->type = EIO_FSTAT;
1233 req->int1 = PerlIO_fileno (IoIFP (sv_2io (fh_or_path)));
1234 }
1235
1236 EIO_SEND;
1237 }
1238
1239 void
1240 aio_utime (SV8 *fh_or_path, SV *atime, SV *mtime, SV *callback=&PL_sv_undef)
1241 PPCODE:
1242 {
1243 dREQ;
1244
1245 req->nv1 = SvOK (atime) ? SvNV (atime) : -1.;
1246 req->nv2 = SvOK (mtime) ? SvNV (mtime) : -1.;
1247 req->sv1 = newSVsv (fh_or_path);
1248
1249 if (SvPOK (fh_or_path))
1250 {
1251 req->type = EIO_UTIME;
1252 req->ptr1 = SvPVbyte_nolen (req->sv1);
1253 }
1254 else
1255 {
1256 req->type = EIO_FUTIME;
1257 req->int1 = PerlIO_fileno (IoIFP (sv_2io (fh_or_path)));
1258 }
1259
1260 EIO_SEND;
1261 }
1262
1263 void
1264 aio_truncate (SV8 *fh_or_path, SV *offset, SV *callback=&PL_sv_undef)
1265 PPCODE:
1266 {
1267 dREQ;
1268
1269 req->sv1 = newSVsv (fh_or_path);
1270 req->offs = SvOK (offset) ? SvVAL64 (offset) : -1;
1271
1272 if (SvPOK (fh_or_path))
1273 {
1274 req->type = EIO_TRUNCATE;
1275 req->ptr1 = SvPVbyte_nolen (req->sv1);
1276 }
1277 else
1278 {
1279 req->type = EIO_FTRUNCATE;
1280 req->int1 = PerlIO_fileno (IoIFP (sv_2io (fh_or_path)));
1281 }
1282
1283 EIO_SEND;
1284 }
1285
1286 void
1287 aio_chmod (SV8 *fh_or_path, int mode, SV *callback=&PL_sv_undef)
1288 ALIAS:
1289 aio_chmod = EIO_CHMOD
1290 aio_fchmod = EIO_FCHMOD
1291 aio_mkdir = EIO_MKDIR
1292 PPCODE:
1293 {
1294 dREQ;
1295
1296 req->type = type;
1297 req->int2 = mode;
1298 req->sv1 = newSVsv (fh_or_path);
1299
1300 if (ix == EIO_FCHMOD)
1301 req->int1 = PerlIO_fileno (IoIFP (sv_2io (fh_or_path)));
1302 else
1303 req->ptr1 = SvPVbyte_nolen (req->sv1);
1304
1305 EIO_SEND;
1306 }
1307
1308 void
1309 aio_chown (SV8 *fh_or_path, SV *uid, SV *gid, SV *callback=&PL_sv_undef)
1310 PPCODE:
1311 {
1312 dREQ;
1313
1314 req->int2 = SvOK (uid) ? SvIV (uid) : -1;
1315 req->int3 = SvOK (gid) ? SvIV (gid) : -1;
1316 req->sv1 = newSVsv (fh_or_path);
1317
1318 if (SvPOK (fh_or_path))
1319 {
1320 req->type = EIO_CHOWN;
1321 req->ptr1 = SvPVbyte_nolen (req->sv1);
1322 }
1323 else
1324 {
1325 req->type = EIO_FCHOWN;
1326 req->int1 = PerlIO_fileno (IoIFP (sv_2io (fh_or_path)));
1327 }
1328
1329 EIO_SEND;
1330 }
1331
1332 void
1333 aio_unlink (SV8 *pathname, SV *callback=&PL_sv_undef)
1334 ALIAS:
1335 aio_unlink = EIO_UNLINK
1336 aio_rmdir = EIO_RMDIR
1337 aio_readdir = EIO_READDIR
1338 PPCODE:
1339 {
1340 dREQ;
1341
1342 req->type = ix;
1343 req->sv1 = newSVsv (pathname);
1344 req->ptr1 = SvPVbyte_nolen (req->sv1);
1345
1346 EIO_SEND;
1347 }
1348
1349 void
1350 aio_link (SV8 *oldpath, SV8 *newpath, SV *callback=&PL_sv_undef)
1351 ALIAS:
1352 aio_link = EIO_LINK
1353 aio_symlink = EIO_SYMLINK
1354 aio_rename = EIO_RENAME
1355 PPCODE:
1356 {
1357 dREQ;
1358
1359 req->type = ix;
1360 req->sv1 = newSVsv (oldpath);
1361 req->ptr1 = SvPVbyte_nolen (req->sv1);
1362 req->sv2 = newSVsv (newpath);
1363 req->ptr2 = SvPVbyte_nolen (req->sv2);
1364
1365 EIO_SEND;
1366 }
1367
1368 void
1369 aio_mknod (SV8 *pathname, int mode, UV dev, SV *callback=&PL_sv_undef)
1370 PPCODE:
1371 {
1372 dREQ;
1373
1374 req->type = EIO_MKNOD;
1375 req->sv1 = newSVsv (pathname);
1376 req->ptr1 = SvPVbyte_nolen (req->sv1);
1377 req->int2 = (mode_t)mode;
1378 req->offs = dev;
1379
1380 EIO_SEND;
1381 }
1382
1383 void
1384 aio_busy (double delay, SV *callback=&PL_sv_undef)
1385 PPCODE:
1386 {
1387 dREQ;
1388
1389 req->type = EIO_BUSY;
1390 req->nv1 = delay < 0. ? 0. : delay;
1391
1392 EIO_SEND;
1393 }
1394
1395 void
1396 aio_group (SV *callback=&PL_sv_undef)
1397 PROTOTYPE: ;$
1398 PPCODE:
1399 {
1400 dREQ;
1401
1402 req->type = EIO_GROUP;
1403
1404 req_send (req);
1405 XPUSHs (req_sv (req, AIO_GRP_KLASS));
1406 }
1407
1408 void
1409 aio_nop (SV *callback=&PL_sv_undef)
1410 ALIAS:
1411 aio_nop = EIO_NOP
1412 aio_sync = EIO_SYNC
1413 PPCODE:
1414 {
1415 dREQ;
1416
1417 #endif
1418
1419 void eio_grp_feed (eio_req *grp, int limit, void (*feed)(eio_req *req))
1420 {
1421 grp->int2 = limit;
1422 grp->feed = feed;
1423 }
1424
1425 void eio_grp_add (eio_req *grp, eio_req *req)
1426 {
1427 assert (("cannot add requests to IO::AIO::GRP after the group finished", grp->int1 != 2));
1428
1429 ++grp->size;
1430 req->grp = grp;
1431
1432 req->grp_prev = 0;
1433 req->grp_next = grp->grp_first;
1434
1435 if (grp->grp_first)
1436 grp->grp_first->grp_prev = req;
1437
1438 grp->grp_first = req;
1439 }
1440
1441