/cvs/libeio/eio.c
Revision: 1.8
Committed: Sun May 11 20:08:37 2008 UTC (16 years ago) by root
Content type: text/plain
Branch: MAIN
Changes since 1.7: +3 -3 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.1 #include "eio.h"
2     #include "xthread.h"
3    
4     #include <errno.h>
5     #include <stddef.h>
6     #include <stdlib.h>
7 root 1.3 #include <string.h>
8 root 1.1 #include <errno.h>
9     #include <sys/types.h>
10     #include <sys/stat.h>
11     #include <limits.h>
12     #include <fcntl.h>
13 root 1.8 #include <assert.h>
14 root 1.1
15     #ifndef EIO_FINISH
16     # define EIO_FINISH(req) ((req)->finish) && !EIO_CANCELLED (req) ? (req)->finish (req) : 0
17     #endif
18    
19     #ifndef EIO_DESTROY
20     # define EIO_DESTROY(req) do { if ((req)->destroy) (req)->destroy (req); } while (0)
21     #endif
22    
23     #ifndef EIO_FEED
24     # define EIO_FEED(req) do { if ((req)->feed ) (req)->feed (req); } while (0)
25     #endif
26    
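These three hooks only take their defaults when the embedder has not already defined them, so a build can route request completion through its own bookkeeping before compiling this file. A minimal sketch, assuming a hypothetical my_trace () helper supplied by the embedder:

    /* hypothetical override, e.g. in a wrapper header included before eio.c */
    #define EIO_FINISH(req)  (my_trace ("finish", (req)), \
                              (req)->finish && !EIO_CANCELLED (req) ? (req)->finish (req) : 0)
    #define EIO_DESTROY(req) do { my_trace ("destroy", (req)); if ((req)->destroy) (req)->destroy (req); } while (0)
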
27     #ifdef _WIN32
28    
29     /*doh*/
30    
31     #else
32    
33     # include "config.h"
34     # include <sys/time.h>
35     # include <sys/select.h>
36     # include <unistd.h>
37     # include <utime.h>
38     # include <signal.h>
39 root 1.4 # include <dirent.h>
40 root 1.1
41     # ifndef EIO_STRUCT_DIRENT
42     # define EIO_STRUCT_DIRENT struct dirent
43     # endif
44    
45     #endif
46    
47     # ifndef EIO_STRUCT_STAT
48     # define EIO_STRUCT_STAT struct stat
49     # endif
50    
51     #if HAVE_SENDFILE
52     # if __linux
53     # include <sys/sendfile.h>
54     # elif __freebsd
55     # include <sys/socket.h>
56     # include <sys/uio.h>
57     # elif __hpux
58     # include <sys/socket.h>
59     # elif __solaris /* not yet */
60     # include <sys/sendfile.h>
61     # else
62     # error sendfile support requested but not available
63     # endif
64     #endif
65    
66     /* number of seconds after which an idle thread exits */
67     #define IDLE_TIMEOUT 10
68    
69     /* used for struct dirent buffers; AIX doesn't provide NAME_MAX */
70     #ifndef NAME_MAX
71     # define NAME_MAX 4096
72     #endif
73    
74     /* buffer size for various temporary buffers */
75     #define EIO_BUFSIZE 65536
76    
77     #define dBUF \
78     char *eio_buf; \
79     X_LOCK (wrklock); \
80     self->dbuf = eio_buf = malloc (EIO_BUFSIZE); \
81     X_UNLOCK (wrklock); \
82     if (!eio_buf) \
83     return -1;
84    
85     #define EIO_TICKS ((1000000 + 1023) >> 10)
86    
87     static void (*want_poll_cb) (void);
88     static void (*done_poll_cb) (void);
89    
90     static unsigned int max_poll_time = 0;
91     static unsigned int max_poll_reqs = 0;
92    
93     /* calculate time difference in ~1/EIO_TICKS of a second */
94     static int tvdiff (struct timeval *tv1, struct timeval *tv2)
95     {
96     return (tv2->tv_sec - tv1->tv_sec ) * EIO_TICKS
97     + ((tv2->tv_usec - tv1->tv_usec) >> 10);
98     }
99    
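A quick worked example of the tick arithmetic above, assuming two timestamps 2.5 seconds apart:

    /* EIO_TICKS = (1000000 + 1023) >> 10 = 977, i.e. one tick is roughly a millisecond */
    /* tvdiff: 2 * 977 + (500000 >> 10) = 1954 + 488 = 2442 ticks */
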
100     static unsigned int started, idle, wanted;
101    
102     /* worker threads management */
103     static mutex_t wrklock = X_MUTEX_INIT;
104    
105     typedef struct worker {
106     /* locked by wrklock */
107     struct worker *prev, *next;
108    
109     thread_t tid;
110    
111     /* locked by reslock, reqlock or wrklock */
112     eio_req *req; /* currently processed request */
113     void *dbuf;
114     DIR *dirp;
115     } worker;
116    
117     static worker wrk_first = { &wrk_first, &wrk_first, 0 };
118    
119     static void worker_clear (worker *wrk)
120     {
121     if (wrk->dirp)
122     {
123     closedir (wrk->dirp);
124     wrk->dirp = 0;
125     }
126    
127     if (wrk->dbuf)
128     {
129     free (wrk->dbuf);
130     wrk->dbuf = 0;
131     }
132     }
133    
134     static void worker_free (worker *wrk)
135     {
136     wrk->next->prev = wrk->prev;
137     wrk->prev->next = wrk->next;
138    
139     free (wrk);
140     }
141    
142     static volatile unsigned int nreqs, nready, npending;
143     static volatile unsigned int max_idle = 4;
144    
145     static mutex_t reslock = X_MUTEX_INIT;
146     static mutex_t reqlock = X_MUTEX_INIT;
147     static cond_t reqwait = X_COND_INIT;
148    
149     unsigned int eio_nreqs (void)
150     {
151     return nreqs;
152     }
153    
154     unsigned int eio_nready (void)
155     {
156     unsigned int retval;
157    
158     if (WORDACCESS_UNSAFE) X_LOCK (reqlock);
159     retval = nready;
160     if (WORDACCESS_UNSAFE) X_UNLOCK (reqlock);
161    
162     return retval;
163     }
164    
165     unsigned int eio_npending (void)
166     {
167     unsigned int retval;
168    
169     if (WORDACCESS_UNSAFE) X_LOCK (reqlock);
170     retval = npending;
171     if (WORDACCESS_UNSAFE) X_UNLOCK (reqlock);
172    
173     return retval;
174     }
175    
176     unsigned int eio_nthreads (void)
177     {
178     unsigned int retval;
179    
180     if (WORDACCESS_UNSAFE) X_LOCK (reqlock);
181     retval = started;
182     if (WORDACCESS_UNSAFE) X_UNLOCK (reqlock);
183    
184     return retval;
185     }
186    
187     /*
188     * a somewhat faster data structure might be nice, but
189     * with 8 priorities this actually needs <20 insns
190     * per shift, the most expensive operation.
191     */
192     typedef struct {
193     eio_req *qs[EIO_NUM_PRI], *qe[EIO_NUM_PRI]; /* qstart, qend */
194     int size;
195     } reqq;
196    
197     static reqq req_queue;
198     static reqq res_queue;
199    
200     static int reqq_push (reqq *q, eio_req *req)
201     {
202     int pri = req->pri;
203     req->next = 0;
204    
205     if (q->qe[pri])
206     {
207     q->qe[pri]->next = req;
208     q->qe[pri] = req;
209     }
210     else
211     q->qe[pri] = q->qs[pri] = req;
212    
213     return q->size++;
214     }
215    
216     static eio_req *reqq_shift (reqq *q)
217     {
218     int pri;
219    
220     if (!q->size)
221     return 0;
222    
223     --q->size;
224    
225     for (pri = EIO_NUM_PRI; pri--; )
226     {
227     eio_req *req = q->qs[pri];
228    
229     if (req)
230     {
231     if (!(q->qs[pri] = (eio_req *)req->next))
232     q->qe[pri] = 0;
233    
234     return req;
235     }
236     }
237    
238     abort ();
239     }
240    
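A minimal sketch of how the two queue helpers are meant to be driven; in this file every such call happens with reqlock (for req_queue) or reslock (for res_queue) held:

    /* producer side, lock already held */
    reqq_push (&req_queue, req);              /* returns the size before the push */

    /* consumer side, same lock held */
    eio_req *next = reqq_shift (&req_queue);  /* highest-priority request, or 0 if empty */
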
241 root 1.2 static void grp_try_feed (eio_req *grp)
242 root 1.1 {
243     while (grp->size < grp->int2 && !EIO_CANCELLED (grp))
244     {
245     int old_len = grp->size;
246    
247     EIO_FEED (grp);
248    
249     /* stop if no progress has been made */
250     if (old_len == grp->size)
251     {
252     grp->feed = 0;
253 root 1.2 break;
254 root 1.1 }
255     }
256     }
257    
258 root 1.2 static int eio_finish (eio_req *req);
259 root 1.1
260     static int grp_dec (eio_req *grp)
261     {
262     --grp->size;
263    
264     /* call feeder, if applicable */
265 root 1.2 grp_try_feed (grp);
266 root 1.1
267     /* finish, if done */
268     if (!grp->size && grp->int1)
269 root 1.2 return eio_finish (grp);
270 root 1.1 else
271     return 0;
272     }
273    
274     void eio_destroy (eio_req *req)
275     {
276 root 1.6 if ((req)->flags & EIO_FLAG_PTR1_FREE) free (req->ptr1);
277     if ((req)->flags & EIO_FLAG_PTR2_FREE) free (req->ptr2);
278 root 1.1
279     EIO_DESTROY (req);
280     }
281    
282 root 1.2 static int eio_finish (eio_req *req)
283 root 1.1 {
284     int res = EIO_FINISH (req);
285    
286     if (req->grp)
287     {
288     int res2;
289     eio_req *grp = req->grp;
290    
291     /* unlink request */
292     if (req->grp_next) req->grp_next->grp_prev = req->grp_prev;
293     if (req->grp_prev) req->grp_prev->grp_next = req->grp_next;
294    
295     if (grp->grp_first == req)
296     grp->grp_first = req->grp_next;
297    
298     res2 = grp_dec (grp);
299    
300     if (!res && res2)
301     res = res2;
302     }
303    
304     eio_destroy (req);
305    
306     return res;
307     }
308    
309     void eio_grp_cancel (eio_req *grp)
310     {
311     for (grp = grp->grp_first; grp; grp = grp->grp_next)
312     eio_cancel (grp);
313     }
314    
315     void eio_cancel (eio_req *req)
316     {
317 root 1.6 X_LOCK (wrklock);
318 root 1.1 req->flags |= EIO_FLAG_CANCELLED;
319 root 1.6 X_UNLOCK (wrklock);
320 root 1.1
321     eio_grp_cancel (req);
322     }
323    
324     X_THREAD_PROC (eio_proc);
325    
326     static void start_thread (void)
327     {
328     worker *wrk = calloc (1, sizeof (worker));
329    
330 root 1.8 /*TODO*/
331     assert (("unable to allocate worker thread data", wrk));
332 root 1.1
333     X_LOCK (wrklock);
334    
335     if (thread_create (&wrk->tid, eio_proc, (void *)wrk))
336     {
337     wrk->prev = &wrk_first;
338     wrk->next = wrk_first.next;
339     wrk_first.next->prev = wrk;
340     wrk_first.next = wrk;
341     ++started;
342     }
343     else
344     free (wrk);
345    
346     X_UNLOCK (wrklock);
347     }
348    
349     static void maybe_start_thread (void)
350     {
351     if (eio_nthreads () >= wanted)
352     return;
353    
354     /* todo: maybe use idle here, but might be less exact */
355     if (0 <= (int)eio_nthreads () + (int)eio_npending () - (int)eio_nreqs ())
356     return;
357    
358     start_thread ();
359     }
360    
361     void eio_submit (eio_req *req)
362     {
363     ++nreqs;
364    
365     X_LOCK (reqlock);
366     ++nready;
367     reqq_push (&req_queue, req);
368     X_COND_SIGNAL (reqwait);
369     X_UNLOCK (reqlock);
370    
371     maybe_start_thread ();
372     }
373    
374     static void end_thread (void)
375     {
376     eio_req *req = calloc (1, sizeof (eio_req));
377    
378     req->type = EIO_QUIT;
379     req->pri = EIO_PRI_MAX + EIO_PRI_BIAS;
380    
381     X_LOCK (reqlock);
382     reqq_push (&req_queue, req);
383     X_COND_SIGNAL (reqwait);
384     X_UNLOCK (reqlock);
385    
386     X_LOCK (wrklock);
387     --started;
388     X_UNLOCK (wrklock);
389     }
390    
391     void eio_set_max_poll_time (double nseconds)
392     {
393     if (WORDACCESS_UNSAFE) X_LOCK (reqlock);
394     max_poll_time = nseconds * EIO_TICKS;
395     if (WORDACCESS_UNSAFE) X_UNLOCK (reqlock);
396     }
397    
398     void eio_set_max_poll_reqs (unsigned int maxreqs)
399     {
400     if (WORDACCESS_UNSAFE) X_LOCK (reqlock);
401     max_poll_reqs = maxreqs;
402     if (WORDACCESS_UNSAFE) X_UNLOCK (reqlock);
403     }
404    
405     void eio_set_max_idle (unsigned int nthreads)
406     {
407     if (WORDACCESS_UNSAFE) X_LOCK (reqlock);
408     max_idle = nthreads <= 0 ? 1 : nthreads;
409     if (WORDACCESS_UNSAFE) X_UNLOCK (reqlock);
410     }
411    
412     void eio_set_min_parallel (unsigned int nthreads)
413     {
414     if (wanted < nthreads)
415     wanted = nthreads;
416     }
417    
418     void eio_set_max_parallel (unsigned int nthreads)
419     {
420     if (wanted > nthreads)
421     wanted = nthreads;
422    
423     while (started > wanted)
424     end_thread ();
425     }
426    
427     int eio_poll (void)
428     {
429     int maxreqs = max_poll_reqs;
430     struct timeval tv_start, tv_now;
431     eio_req *req;
432    
433     if (max_poll_time)
434     gettimeofday (&tv_start, 0);
435    
436     for (;;)
437     {
438     maybe_start_thread ();
439    
440     X_LOCK (reslock);
441     req = reqq_shift (&res_queue);
442    
443     if (req)
444     {
445     --npending;
446    
447 root 1.2 if (!res_queue.size && done_poll_cb)
448 root 1.1 done_poll_cb ();
449     }
450    
451     X_UNLOCK (reslock);
452    
453     if (!req)
454     return 0;
455    
456     --nreqs;
457    
458     if (req->type == EIO_GROUP && req->size)
459     {
460     req->int1 = 1; /* mark request as delayed */
461     continue;
462     }
463     else
464     {
465 root 1.2 int res = eio_finish (req);
466 root 1.1 if (res)
467     return res;
468     }
469    
470     if (maxreqs && !--maxreqs)
471     break;
472    
473     if (max_poll_time)
474     {
475     gettimeofday (&tv_now, 0);
476    
477     if (tvdiff (&tv_start, &tv_now) >= max_poll_time)
478     break;
479     }
480     }
481    
482     errno = EAGAIN;
483     return -1;
484     }
485    
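A minimal sketch of how an embedder might drain results once want_poll has fired, assuming it simply retries while eio_poll reports EAGAIN (the helper name is illustrative):

    static void my_drain_results (void)
    {
      /* 0: result queue empty; -1/EAGAIN: stopped early due to max_poll_reqs/max_poll_time */
      while (eio_poll () == -1 && errno == EAGAIN)
        ;
    }
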
486     /*****************************************************************************/
487     /* work around various missing functions */
488    
489     #if !HAVE_PREADWRITE
490     # define pread aio_pread
491     # define pwrite aio_pwrite
492    
493     /*
494     * make our pread/pwrite safe against themselves, but not against
495     * normal read/write by using a mutex. slows down execution a lot,
496     * but that's your problem, not mine.
497     */
498     static mutex_t preadwritelock = X_MUTEX_INIT;
499    
500     static ssize_t pread (int fd, void *buf, size_t count, off_t offset)
501     {
502     ssize_t res;
503     off_t ooffset;
504    
505     X_LOCK (preadwritelock);
506     ooffset = lseek (fd, 0, SEEK_CUR);
507     lseek (fd, offset, SEEK_SET);
508     res = read (fd, buf, count);
509     lseek (fd, ooffset, SEEK_SET);
510     X_UNLOCK (preadwritelock);
511    
512     return res;
513     }
514    
515     static ssize_t pwrite (int fd, void *buf, size_t count, off_t offset)
516     {
517     ssize_t res;
518     off_t ooffset;
519    
520     X_LOCK (preadwritelock);
521     ooffset = lseek (fd, 0, SEEK_CUR);
522     lseek (fd, offset, SEEK_SET);
523     res = write (fd, buf, count);
524     lseek (fd, ooffset, SEEK_SET);
525     X_UNLOCK (preadwritelock);
526    
527     return res;
528     }
529     #endif
530    
531     #ifndef HAVE_FUTIMES
532    
533     # define utimes(path,times) aio_utimes (path, times)
534     # define futimes(fd,times) aio_futimes (fd, times)
535    
536     static int aio_utimes (const char *filename, const struct timeval times[2])
537     {
538     if (times)
539     {
540     struct utimbuf buf;
541    
542     buf.actime = times[0].tv_sec;
543     buf.modtime = times[1].tv_sec;
544    
545     return utime (filename, &buf);
546     }
547     else
548     return utime (filename, 0);
549     }
550    
551     static int aio_futimes (int fd, const struct timeval tv[2])
552     {
553     errno = ENOSYS;
554     return -1;
555     }
556    
557     #endif
558    
559     #if !HAVE_FDATASYNC
560     # define fdatasync fsync
561     #endif
562    
563     #if !HAVE_READAHEAD
564     # define readahead(fd,offset,count) aio_readahead (fd, offset, count, self)
565    
566     static ssize_t aio_readahead (int fd, off_t offset, size_t count, worker *self)
567     {
568     size_t todo = count;
569     dBUF;
570    
571     while (todo > 0)
572     {
573     size_t len = todo < EIO_BUFSIZE ? todo : EIO_BUFSIZE;
574    
575 root 1.3 pread (fd, eio_buf, len, offset);
576 root 1.1 offset += len;
577     todo -= len;
578     }
579    
580     errno = 0;
581     return count;
582     }
583    
584     #endif
585    
586     #if !HAVE_READDIR_R
587     # define readdir_r aio_readdir_r
588    
589     static mutex_t readdirlock = X_MUTEX_INIT;
590    
591 root 1.5 static int readdir_r (DIR *dirp, EIO_STRUCT_DIRENT *ent, EIO_STRUCT_DIRENT **res)
592 root 1.1 {
593 root 1.5 EIO_STRUCT_DIRENT *e;
594 root 1.1 int errorno;
595    
596     X_LOCK (readdirlock);
597    
598     e = readdir (dirp);
599     errorno = errno;
600    
601     if (e)
602     {
603     *res = ent;
604     strcpy (ent->d_name, e->d_name);
605     }
606     else
607     *res = 0;
608    
609     X_UNLOCK (readdirlock);
610    
611     errno = errorno;
612     return e ? 0 : -1;
613     }
614     #endif
615    
616     /* sendfile always needs emulation */
617     static ssize_t sendfile_ (int ofd, int ifd, off_t offset, size_t count, worker *self)
618     {
619     ssize_t res;
620    
621     if (!count)
622     return 0;
623    
624     #if HAVE_SENDFILE
625     # if __linux
626     res = sendfile (ofd, ifd, &offset, count);
627    
628     # elif __freebsd
629     /*
630     * Of course, the freebsd sendfile is a dire hack with no thoughts
631     * wasted on making it similar to other I/O functions.
632     */
633     {
634     off_t sbytes;
635     res = sendfile (ifd, ofd, offset, count, 0, &sbytes, 0);
636    
637     if (res < 0 && sbytes)
638     /* maybe only on EAGAIN: as usual, the manpage leaves you guessing */
639     res = sbytes;
640     }
641    
642     # elif __hpux
643     res = sendfile (ofd, ifd, offset, count, 0, 0);
644    
645     # elif __solaris
646     {
647     struct sendfilevec vec;
648     size_t sbytes;
649    
650     vec.sfv_fd = ifd;
651     vec.sfv_flag = 0;
652     vec.sfv_off = offset;
653     vec.sfv_len = count;
654    
655     res = sendfilev (ofd, &vec, 1, &sbytes);
656    
657     if (res < 0 && sbytes)
658     res = sbytes;
659     }
660    
661     # endif
662     #else
663     res = -1;
664     errno = ENOSYS;
665     #endif
666    
667     if (res < 0
668     && (errno == ENOSYS || errno == EINVAL || errno == ENOTSOCK
669     #if __solaris
670     || errno == EAFNOSUPPORT || errno == EPROTOTYPE
671     #endif
672     )
673     )
674     {
675     /* emulate sendfile. this is a major pain in the ass */
676     dBUF;
677    
678     res = 0;
679    
680     while (count)
681     {
682     ssize_t cnt;
683    
684     cnt = pread (ifd, eio_buf, count > EIO_BUFSIZE ? EIO_BUFSIZE : count, offset);
685    
686     if (cnt <= 0)
687     {
688     if (cnt && !res) res = -1;
689     break;
690     }
691    
692     cnt = write (ofd, eio_buf, cnt);
693    
694     if (cnt <= 0)
695     {
696     if (cnt && !res) res = -1;
697     break;
698     }
699    
700     offset += cnt;
701     res += cnt;
702     count -= cnt;
703     }
704     }
705    
706     return res;
707     }
708    
709     /* read a full directory */
710     static void scandir_ (eio_req *req, worker *self)
711     {
712     DIR *dirp;
713     union
714     {
715     EIO_STRUCT_DIRENT d;
716     char b [offsetof (EIO_STRUCT_DIRENT, d_name) + NAME_MAX + 1];
717     } *u;
718     EIO_STRUCT_DIRENT *entp;
719     char *name, *names;
720     int memlen = 4096;
721     int memofs = 0;
722     int res = 0;
723    
724     X_LOCK (wrklock);
725     self->dirp = dirp = opendir (req->ptr1);
726     self->dbuf = u = malloc (sizeof (*u));
727     req->flags |= EIO_FLAG_PTR2_FREE;
728     req->ptr2 = names = malloc (memlen);
729     X_UNLOCK (wrklock);
730    
731     if (dirp && u && names)
732     for (;;)
733     {
734     errno = 0;
735     readdir_r (dirp, &u->d, &entp);
736    
737     if (!entp)
738     break;
739    
740     name = entp->d_name;
741    
742     if (name [0] != '.' || (name [1] && (name [1] != '.' || name [2])))
743     {
744     int len = strlen (name) + 1;
745    
746     res++;
747    
748     while (memofs + len > memlen)
749     {
750     memlen *= 2;
751     X_LOCK (wrklock);
752     req->ptr2 = names = realloc (names, memlen);
753     X_UNLOCK (wrklock);
754    
755     if (!names)
756     break;
757     }
758    
759     memcpy (names + memofs, name, len);
760     memofs += len;
761     }
762     }
763    
764     if (errno)
765     res = -1;
766    
767     req->result = res;
768     }
769    
770     /*****************************************************************************/
771    
772 root 1.6 #define ALLOC(len) \
773     if (!req->ptr2) \
774     { \
775 root 1.7 X_LOCK (wrklock); \
776     req->flags |= EIO_FLAG_PTR2_FREE; \
777     X_UNLOCK (wrklock); \
778     req->ptr2 = malloc (len); \
779     if (!req->ptr2) \
780     { \
781     errno = ENOMEM; \
782     req->result = -1; \
783     break; \
784     } \
785 root 1.6 }
786    
787 root 1.1 X_THREAD_PROC (eio_proc)
788     {
789     eio_req *req;
790     struct timespec ts;
791     worker *self = (worker *)thr_arg;
792    
793     /* try to distribute timeouts somewhat randomly */
794     ts.tv_nsec = ((unsigned long)self & 1023UL) * (1000000000UL / 1024UL);
795    
796     for (;;)
797     {
798     ts.tv_sec = time (0) + IDLE_TIMEOUT;
799    
800     X_LOCK (reqlock);
801    
802     for (;;)
803     {
804     self->req = req = reqq_shift (&req_queue);
805    
806     if (req)
807     break;
808    
809     ++idle;
810    
811     if (X_COND_TIMEDWAIT (reqwait, reqlock, ts) == ETIMEDOUT)
812     {
813     if (idle > max_idle)
814     {
815     --idle;
816     X_UNLOCK (reqlock);
817     X_LOCK (wrklock);
818     --started;
819     X_UNLOCK (wrklock);
820     goto quit;
821     }
822    
823     /* we are allowed to idle, so do so without any timeout */
824     X_COND_WAIT (reqwait, reqlock);
825     ts.tv_sec = time (0) + IDLE_TIMEOUT;
826     }
827    
828     --idle;
829     }
830    
831     --nready;
832    
833     X_UNLOCK (reqlock);
834    
835     errno = 0; /* strictly unnecessary */
836    
837     if (!EIO_CANCELLED (req))
838     switch (req->type)
839     {
840 root 1.7 case EIO_READ: ALLOC (req->size);
841     req->result = req->offs >= 0
842 root 1.1 ? pread (req->int1, req->ptr2, req->size, req->offs)
843     : read (req->int1, req->ptr2, req->size); break;
844     case EIO_WRITE: req->result = req->offs >= 0
845     ? pwrite (req->int1, req->ptr2, req->size, req->offs)
846     : write (req->int1, req->ptr2, req->size); break;
847    
848     case EIO_READAHEAD: req->result = readahead (req->int1, req->offs, req->size); break;
849     case EIO_SENDFILE: req->result = sendfile_ (req->int1, req->int2, req->offs, req->size, self); break;
850    
851 root 1.6 case EIO_STAT: ALLOC (sizeof (EIO_STRUCT_STAT));
852     req->result = stat (req->ptr1, (EIO_STRUCT_STAT *)req->ptr2); break;
853     case EIO_LSTAT: ALLOC (sizeof (EIO_STRUCT_STAT));
854     req->result = lstat (req->ptr1, (EIO_STRUCT_STAT *)req->ptr2); break;
855     case EIO_FSTAT: ALLOC (sizeof (EIO_STRUCT_STAT));
856     req->result = fstat (req->int1, (EIO_STRUCT_STAT *)req->ptr2); break;
857 root 1.1
858     case EIO_CHOWN: req->result = chown (req->ptr1, req->int2, req->int3); break;
859     case EIO_FCHOWN: req->result = fchown (req->int1, req->int2, req->int3); break;
860     case EIO_CHMOD: req->result = chmod (req->ptr1, (mode_t)req->int2); break;
861     case EIO_FCHMOD: req->result = fchmod (req->int1, (mode_t)req->int2); break;
862     case EIO_TRUNCATE: req->result = truncate (req->ptr1, req->offs); break;
863     case EIO_FTRUNCATE: req->result = ftruncate (req->int1, req->offs); break;
864    
865     case EIO_OPEN: req->result = open (req->ptr1, req->int1, (mode_t)req->int2); break;
866     case EIO_CLOSE: req->result = close (req->int1); break;
867     case EIO_DUP2: req->result = dup2 (req->int1, req->int2); break;
868     case EIO_UNLINK: req->result = unlink (req->ptr1); break;
869     case EIO_RMDIR: req->result = rmdir (req->ptr1); break;
870     case EIO_MKDIR: req->result = mkdir (req->ptr1, (mode_t)req->int2); break;
871     case EIO_RENAME: req->result = rename (req->ptr1, req->ptr2); break;
872     case EIO_LINK: req->result = link (req->ptr1, req->ptr2); break;
873     case EIO_SYMLINK: req->result = symlink (req->ptr1, req->ptr2); break;
874     case EIO_MKNOD: req->result = mknod (req->ptr1, (mode_t)req->int2, (dev_t)req->offs); break;
875 root 1.7
876 root 1.6 case EIO_READLINK: ALLOC (NAME_MAX);
877     req->result = readlink (req->ptr1, req->ptr2, NAME_MAX); break;
878 root 1.1
879     case EIO_SYNC: req->result = 0; sync (); break;
880     case EIO_FSYNC: req->result = fsync (req->int1); break;
881     case EIO_FDATASYNC: req->result = fdatasync (req->int1); break;
882    
883     case EIO_READDIR: scandir_ (req, self); break;
884    
885     case EIO_BUSY:
886     #ifdef _WIN32
887     Sleep (req->nv1 * 1000.);
888     #else
889     {
890     struct timeval tv;
891    
892     tv.tv_sec = req->nv1;
893     tv.tv_usec = (req->nv1 - tv.tv_sec) * 1000000.;
894    
895     req->result = select (0, 0, 0, 0, &tv);
896     }
897     #endif
898     break;
899    
900     case EIO_UTIME:
901     case EIO_FUTIME:
902     {
903     struct timeval tv[2];
904     struct timeval *times;
905    
906     if (req->nv1 != -1. || req->nv2 != -1.)
907     {
908     tv[0].tv_sec = req->nv1;
909     tv[0].tv_usec = (req->nv1 - tv[0].tv_sec) * 1000000.;
910     tv[1].tv_sec = req->nv2;
911     tv[1].tv_usec = (req->nv2 - tv[1].tv_sec) * 1000000.;
912    
913     times = tv;
914     }
915     else
916     times = 0;
917    
918    
919     req->result = req->type == EIO_FUTIME
920     ? futimes (req->int1, times)
921     : utimes (req->ptr1, times);
922     }
923    
924     case EIO_GROUP:
925     case EIO_NOP:
926     break;
927    
928     case EIO_QUIT:
929     goto quit;
930    
931     default:
932     req->result = -1;
933     break;
934     }
935    
936     req->errorno = errno;
937    
938     X_LOCK (reslock);
939    
940     ++npending;
941    
942 root 1.2 if (!reqq_push (&res_queue, req) && want_poll_cb)
943 root 1.1 want_poll_cb ();
944    
945     self->req = 0;
946     worker_clear (self);
947    
948     X_UNLOCK (reslock);
949     }
950    
951     quit:
952     X_LOCK (wrklock);
953     worker_free (self);
954     X_UNLOCK (wrklock);
955    
956     return 0;
957     }
958    
959     /*****************************************************************************/
960    
961 root 1.2 static void eio_atfork_prepare (void)
962 root 1.1 {
963     X_LOCK (wrklock);
964     X_LOCK (reqlock);
965     X_LOCK (reslock);
966     #if !HAVE_PREADWRITE
967     X_LOCK (preadwritelock);
968     #endif
969     #if !HAVE_READDIR_R
970     X_LOCK (readdirlock);
971     #endif
972     }
973    
974 root 1.2 static void eio_atfork_parent (void)
975 root 1.1 {
976     #if !HAVE_READDIR_R
977     X_UNLOCK (readdirlock);
978     #endif
979     #if !HAVE_PREADWRITE
980     X_UNLOCK (preadwritelock);
981     #endif
982     X_UNLOCK (reslock);
983     X_UNLOCK (reqlock);
984     X_UNLOCK (wrklock);
985     }
986    
987 root 1.2 static void eio_atfork_child (void)
988 root 1.1 {
989     eio_req *prv;
990    
991     while (prv = reqq_shift (&req_queue))
992     eio_destroy (prv);
993    
994     while (prv = reqq_shift (&res_queue))
995     eio_destroy (prv);
996    
997     while (wrk_first.next != &wrk_first)
998     {
999     worker *wrk = wrk_first.next;
1000    
1001     if (wrk->req)
1002     eio_destroy (wrk->req);
1003    
1004     worker_clear (wrk);
1005     worker_free (wrk);
1006     }
1007    
1008     started = 0;
1009     idle = 0;
1010     nreqs = 0;
1011     nready = 0;
1012     npending = 0;
1013    
1014 root 1.2 eio_atfork_parent ();
1015 root 1.1 }
1016    
1017     int eio_init (void (*want_poll)(void), void (*done_poll)(void))
1018     {
1019     want_poll_cb = want_poll;
1020     done_poll_cb = done_poll;
1021    
1022     #ifdef _WIN32
1023     X_MUTEX_CHECK (wrklock);
1024     X_MUTEX_CHECK (reslock);
1025     X_MUTEX_CHECK (reqlock);
1026     X_MUTEX_CHECK (reqwait);
1027     X_MUTEX_CHECK (preadwritelock);
1028     X_MUTEX_CHECK (readdirlock);
1029    
1030     X_COND_CHECK (reqwait);
1031     #endif
1032    
1033 root 1.2 X_THREAD_ATFORK (eio_atfork_prepare, eio_atfork_parent, eio_atfork_child);
return 0;
1034 root 1.1 }
1035    
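A minimal sketch of wiring eio_init into an application, assuming a self-pipe that the main event loop watches for readability; the pipe and helper names are illustrative, not part of this file:

    static int want_pipe [2];                  /* created with pipe () at startup */

    static void my_want_poll (void) { char c = 0; write (want_pipe [1], &c, 1); }
    static void my_done_poll (void) { char c;     read  (want_pipe [0], &c, 1); }

    /* pipe (want_pipe); */
    /* eio_init (my_want_poll, my_done_poll); */
    /* when want_pipe [0] becomes readable, call eio_poll () until it returns 0 */
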
1036 root 1.6 static void eio_api_destroy (eio_req *req)
1037     {
1038     free (req);
1039     }
1040    
1041     #define REQ(rtype) \
1042     eio_req *req; \
1043     \
1044     req = (eio_req *)calloc (1, sizeof *req); \
1045     if (!req) \
1046     return 0; \
1047     \
1048     req->type = EIO_ ## rtype; \
1049     req->pri = EIO_DEFAULT_PRI + EIO_PRI_BIAS; \
1050     req->finish = cb; \
1051     req->destroy = eio_api_destroy;
1052    
1053     #define SEND eio_submit (req); return req
1054    
1055     #define PATH (void)0
1056    
1057     eio_req *eio_fsync (int fd, eio_cb cb)
1058     {
1059     REQ (FSYNC); req->int1 = fd; SEND;
1060     }
1061    
1062     eio_req *eio_fdatasync (int fd, eio_cb cb)
1063     {
1064     REQ (FDATASYNC); req->int1 = fd; SEND;
1065     }
1066    
1067     eio_req *eio_readahead (int fd, off_t offset, size_t length, eio_cb cb)
1068     {
1069     REQ (READAHEAD); req->int1 = fd; req->offs = offset; req->size = length; SEND;
1070     }
1071    
1072     eio_req *eio_read (int fd, off_t offset, size_t length, void *data, eio_cb cb)
1073     {
1074     REQ (READ); req->int1 = fd; req->offs = offset; req->size = length; req->ptr2 = data; SEND;
1075     }
1076    
1077     eio_req *eio_write (int fd, off_t offset, size_t length, void *data, eio_cb cb)
1078     {
1079     REQ (WRITE); req->int1 = fd; req->offs = offset; req->size = length; req->ptr2 = data; SEND;
1080     }
1081    
1082     eio_req *eio_fstat (int fd, eio_cb cb)
1083     {
1084     REQ (FSTAT); req->int1 = fd; SEND;
1085     }
1086    
1087     eio_req *eio_futime (int fd, double atime, double mtime, eio_cb cb)
1088     {
1089     REQ (FUTIME); req->int1 = fd; req->nv1 = atime; req->nv2 = mtime; SEND;
1090     }
1091    
1092     eio_req *eio_ftruncate (int fd, off_t offset, eio_cb cb)
1093     {
1094     REQ (FTRUNCATE); req->int1 = fd; req->offs = offset; SEND;
1095     }
1096    
1097     eio_req *eio_fchmod (int fd, mode_t mode, eio_cb cb)
1098     {
1099     REQ (FCHMOD); req->int1 = fd; req->int2 = (long)mode; SEND;
1100     }
1101    
1102     eio_req *eio_fchown (int fd, uid_t uid, gid_t gid, eio_cb cb)
1103     {
1104     REQ (FCHOWN); req->int1 = fd; req->int2 = (long)uid; req->int3 = (long)gid; SEND;
1105     }
1106    
1107     eio_req *eio_dup2 (int fd, int fd2, eio_cb cb)
1108     {
1109     REQ (DUP2); req->int1 = fd; req->int2 = fd2; SEND;
1110     }
1111    
1112     eio_req *eio_sendfile (int out_fd, int in_fd, off_t in_offset, size_t length, eio_cb cb)
1113     {
1114     REQ (SENDFILE); req->int1 = out_fd; req->int2 = in_fd; req->offs = in_offset; req->size = length; SEND;
1115     }
1116    
1117     eio_req *eio_open (const char *path, int flags, mode_t mode, eio_cb cb)
1118     {
1119     REQ (OPEN); PATH; req->int1 = flags; req->int2 = (long)mode; SEND;
1120     }
1121    
1122     eio_req *eio_readlink (const char *path, eio_cb cb)
1123     {
1124     REQ (READLINK); PATH; SEND;
1125     }
1126    
1127     eio_req *eio_stat (const char *path, eio_cb cb)
1128     {
1129     REQ (STAT); PATH; SEND;
1130     }
1131    
1132     eio_req *eio_lstat (const char *path, eio_cb cb)
1133     {
1134     REQ (LSTAT); PATH; SEND;
1135     }
1136    
1137     eio_req *eio_utime (const char *path, double atime, double mtime, eio_cb cb)
1138     {
1139     REQ (UTIME); PATH; req->nv1 = atime; req->nv2 = mtime; SEND;
1140     }
1141    
1142     eio_req *eio_truncate (const char *path, off_t offset, eio_cb cb)
1143     {
1144     REQ (TRUNCATE); PATH; req->offs = offset; SEND;
1145     }
1146    
1147     eio_req *eio_chown (const char *path, uid_t uid, gid_t gid, eio_cb cb)
1148     {
1149     REQ (CHOWN); PATH; req->int2 = (long)uid; req->int3 = (long)gid; SEND;
1150     }
1151    
1152     eio_req *eio_chmod (const char *path, mode_t mode, eio_cb cb)
1153     {
1154     REQ (CHMOD); PATH; req->int2 = (long)mode; SEND;
1155     }
1156    
1157     eio_req *eio_mkdir (const char *path, mode_t mode, eio_cb cb)
1158     {
1159     REQ (MKDIR); PATH; req->int2 = (long)mode; SEND;
1160     }
1161    
1162     eio_req *eio_unlink (const char *path, eio_cb cb)
1163     {
1164     REQ (UNLINK); PATH; SEND;
1165     }
1166    
1167     eio_req *eio_rmdir (const char *path, eio_cb cb)
1168     {
1169     REQ (RMDIR); PATH; SEND;
1170     }
1171    
1172     eio_req *eio_readdir (const char *path, eio_cb cb)
1173     {
1174     REQ (READDIR); PATH; SEND;
1175     }
1176    
1177     eio_req *eio_mknod (const char *path, mode_t mode, dev_t dev, eio_cb cb)
1178     {
1179     REQ (MKNOD); PATH; req->int2 = (long)mode; req->offs = (off_t)dev; SEND;
1180     }
1181    
1182     eio_req *eio_busy (double delay, eio_cb cb)
1183     {
1184     REQ (BUSY); req->nv1 = delay; SEND;
1185     }
1186    
1187     eio_req *eio_nop (eio_cb cb)
1188     {
1189     REQ (NOP); SEND;
1190     }
1191    
1192     #undef REQ
1193     #undef PATH
1194     #undef SEND
1195    
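A minimal sketch of submitting work through these constructors, assuming eio_init has already been called; my_fd and my_buf stand in for the embedder's own descriptor and buffer:

    static char my_buf [1024];

    static int my_on_read (eio_req *req)
    {
      if (req->result < 0)
        return 0;                  /* the error code is in req->errorno */

      /* req->result bytes were read into my_buf */
      return 0;
    }

    /* read from the current file position (offset -1), then poll for the result: */
    /* eio_read (my_fd, -1, sizeof (my_buf), my_buf, my_on_read); */
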
1196 root 1.1 #if 0
1197    
1198     void
1199     aio_open (SV8 *pathname, int flags, int mode, SV *callback=&PL_sv_undef)
1200     PROTOTYPE: $$$;$
1201     PPCODE:
1202     {
1203     dREQ;
1204    
1205     req->type = EIO_OPEN;
1206     req->sv1 = newSVsv (pathname);
1207     req->ptr1 = SvPVbyte_nolen (req->sv1);
1208     req->int1 = flags;
1209     req->int2 = mode;
1210    
1211     EIO_SEND;
1212     }
1213    
1214     void
1215     aio_fsync (SV *fh, SV *callback=&PL_sv_undef)
1216     PROTOTYPE: $;$
1217     ALIAS:
1218     aio_fsync = EIO_FSYNC
1219     aio_fdatasync = EIO_FDATASYNC
1220     PPCODE:
1221     {
1222     dREQ;
1223    
1224     req->type = ix;
1225     req->sv1 = newSVsv (fh);
1226     req->int1 = PerlIO_fileno (IoIFP (sv_2io (fh)));
1227    
1228     EIO_SEND (req);
1229     }
1230    
1231     void
1232     aio_close (SV *fh, SV *callback=&PL_sv_undef)
1233     PROTOTYPE: $;$
1234     PPCODE:
1235     {
1236     dREQ;
1237    
1238     req->type = EIO_CLOSE;
1239     req->sv1 = newSVsv (fh);
1240     req->int1 = PerlIO_fileno (IoIFP (sv_2io (fh)));
1241    
1242     EIO_SEND (req);
1243     }
1244    
1245     void
1246     aio_read (SV *fh, SV *offset, SV *length, SV8 *data, IV dataoffset, SV *callback=&PL_sv_undef)
1247     ALIAS:
1248     aio_read = EIO_READ
1249     aio_write = EIO_WRITE
1250     PROTOTYPE: $$$$$;$
1251     PPCODE:
1252     {
1253     STRLEN svlen;
1254     char *svptr = SvPVbyte (data, svlen);
1255     UV len = SvUV (length);
1256    
1257     SvUPGRADE (data, SVt_PV);
1258     SvPOK_on (data);
1259    
1260     if (dataoffset < 0)
1261     dataoffset += svlen;
1262    
1263     if (dataoffset < 0 || dataoffset > svlen)
1264     croak ("dataoffset outside of data scalar");
1265    
1266     if (ix == EIO_WRITE)
1267     {
1268     /* write: check length and adjust. */
1269     if (!SvOK (length) || len + dataoffset > svlen)
1270     len = svlen - dataoffset;
1271     }
1272     else
1273     {
1274     /* read: grow scalar as necessary */
1275     svptr = SvGROW (data, len + dataoffset + 1);
1276     }
1277    
1278     if (len < 0)
1279     croak ("length must not be negative");
1280    
1281     {
1282     dREQ;
1283    
1284     req->type = ix;
1285     req->sv1 = newSVsv (fh);
1286     req->int1 = PerlIO_fileno (ix == EIO_READ ? IoIFP (sv_2io (fh))
1287     : IoOFP (sv_2io (fh)));
1288     req->offs = SvOK (offset) ? SvVAL64 (offset) : -1;
1289     req->size = len;
1290     req->sv2 = SvREFCNT_inc (data);
1291     req->ptr2 = (char *)svptr + dataoffset;
1292     req->stroffset = dataoffset;
1293    
1294     if (!SvREADONLY (data))
1295     {
1296     SvREADONLY_on (data);
1297     req->flags |= FLAG_SV2_RO_OFF;
1298     }
1299    
1300     EIO_SEND;
1301     }
1302     }
1303    
1304     void
1305     aio_readlink (SV8 *path, SV *callback=&PL_sv_undef)
1306     PROTOTYPE: $$;$
1307     PPCODE:
1308     {
1309     SV *data;
1310     dREQ;
1311    
1312     data = newSV (NAME_MAX);
1313     SvPOK_on (data);
1314    
1315     req->type = EIO_READLINK;
1316     req->sv1 = newSVsv (path);
1317     req->ptr1 = SvPVbyte_nolen (req->sv1);
1318     req->sv2 = data;
1319     req->ptr2 = SvPVbyte_nolen (data);
1320    
1321     EIO_SEND;
1322     }
1323    
1324     void
1325     aio_sendfile (SV *out_fh, SV *in_fh, SV *in_offset, UV length, SV *callback=&PL_sv_undef)
1326     PROTOTYPE: $$$$;$
1327     PPCODE:
1328     {
1329     dREQ;
1330    
1331     req->type = EIO_SENDFILE;
1332     req->sv1 = newSVsv (out_fh);
1333     req->int1 = PerlIO_fileno (IoIFP (sv_2io (out_fh)));
1334     req->sv2 = newSVsv (in_fh);
1335     req->int2 = PerlIO_fileno (IoIFP (sv_2io (in_fh)));
1336     req->offs = SvVAL64 (in_offset);
1337     req->size = length;
1338    
1339     EIO_SEND;
1340     }
1341    
1342     void
1343     aio_readahead (SV *fh, SV *offset, IV length, SV *callback=&PL_sv_undef)
1344     PROTOTYPE: $$$;$
1345     PPCODE:
1346     {
1347     dREQ;
1348    
1349     req->type = EIO_READAHEAD;
1350     req->sv1 = newSVsv (fh);
1351     req->int1 = PerlIO_fileno (IoIFP (sv_2io (fh)));
1352     req->offs = SvVAL64 (offset);
1353     req->size = length;
1354    
1355     EIO_SEND;
1356     }
1357    
1358     void
1359     aio_stat (SV8 *fh_or_path, SV *callback=&PL_sv_undef)
1360     ALIAS:
1361     aio_stat = EIO_STAT
1362     aio_lstat = EIO_LSTAT
1363     PPCODE:
1364     {
1365     dREQ;
1366    
1367     req->ptr2 = malloc (sizeof (EIO_STRUCT_STAT));
1368     if (!req->ptr2)
1369     {
1370     req_destroy (req);
1371     croak ("out of memory during aio_stat statdata allocation");
1372     }
1373    
1374     req->flags |= FLAG_PTR2_FREE;
1375     req->sv1 = newSVsv (fh_or_path);
1376    
1377     if (SvPOK (fh_or_path))
1378     {
1379     req->type = ix;
1380     req->ptr1 = SvPVbyte_nolen (req->sv1);
1381     }
1382     else
1383     {
1384     req->type = EIO_FSTAT;
1385     req->int1 = PerlIO_fileno (IoIFP (sv_2io (fh_or_path)));
1386     }
1387    
1388     EIO_SEND;
1389     }
1390    
1391     void
1392     aio_utime (SV8 *fh_or_path, SV *atime, SV *mtime, SV *callback=&PL_sv_undef)
1393     PPCODE:
1394     {
1395     dREQ;
1396    
1397     req->nv1 = SvOK (atime) ? SvNV (atime) : -1.;
1398     req->nv2 = SvOK (mtime) ? SvNV (mtime) : -1.;
1399     req->sv1 = newSVsv (fh_or_path);
1400    
1401     if (SvPOK (fh_or_path))
1402     {
1403     req->type = EIO_UTIME;
1404     req->ptr1 = SvPVbyte_nolen (req->sv1);
1405     }
1406     else
1407     {
1408     req->type = EIO_FUTIME;
1409     req->int1 = PerlIO_fileno (IoIFP (sv_2io (fh_or_path)));
1410     }
1411    
1412     EIO_SEND;
1413     }
1414    
1415     void
1416     aio_truncate (SV8 *fh_or_path, SV *offset, SV *callback=&PL_sv_undef)
1417     PPCODE:
1418     {
1419     dREQ;
1420    
1421     req->sv1 = newSVsv (fh_or_path);
1422     req->offs = SvOK (offset) ? SvVAL64 (offset) : -1;
1423    
1424     if (SvPOK (fh_or_path))
1425     {
1426     req->type = EIO_TRUNCATE;
1427     req->ptr1 = SvPVbyte_nolen (req->sv1);
1428     }
1429     else
1430     {
1431     req->type = EIO_FTRUNCATE;
1432     req->int1 = PerlIO_fileno (IoIFP (sv_2io (fh_or_path)));
1433     }
1434    
1435     EIO_SEND;
1436     }
1437    
1438     void
1439     aio_chmod (SV8 *fh_or_path, int mode, SV *callback=&PL_sv_undef)
1440     ALIAS:
1441     aio_chmod = EIO_CHMOD
1442     aio_fchmod = EIO_FCHMOD
1443     aio_mkdir = EIO_MKDIR
1444     PPCODE:
1445     {
1446     dREQ;
1447    
1448     req->type = type;
1449     req->int2 = mode;
1450     req->sv1 = newSVsv (fh_or_path);
1451    
1452     if (ix == EIO_FCHMOD)
1453     req->int1 = PerlIO_fileno (IoIFP (sv_2io (fh_or_path)));
1454     else
1455     req->ptr1 = SvPVbyte_nolen (req->sv1);
1456    
1457     EIO_SEND;
1458     }
1459    
1460     void
1461     aio_chown (SV8 *fh_or_path, SV *uid, SV *gid, SV *callback=&PL_sv_undef)
1462     PPCODE:
1463     {
1464     dREQ;
1465    
1466     req->int2 = SvOK (uid) ? SvIV (uid) : -1;
1467     req->int3 = SvOK (gid) ? SvIV (gid) : -1;
1468     req->sv1 = newSVsv (fh_or_path);
1469    
1470     if (SvPOK (fh_or_path))
1471     {
1472     req->type = EIO_CHOWN;
1473     req->ptr1 = SvPVbyte_nolen (req->sv1);
1474     }
1475     else
1476     {
1477     req->type = EIO_FCHOWN;
1478     req->int1 = PerlIO_fileno (IoIFP (sv_2io (fh_or_path)));
1479     }
1480    
1481     EIO_SEND;
1482     }
1483    
1484     void
1485     aio_unlink (SV8 *pathname, SV *callback=&PL_sv_undef)
1486     ALIAS:
1487     aio_unlink = EIO_UNLINK
1488     aio_rmdir = EIO_RMDIR
1489     aio_readdir = EIO_READDIR
1490     PPCODE:
1491     {
1492     dREQ;
1493    
1494     req->type = ix;
1495     req->sv1 = newSVsv (pathname);
1496     req->ptr1 = SvPVbyte_nolen (req->sv1);
1497    
1498     EIO_SEND;
1499     }
1500    
1501     void
1502     aio_link (SV8 *oldpath, SV8 *newpath, SV *callback=&PL_sv_undef)
1503     ALIAS:
1504     aio_link = EIO_LINK
1505     aio_symlink = EIO_SYMLINK
1506     aio_rename = EIO_RENAME
1507     PPCODE:
1508     {
1509     dREQ;
1510    
1511     req->type = ix;
1512     req->sv1 = newSVsv (oldpath);
1513     req->ptr1 = SvPVbyte_nolen (req->sv1);
1514     req->sv2 = newSVsv (newpath);
1515     req->ptr2 = SvPVbyte_nolen (req->sv2);
1516    
1517     EIO_SEND;
1518     }
1519    
1520     void
1521     aio_mknod (SV8 *pathname, int mode, UV dev, SV *callback=&PL_sv_undef)
1522     PPCODE:
1523     {
1524     dREQ;
1525    
1526     req->type = EIO_MKNOD;
1527     req->sv1 = newSVsv (pathname);
1528     req->ptr1 = SvPVbyte_nolen (req->sv1);
1529     req->int2 = (mode_t)mode;
1530     req->offs = dev;
1531    
1532     EIO_SEND;
1533     }
1534    
1535     void
1536     aio_busy (double delay, SV *callback=&PL_sv_undef)
1537     PPCODE:
1538     {
1539     dREQ;
1540    
1541     req->type = EIO_BUSY;
1542     req->nv1 = delay < 0. ? 0. : delay;
1543    
1544     EIO_SEND;
1545     }
1546    
1547     void
1548     aio_group (SV *callback=&PL_sv_undef)
1549     PROTOTYPE: ;$
1550     PPCODE:
1551     {
1552     dREQ;
1553    
1554     req->type = EIO_GROUP;
1555    
1556     req_send (req);
1557     XPUSHs (req_sv (req, AIO_GRP_KLASS));
1558     }
1559    
1560     void
1561     aio_nop (SV *callback=&PL_sv_undef)
1562     ALIAS:
1563     aio_nop = EIO_NOP
1564     aio_sync = EIO_SYNC
1565     PPCODE:
1566     {
1567     dREQ;
1568    
1569     #endif
1570    
1571 root 1.2 void eio_grp_feed (eio_req *grp, void (*feed)(eio_req *req), int limit)
1572 root 1.1 {
1573     grp->int2 = limit;
1574     grp->feed = feed;
1575 root 1.2
1576     grp_try_feed (grp);
1577     }
1578    
1579     void eio_grp_limit (eio_req *grp, int limit)
1580     {
1581     grp->int2 = limit;
1582    
1583     grp_try_feed (grp);
1584 root 1.1 }
1585    
1586     void eio_grp_add (eio_req *grp, eio_req *req)
1587     {
1588     assert (("cannot add requests to IO::AIO::GRP after the group finished", grp->int1 != 2));
1589    
1590     ++grp->size;
1591     req->grp = grp;
1592    
1593     req->grp_prev = 0;
1594     req->grp_next = grp->grp_first;
1595    
1596     if (grp->grp_first)
1597     grp->grp_first->grp_prev = req;
1598    
1599     grp->grp_first = req;
1600     }
1601    
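A minimal sketch of a feeder, assuming the embedder already holds a group request grp of type EIO_GROUP (this file provides the feed/limit plumbing but no group constructor); the names are illustrative:

    /* called while the group has fewer than its limit (grp->int2) members pending */
    static void my_feed (eio_req *grp)
    {
      eio_req *sub = eio_nop (0);   /* any request constructor will do */

      if (sub)
        eio_grp_add (grp, sub);     /* growing the group counts as progress */
    }

    /* eio_grp_feed (grp, my_feed, 4);   keep at most 4 members in flight */
    /* eio_grp_limit (grp, 2);           the limit can be lowered later */
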
1602