/cvs/libeio/eio.c
Revision: 1.6
Committed: Sun May 11 19:11:05 2008 UTC (16 years ago) by root
Content type: text/plain
Branch: MAIN
Changes since 1.5: +184 -34 lines
Log Message:
implement high-level eio functions (untested), bugfixes

File Contents

# User Rev Content
1 root 1.1 #include "eio.h"
2     #include "xthread.h"
3    
4     #include <errno.h>
5     #include <stddef.h>
6     #include <stdlib.h>
7 root 1.3 #include <string.h>
8 root 1.1 #include <errno.h>
9     #include <sys/types.h>
10     #include <sys/stat.h>
11     #include <limits.h>
12     #include <fcntl.h>
13     #include <sched.h>
14    
15     #ifndef EIO_FINISH
16     # define EIO_FINISH(req) ((req)->finish) && !EIO_CANCELLED (req) ? (req)->finish (req) : 0
17     #endif
18    
19     #ifndef EIO_DESTROY
20     # define EIO_DESTROY(req) do { if ((req)->destroy) (req)->destroy (req); } while (0)
21     #endif
22    
23     #ifndef EIO_FEED
24     # define EIO_FEED(req) do { if ((req)->feed ) (req)->feed (req); } while (0)
25     #endif
26    
27     #ifdef _WIN32
28    
29     /*doh*/
30    
31     #else
32    
33     # include "config.h"
34     # include <sys/time.h>
35     # include <sys/select.h>
36     # include <unistd.h>
37     # include <utime.h>
38     # include <signal.h>
39 root 1.4 # include <dirent.h>
40 root 1.1
41     # ifndef EIO_STRUCT_DIRENT
42     # define EIO_STRUCT_DIRENT struct dirent
43     # endif
44    
45     #endif
46    
47     # ifndef EIO_STRUCT_STAT
48     # define EIO_STRUCT_STAT struct stat
49     # endif
50    
51     #if HAVE_SENDFILE
52     # if __linux
53     # include <sys/sendfile.h>
54     # elif __freebsd
55     # include <sys/socket.h>
56     # include <sys/uio.h>
57     # elif __hpux
58     # include <sys/socket.h>
59     # elif __solaris /* not yet */
60     # include <sys/sendfile.h>
61     # else
62     # error sendfile support requested but not available
63     # endif
64     #endif
65    
66     /* number of seconds after which idle threads exit */
67     #define IDLE_TIMEOUT 10
68    
69     /* used for struct dirent, AIX doesn't provide it */
70     #ifndef NAME_MAX
71     # define NAME_MAX 4096
72     #endif
73    
74     /* buffer size for various temporary buffers */
75     #define EIO_BUFSIZE 65536
76    
77     #define dBUF \
78     char *eio_buf; \
79     X_LOCK (wrklock); \
80     self->dbuf = eio_buf = malloc (EIO_BUFSIZE); \
81     X_UNLOCK (wrklock); \
82     if (!eio_buf) \
83     return -1;
84    
85     #define EIO_TICKS ((1000000 + 1023) >> 10)
86    
87     static void (*want_poll_cb) (void);
88     static void (*done_poll_cb) (void);
89    
90     static unsigned int max_poll_time = 0;
91     static unsigned int max_poll_reqs = 0;
92    
93     /* calculate time difference in ~1/EIO_TICKS of a second */
94     static int tvdiff (struct timeval *tv1, struct timeval *tv2)
95     {
96     return (tv2->tv_sec - tv1->tv_sec ) * EIO_TICKS
97     + ((tv2->tv_usec - tv1->tv_usec) >> 10);
98     }
99    
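A quick worked example of the tick arithmetic above: EIO_TICKS is (1000000 + 1023) >> 10 = 977, so each whole second of difference contributes 977 ticks and the microsecond remainder is divided by 1024. For tv1 = 2.000000 s and tv2 = 3.500000 s, tvdiff returns (3 - 2) * 977 + (500000 >> 10) = 977 + 488 = 1465, i.e. about 1.5 seconds expressed in roughly millisecond-sized ticks.
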
100     static unsigned int started, idle, wanted;
101    
102     /* worker threads management */
103     static mutex_t wrklock = X_MUTEX_INIT;
104    
105     typedef struct worker {
106     /* locked by wrklock */
107     struct worker *prev, *next;
108    
109     thread_t tid;
110    
111     /* locked by reslock, reqlock or wrklock */
112     eio_req *req; /* currently processed request */
113     void *dbuf;
114     DIR *dirp;
115     } worker;
116    
117     static worker wrk_first = { &wrk_first, &wrk_first, 0 };
118    
119     static void worker_clear (worker *wrk)
120     {
121     if (wrk->dirp)
122     {
123     closedir (wrk->dirp);
124     wrk->dirp = 0;
125     }
126    
127     if (wrk->dbuf)
128     {
129     free (wrk->dbuf);
130     wrk->dbuf = 0;
131     }
132     }
133    
134     static void worker_free (worker *wrk)
135     {
136     wrk->next->prev = wrk->prev;
137     wrk->prev->next = wrk->next;
138    
139     free (wrk);
140     }
141    
142     static volatile unsigned int nreqs, nready, npending;
143     static volatile unsigned int max_idle = 4;
144    
145     static mutex_t reslock = X_MUTEX_INIT;
146     static mutex_t reqlock = X_MUTEX_INIT;
147     static cond_t reqwait = X_COND_INIT;
148    
149     unsigned int eio_nreqs (void)
150     {
151     return nreqs;
152     }
153    
154     unsigned int eio_nready (void)
155     {
156     unsigned int retval;
157    
158     if (WORDACCESS_UNSAFE) X_LOCK (reqlock);
159     retval = nready;
160     if (WORDACCESS_UNSAFE) X_UNLOCK (reqlock);
161    
162     return retval;
163     }
164    
165     unsigned int eio_npending (void)
166     {
167     unsigned int retval;
168    
169     if (WORDACCESS_UNSAFE) X_LOCK (reqlock);
170     retval = npending;
171     if (WORDACCESS_UNSAFE) X_UNLOCK (reqlock);
172    
173     return retval;
174     }
175    
176     unsigned int eio_nthreads (void)
177     {
178     unsigned int retval;
179    
180     if (WORDACCESS_UNSAFE) X_LOCK (reqlock);
181     retval = started;
182     if (WORDACCESS_UNSAFE) X_UNLOCK (reqlock);
183    
184     return retval;
185     }
186    
187     /*
188     * a somewhat faster data structure might be nice, but
189     * with 8 priorities this actually needs <20 insns
190     * per shift, the most expensive operation.
191     */
192     typedef struct {
193     eio_req *qs[EIO_NUM_PRI], *qe[EIO_NUM_PRI]; /* qstart, qend */
194     int size;
195     } reqq;
196    
197     static reqq req_queue;
198     static reqq res_queue;
199    
200     static int reqq_push (reqq *q, eio_req *req)
201     {
202     int pri = req->pri;
203     req->next = 0;
204    
205     if (q->qe[pri])
206     {
207     q->qe[pri]->next = req;
208     q->qe[pri] = req;
209     }
210     else
211     q->qe[pri] = q->qs[pri] = req;
212    
213     return q->size++;
214     }
215    
216     static eio_req *reqq_shift (reqq *q)
217     {
218     int pri;
219    
220     if (!q->size)
221     return 0;
222    
223     --q->size;
224    
225     for (pri = EIO_NUM_PRI; pri--; )
226     {
227     eio_req *req = q->qs[pri];
228    
229     if (req)
230     {
231     if (!(q->qs[pri] = (eio_req *)req->next))
232     q->qe[pri] = 0;
233    
234     return req;
235     }
236     }
237    
238     abort ();
239     }
240    
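To make the queue semantics above concrete: reqq_push appends at the tail of the request's priority class and returns the size before insertion (a return of 0 therefore means "the queue was empty", which is how the worker decides to fire want_poll_cb further down), while reqq_shift always serves the highest non-empty priority, FIFO within each priority. Below is a stand-alone sketch of the same technique; it uses a stripped-down node type and NUM_PRI in place of eio_req and EIO_NUM_PRI, so it is an illustrative analogue rather than the real code.

#include <assert.h>

#define NUM_PRI 8   /* stands in for EIO_NUM_PRI */

typedef struct node { struct node *next; int pri; int id; } node;

typedef struct {
  node *qs[NUM_PRI], *qe[NUM_PRI]; /* queue start/end per priority */
  int size;
} nodeq;

static int nodeq_push (nodeq *q, node *n) /* returns the size before the push */
{
  n->next = 0;
  if (q->qe[n->pri]) q->qe[n->pri]->next = n;
  else               q->qs[n->pri]       = n;
  q->qe[n->pri] = n;
  return q->size++;
}

static node *nodeq_shift (nodeq *q) /* highest priority first, FIFO within it */
{
  int pri;
  if (!q->size) return 0;
  --q->size;
  for (pri = NUM_PRI; pri--; )
    if (q->qs[pri])
      {
        node *n = q->qs[pri];
        if (!(q->qs[pri] = n->next)) q->qe[pri] = 0;
        return n;
      }
  return 0; /* not reached while size stays consistent */
}

int main (void)
{
  static nodeq q;
  node a = { 0, 1, 1 }, b = { 0, 7, 2 }, c = { 0, 1, 3 };
  assert (nodeq_push (&q, &a) == 0);   /* queue was empty */
  nodeq_push (&q, &b);
  nodeq_push (&q, &c);
  assert (nodeq_shift (&q)->id == 2);  /* pri 7 beats pri 1 */
  assert (nodeq_shift (&q)->id == 1);  /* FIFO within pri 1 */
  assert (nodeq_shift (&q)->id == 3);
  return 0;
}
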
241 root 1.2 static void grp_try_feed (eio_req *grp)
242 root 1.1 {
243     while (grp->size < grp->int2 && !EIO_CANCELLED (grp))
244     {
245     int old_len = grp->size;
246    
247     EIO_FEED (grp);
248    
249     /* stop if no progress has been made */
250     if (old_len == grp->size)
251     {
252     grp->feed = 0;
253 root 1.2 break;
254 root 1.1 }
255     }
256     }
257    
258 root 1.2 static int eio_finish (eio_req *req);
259 root 1.1
260     static int grp_dec (eio_req *grp)
261     {
262     --grp->size;
263    
264     /* call feeder, if applicable */
265 root 1.2 grp_try_feed (grp);
266 root 1.1
267     /* finish, if done */
268     if (!grp->size && grp->int1)
269 root 1.2 return eio_finish (grp);
270 root 1.1 else
271     return 0;
272     }
273    
274     void eio_destroy (eio_req *req)
275     {
276 root 1.6 if ((req)->flags & EIO_FLAG_PTR1_FREE) free (req->ptr1);
277     if ((req)->flags & EIO_FLAG_PTR2_FREE) free (req->ptr2);
278 root 1.1
279     EIO_DESTROY (req);
280     }
281    
282 root 1.2 static int eio_finish (eio_req *req)
283 root 1.1 {
284     int res = EIO_FINISH (req);
285    
286     if (req->grp)
287     {
288     int res2;
289     eio_req *grp = req->grp;
290    
291     /* unlink request */
292     if (req->grp_next) req->grp_next->grp_prev = req->grp_prev;
293     if (req->grp_prev) req->grp_prev->grp_next = req->grp_next;
294    
295     if (grp->grp_first == req)
296     grp->grp_first = req->grp_next;
297    
298     res2 = grp_dec (grp);
299    
300     if (!res && res2)
301     res = res2;
302     }
303    
304     eio_destroy (req);
305    
306     return res;
307     }
308    
309     void eio_grp_cancel (eio_req *grp)
310     {
311     for (grp = grp->grp_first; grp; grp = grp->grp_next)
312     eio_cancel (grp);
313     }
314    
315     void eio_cancel (eio_req *req)
316     {
317 root 1.6 X_LOCK (wrklock);
318 root 1.1 req->flags |= EIO_FLAG_CANCELLED;
319 root 1.6 X_UNLOCK (wrklock);
320 root 1.1
321     eio_grp_cancel (req);
322     }
323    
324     X_THREAD_PROC (eio_proc);
325    
326     static void start_thread (void)
327     {
328     worker *wrk = calloc (1, sizeof (worker));
329    
330     if (!wrk)
331     croak ("unable to allocate worker thread data");
332    
333     X_LOCK (wrklock);
334    
335     if (thread_create (&wrk->tid, eio_proc, (void *)wrk))
336     {
337     wrk->prev = &wrk_first;
338     wrk->next = wrk_first.next;
339     wrk_first.next->prev = wrk;
340     wrk_first.next = wrk;
341     ++started;
342     }
343     else
344     free (wrk);
345    
346     X_UNLOCK (wrklock);
347     }
348    
349     static void maybe_start_thread (void)
350     {
351     if (eio_nthreads () >= wanted)
352     return;
353    
354     /* todo: maybe use idle here, but might be less exact */
355     if (0 <= (int)eio_nthreads () + (int)eio_npending () - (int)eio_nreqs ())
356     return;
357    
358     start_thread ();
359     }
360    
361     void eio_submit (eio_req *req)
362     {
363     ++nreqs;
364    
365     X_LOCK (reqlock);
366     ++nready;
367     reqq_push (&req_queue, req);
368     X_COND_SIGNAL (reqwait);
369     X_UNLOCK (reqlock);
370    
371     maybe_start_thread ();
372     }
373    
374     static void end_thread (void)
375     {
376     eio_req *req = calloc (1, sizeof (eio_req));
377    
378     req->type = EIO_QUIT;
379     req->pri = EIO_PRI_MAX + EIO_PRI_BIAS;
380    
381     X_LOCK (reqlock);
382     reqq_push (&req_queue, req);
383     X_COND_SIGNAL (reqwait);
384     X_UNLOCK (reqlock);
385    
386     X_LOCK (wrklock);
387     --started;
388     X_UNLOCK (wrklock);
389     }
390    
391     void eio_set_max_poll_time (double nseconds)
392     {
393     if (WORDACCESS_UNSAFE) X_LOCK (reqlock);
394     max_poll_time = nseconds * EIO_TICKS;
395     if (WORDACCESS_UNSAFE) X_UNLOCK (reqlock);
396     }
397    
398     void eio_set_max_poll_reqs (unsigned int maxreqs)
399     {
400     if (WORDACCESS_UNSAFE) X_LOCK (reqlock);
401     max_poll_reqs = maxreqs;
402     if (WORDACCESS_UNSAFE) X_UNLOCK (reqlock);
403     }
404    
405     void eio_set_max_idle (unsigned int nthreads)
406     {
407     if (WORDACCESS_UNSAFE) X_LOCK (reqlock);
408     max_idle = nthreads <= 0 ? 1 : nthreads;
409     if (WORDACCESS_UNSAFE) X_UNLOCK (reqlock);
410     }
411    
412     void eio_set_min_parallel (unsigned int nthreads)
413     {
414     if (wanted < nthreads)
415     wanted = nthreads;
416     }
417    
418     void eio_set_max_parallel (unsigned int nthreads)
419     {
420     if (wanted > nthreads)
421     wanted = nthreads;
422    
423     while (started > wanted)
424     end_thread ();
425     }
426    
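The knobs above combine naturally: a typical setup bounds the thread pool and caps how much work a single eio_poll call may do. A hedged configuration sketch, with arbitrary numbers:

  eio_set_min_parallel (4);       /* want at least 4 worker threads          */
  eio_set_max_parallel (16);      /* never keep more than 16 workers around  */
  eio_set_max_idle (2);           /* let at most 2 idle workers linger       */
  eio_set_max_poll_time (0.05);   /* spend at most ~50 ms per eio_poll ()    */
  eio_set_max_poll_reqs (64);     /* ... or at most 64 finished requests     */
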
427     int eio_poll (void)
428     {
429     int maxreqs = max_poll_reqs;
430     struct timeval tv_start, tv_now;
431     eio_req *req;
432    
433     if (max_poll_time)
434     gettimeofday (&tv_start, 0);
435    
436     for (;;)
437     {
438     maybe_start_thread ();
439    
440     X_LOCK (reslock);
441     req = reqq_shift (&res_queue);
442    
443     if (req)
444     {
445     --npending;
446    
447 root 1.2 if (!res_queue.size && done_poll_cb)
448 root 1.1 done_poll_cb ();
449     }
450    
451     X_UNLOCK (reslock);
452    
453     if (!req)
454     return 0;
455    
456     --nreqs;
457    
458     if (req->type == EIO_GROUP && req->size)
459     {
460     req->int1 = 1; /* mark request as delayed */
461     continue;
462     }
463     else
464     {
465 root 1.2 int res = eio_finish (req);
466 root 1.1 if (res)
467     return res;
468     }
469    
470     if (maxreqs && !--maxreqs)
471     break;
472    
473     if (max_poll_time)
474     {
475     gettimeofday (&tv_now, 0);
476    
477     if (tvdiff (&tv_start, &tv_now) >= max_poll_time)
478     break;
479     }
480     }
481    
482     errno = EAGAIN;
483     return -1;
484     }
485    
486     /*****************************************************************************/
487     /* work around various missing functions */
488    
489     #if !HAVE_PREADWRITE
490     # define pread aio_pread
491     # define pwrite aio_pwrite
492    
493     /*
494     * make our pread/pwrite safe against themselves, but not against
495     * normal read/write by using a mutex. slows down execution a lot,
496     * but that's your problem, not mine.
497     */
498     static mutex_t preadwritelock = X_MUTEX_INIT;
499    
500     static ssize_t pread (int fd, void *buf, size_t count, off_t offset)
501     {
502     ssize_t res;
503     off_t ooffset;
504    
505     X_LOCK (preadwritelock);
506     ooffset = lseek (fd, 0, SEEK_CUR);
507     lseek (fd, offset, SEEK_SET);
508     res = read (fd, buf, count);
509     lseek (fd, ooffset, SEEK_SET);
510     X_UNLOCK (preadwritelock);
511    
512     return res;
513     }
514    
515     static ssize_t pwrite (int fd, void *buf, size_t count, off_t offset)
516     {
517     ssize_t res;
518     off_t ooffset;
519    
520     X_LOCK (preadwritelock);
521     ooffset = lseek (fd, 0, SEEK_CUR);
522     lseek (fd, offset, SEEK_SET);
523     res = write (fd, buf, count);
524     lseek (fd, ooffset, SEEK_SET);
525     X_UNLOCK (preadwritelock);
526    
527     return res;
528     }
529     #endif
530    
531     #ifndef HAVE_FUTIMES
532    
533     # define utimes(path,times) aio_utimes (path, times)
534     # define futimes(fd,times) aio_futimes (fd, times)
535    
536     static int aio_utimes (const char *filename, const struct timeval times[2])
537     {
538     if (times)
539     {
540     struct utimbuf buf;
541    
542     buf.actime = times[0].tv_sec;
543     buf.modtime = times[1].tv_sec;
544    
545     return utime (filename, &buf);
546     }
547     else
548     return utime (filename, 0);
549     }
550    
551     static int aio_futimes (int fd, const struct timeval tv[2])
552     {
553     errno = ENOSYS;
554     return -1;
555     }
556    
557     #endif
558    
559     #if !HAVE_FDATASYNC
560     # define fdatasync fsync
561     #endif
562    
563     #if !HAVE_READAHEAD
564     # define readahead(fd,offset,count) aio_readahead (fd, offset, count, self)
565    
566     static ssize_t aio_readahead (int fd, off_t offset, size_t count, worker *self)
567     {
568     size_t todo = count;
569     dBUF;
570    
571     while (todo > 0)
572     {
573     size_t len = todo < EIO_BUFSIZE ? todo : EIO_BUFSIZE;
574    
575 root 1.3 pread (fd, eio_buf, len, offset);
576 root 1.1 offset += len;
577     todo -= len;
578     }
579    
580     errno = 0;
581     return count;
582     }
583    
584     #endif
585    
586     #if !HAVE_READDIR_R
587     # define readdir_r aio_readdir_r
588    
589     static mutex_t readdirlock = X_MUTEX_INIT;
590    
591 root 1.5 static int readdir_r (DIR *dirp, EIO_STRUCT_DIRENT *ent, EIO_STRUCT_DIRENT **res)
592 root 1.1 {
593 root 1.5 EIO_STRUCT_DIRENT *e;
594 root 1.1 int errorno;
595    
596     X_LOCK (readdirlock);
597    
598     e = readdir (dirp);
599     errorno = errno;
600    
601     if (e)
602     {
603     *res = ent;
604     strcpy (ent->d_name, e->d_name);
605     }
606     else
607     *res = 0;
608    
609     X_UNLOCK (readdirlock);
610    
611     errno = errorno;
612     return e ? 0 : -1;
613     }
614     #endif
615    
616     /* sendfile always needs emulation */
617     static ssize_t sendfile_ (int ofd, int ifd, off_t offset, size_t count, worker *self)
618     {
619     ssize_t res;
620    
621     if (!count)
622     return 0;
623    
624     #if HAVE_SENDFILE
625     # if __linux
626     res = sendfile (ofd, ifd, &offset, count);
627    
628     # elif __freebsd
629     /*
630     * Of course, the freebsd sendfile is a dire hack with no thoughts
631     * wasted on making it similar to other I/O functions.
632     */
633     {
634     off_t sbytes;
635     res = sendfile (ifd, ofd, offset, count, 0, &sbytes, 0);
636    
637     if (res < 0 && sbytes)
638     /* maybe only on EAGAIN: as usual, the manpage leaves you guessing */
639     res = sbytes;
640     }
641    
642     # elif __hpux
643     res = sendfile (ofd, ifd, offset, count, 0, 0);
644    
645     # elif __solaris
646     {
647     struct sendfilevec vec;
648     size_t sbytes;
649    
650     vec.sfv_fd = ifd;
651     vec.sfv_flag = 0;
652     vec.sfv_off = offset;
653     vec.sfv_len = count;
654    
655     res = sendfilev (ofd, &vec, 1, &sbytes);
656    
657     if (res < 0 && sbytes)
658     res = sbytes;
659     }
660    
661     # endif
662     #else
663     res = -1;
664     errno = ENOSYS;
665     #endif
666    
667     if (res < 0
668     && (errno == ENOSYS || errno == EINVAL || errno == ENOTSOCK
669     #if __solaris
670     || errno == EAFNOSUPPORT || errno == EPROTOTYPE
671     #endif
672     )
673     )
674     {
675     /* emulate sendfile. this is a major pain in the ass */
676     dBUF;
677    
678     res = 0;
679    
680     while (count)
681     {
682     ssize_t cnt;
683    
684     cnt = pread (ifd, eio_buf, count > EIO_BUFSIZE ? EIO_BUFSIZE : count, offset);
685    
686     if (cnt <= 0)
687     {
688     if (cnt && !res) res = -1;
689     break;
690     }
691    
692     cnt = write (ofd, eio_buf, cnt);
693    
694     if (cnt <= 0)
695     {
696     if (cnt && !res) res = -1;
697     break;
698     }
699    
700     offset += cnt;
701     res += cnt;
702     count -= cnt;
703     }
704     }
705    
706     return res;
707     }
708    
709     /* read a full directory */
710     static void scandir_ (eio_req *req, worker *self)
711     {
712     DIR *dirp;
713     union
714     {
715     EIO_STRUCT_DIRENT d;
716     char b [offsetof (EIO_STRUCT_DIRENT, d_name) + NAME_MAX + 1];
717     } *u;
718     EIO_STRUCT_DIRENT *entp;
719     char *name, *names;
720     int memlen = 4096;
721     int memofs = 0;
722     int res = 0;
723    
724     X_LOCK (wrklock);
725     self->dirp = dirp = opendir (req->ptr1);
726     self->dbuf = u = malloc (sizeof (*u));
727     req->flags |= EIO_FLAG_PTR2_FREE;
728     req->ptr2 = names = malloc (memlen);
729     X_UNLOCK (wrklock);
730    
731     if (dirp && u && names)
732     for (;;)
733     {
734     errno = 0;
735     readdir_r (dirp, &u->d, &entp);
736    
737     if (!entp)
738     break;
739    
740     name = entp->d_name;
741    
742     if (name [0] != '.' || (name [1] && (name [1] != '.' || name [2])))
743     {
744     int len = strlen (name) + 1;
745    
746     res++;
747    
748     while (memofs + len > memlen)
749     {
750     memlen *= 2;
751     X_LOCK (wrklock);
752     req->ptr2 = names = realloc (names, memlen);
753     X_UNLOCK (wrklock);
754    
755     if (!names)
756     break;
757     }
758    
759     memcpy (names + memofs, name, len);
760     memofs += len;
761     }
762     }
763    
764     if (errno)
765     res = -1;
766    
767     req->result = res;
768     }
769    
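scandir_ leaves the number of entries in req->result and packs the names, with "." and ".." filtered out, as consecutive NUL-terminated strings into req->ptr2 (freed automatically via EIO_FLAG_PTR2_FREE). A hedged sketch of a finish callback that walks that buffer follows; the int (*)(eio_req *) callback shape is inferred from how EIO_FINISH invokes req->finish.

#include <stdio.h>
#include <string.h>

#include "eio.h"

/* illustrative finish callback for an EIO_READDIR request */
static int
readdir_done (eio_req *req)
{
  if (req->result >= 0)
    {
      char *name = (char *)req->ptr2;
      int i;

      for (i = 0; i < req->result; ++i)
        {
          printf ("entry: %s\n", name);
          name += strlen (name) + 1;   /* skip past the trailing NUL */
        }
    }

  return 0;
}

/* used as: eio_readdir ("/some/dir", readdir_done); */
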
770     /*****************************************************************************/
771    
772 root 1.6 #define ALLOC(len) \
773     X_LOCK (wrklock); \
774     req->flags |= EIO_FLAG_PTR2_FREE; \
775     X_UNLOCK (wrklock); \
776     req->ptr2 = malloc (len); \
777     if (!req->ptr2) \
778     { \
779     errno = ENOMEM; \
780     req->result = -1; \
781     break; \
782     }
783    
784 root 1.1 X_THREAD_PROC (eio_proc)
785     {
786     eio_req *req;
787     struct timespec ts;
788     worker *self = (worker *)thr_arg;
789    
790     /* try to distribute timeouts somewhat randomly */
791     ts.tv_nsec = ((unsigned long)self & 1023UL) * (1000000000UL / 1024UL);
792    
793     for (;;)
794     {
795     ts.tv_sec = time (0) + IDLE_TIMEOUT;
796    
797     X_LOCK (reqlock);
798    
799     for (;;)
800     {
801     self->req = req = reqq_shift (&req_queue);
802    
803     if (req)
804     break;
805    
806     ++idle;
807    
808     if (X_COND_TIMEDWAIT (reqwait, reqlock, ts) == ETIMEDOUT)
809     {
810     if (idle > max_idle)
811     {
812     --idle;
813     X_UNLOCK (reqlock);
814     X_LOCK (wrklock);
815     --started;
816     X_UNLOCK (wrklock);
817     goto quit;
818     }
819    
820     /* we are allowed to idle, so do so without any timeout */
821     X_COND_WAIT (reqwait, reqlock);
822     ts.tv_sec = time (0) + IDLE_TIMEOUT;
823     }
824    
825     --idle;
826     }
827    
828     --nready;
829    
830     X_UNLOCK (reqlock);
831    
832     errno = 0; /* strictly unnecessary */
833    
834     if (!EIO_CANCELLED (req))
835     switch (req->type)
836     {
837     case EIO_READ: req->result = req->offs >= 0
838     ? pread (req->int1, req->ptr2, req->size, req->offs)
839     : read (req->int1, req->ptr2, req->size); break;
840     case EIO_WRITE: req->result = req->offs >= 0
841     ? pwrite (req->int1, req->ptr2, req->size, req->offs)
842     : write (req->int1, req->ptr2, req->size); break;
843    
844     case EIO_READAHEAD: req->result = readahead (req->int1, req->offs, req->size); break;
845     case EIO_SENDFILE: req->result = sendfile_ (req->int1, req->int2, req->offs, req->size, self); break;
846    
847 root 1.6 case EIO_STAT: ALLOC (sizeof (EIO_STRUCT_STAT));
848     req->result = stat (req->ptr1, (EIO_STRUCT_STAT *)req->ptr2); break;
849     case EIO_LSTAT: ALLOC (sizeof (EIO_STRUCT_STAT));
850     req->result = lstat (req->ptr1, (EIO_STRUCT_STAT *)req->ptr2); break;
851     case EIO_FSTAT: ALLOC (sizeof (EIO_STRUCT_STAT));
852     req->result = fstat (req->int1, (EIO_STRUCT_STAT *)req->ptr2); break;
853 root 1.1
854     case EIO_CHOWN: req->result = chown (req->ptr1, req->int2, req->int3); break;
855     case EIO_FCHOWN: req->result = fchown (req->int1, req->int2, req->int3); break;
856     case EIO_CHMOD: req->result = chmod (req->ptr1, (mode_t)req->int2); break;
857     case EIO_FCHMOD: req->result = fchmod (req->int1, (mode_t)req->int2); break;
858     case EIO_TRUNCATE: req->result = truncate (req->ptr1, req->offs); break;
859     case EIO_FTRUNCATE: req->result = ftruncate (req->int1, req->offs); break;
860    
861     case EIO_OPEN: req->result = open (req->ptr1, req->int1, (mode_t)req->int2); break;
862     case EIO_CLOSE: req->result = close (req->int1); break;
863     case EIO_DUP2: req->result = dup2 (req->int1, req->int2); break;
864     case EIO_UNLINK: req->result = unlink (req->ptr1); break;
865     case EIO_RMDIR: req->result = rmdir (req->ptr1); break;
866     case EIO_MKDIR: req->result = mkdir (req->ptr1, (mode_t)req->int2); break;
867     case EIO_RENAME: req->result = rename (req->ptr1, req->ptr2); break;
868     case EIO_LINK: req->result = link (req->ptr1, req->ptr2); break;
869     case EIO_SYMLINK: req->result = symlink (req->ptr1, req->ptr2); break;
870     case EIO_MKNOD: req->result = mknod (req->ptr1, (mode_t)req->int2, (dev_t)req->offs); break;
871 root 1.6 case EIO_READLINK: ALLOC (NAME_MAX);
872     req->result = readlink (req->ptr1, req->ptr2, NAME_MAX); break;
873 root 1.1
874     case EIO_SYNC: req->result = 0; sync (); break;
875     case EIO_FSYNC: req->result = fsync (req->int1); break;
876     case EIO_FDATASYNC: req->result = fdatasync (req->int1); break;
877    
878     case EIO_READDIR: scandir_ (req, self); break;
879    
880     case EIO_BUSY:
881     #ifdef _WIN32
882     Sleep (req->nv1 * 1000.);
883     #else
884     {
885     struct timeval tv;
886    
887     tv.tv_sec = req->nv1;
888     tv.tv_usec = (req->nv1 - tv.tv_sec) * 1000000.;
889    
890     req->result = select (0, 0, 0, 0, &tv);
891     }
892     #endif
893     break;
894    
895     case EIO_UTIME:
896     case EIO_FUTIME:
897     {
898     struct timeval tv[2];
899     struct timeval *times;
900    
901     if (req->nv1 != -1. || req->nv2 != -1.)
902     {
903     tv[0].tv_sec = req->nv1;
904     tv[0].tv_usec = (req->nv1 - tv[0].tv_sec) * 1000000.;
905     tv[1].tv_sec = req->nv2;
906     tv[1].tv_usec = (req->nv2 - tv[1].tv_sec) * 1000000.;
907    
908     times = tv;
909     }
910     else
911     times = 0;
912    
913    
914     req->result = req->type == EIO_FUTIME
915     ? futimes (req->int1, times)
916     : utimes (req->ptr1, times);
917     }
918     break;
919     case EIO_GROUP:
920     case EIO_NOP:
921     break;
922    
923     case EIO_QUIT:
924     goto quit;
925    
926     default:
927     req->result = -1;
928     break;
929     }
930    
931     req->errorno = errno;
932    
933     X_LOCK (reslock);
934    
935     ++npending;
936    
937 root 1.2 if (!reqq_push (&res_queue, req) && want_poll_cb)
938 root 1.1 want_poll_cb ();
939    
940     self->req = 0;
941     worker_clear (self);
942    
943     X_UNLOCK (reslock);
944     }
945    
946     quit:
947     X_LOCK (wrklock);
948     worker_free (self);
949     X_UNLOCK (wrklock);
950    
951     return 0;
952     }
953    
954     /*****************************************************************************/
955    
956 root 1.2 static void eio_atfork_prepare (void)
957 root 1.1 {
958     X_LOCK (wrklock);
959     X_LOCK (reqlock);
960     X_LOCK (reslock);
961     #if !HAVE_PREADWRITE
962     X_LOCK (preadwritelock);
963     #endif
964     #if !HAVE_READDIR_R
965     X_LOCK (readdirlock);
966     #endif
967     }
968    
969 root 1.2 static void eio_atfork_parent (void)
970 root 1.1 {
971     #if !HAVE_READDIR_R
972     X_UNLOCK (readdirlock);
973     #endif
974     #if !HAVE_PREADWRITE
975     X_UNLOCK (preadwritelock);
976     #endif
977     X_UNLOCK (reslock);
978     X_UNLOCK (reqlock);
979     X_UNLOCK (wrklock);
980     }
981    
982 root 1.2 static void eio_atfork_child (void)
983 root 1.1 {
984     eio_req *prv;
985    
986     while (prv = reqq_shift (&req_queue))
987     eio_destroy (prv);
988    
989     while (prv = reqq_shift (&res_queue))
990     eio_destroy (prv);
991    
992     while (wrk_first.next != &wrk_first)
993     {
994     worker *wrk = wrk_first.next;
995    
996     if (wrk->req)
997     eio_destroy (wrk->req);
998    
999     worker_clear (wrk);
1000     worker_free (wrk);
1001     }
1002    
1003     started = 0;
1004     idle = 0;
1005     nreqs = 0;
1006     nready = 0;
1007     npending = 0;
1008    
1009 root 1.2 eio_atfork_parent ();
1010 root 1.1 }
1011    
1012     int eio_init (void (*want_poll)(void), void (*done_poll)(void))
1013     {
1014     want_poll_cb = want_poll;
1015     done_poll_cb = done_poll;
1016    
1017     #ifdef _WIN32
1018     X_MUTEX_CHECK (wrklock);
1019     X_MUTEX_CHECK (reslock);
1020     X_MUTEX_CHECK (reqlock);
1021     X_MUTEX_CHECK (reqwait);
1022     X_MUTEX_CHECK (preadwritelock);
1023     X_MUTEX_CHECK (readdirlock);
1024    
1025     X_COND_CHECK (reqwait);
1026     #endif
1027    
1028 root 1.2 X_THREAD_ATFORK (eio_atfork_prepare, eio_atfork_parent, eio_atfork_child);
1029 root 1.1 }
1030    
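want_poll_cb is invoked from a worker thread, with reslock held, the moment the first result lands in res_queue; done_poll_cb fires once eio_poll has drained it. A common way to bridge that into a single-threaded event loop is a self-pipe. The sketch below is an assumption about the surrounding program, not part of eio.c: the pipe, its registration with the loop, and on_readable are all hypothetical.

#include <unistd.h>

#include "eio.h"

static int wakeup_pipe[2];   /* created with pipe () during startup */

static void
want_poll (void)             /* runs on a worker thread: just poke the loop */
{
  char c = 0;
  write (wakeup_pipe[1], &c, 1);
}

static void
done_poll (void)             /* all queued results handled: drain the byte */
{
  char c;
  read (wakeup_pipe[0], &c, 1);
}

/* event-loop callback, run whenever wakeup_pipe[0] becomes readable */
static void
on_readable (void)
{
  while (eio_poll () == -1)
    ;   /* -1/EAGAIN: poll limits were hit, more results still pending */
}

/* during startup: pipe (wakeup_pipe); eio_init (want_poll, done_poll); */
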
1031 root 1.6 static void eio_api_destroy (eio_req *req)
1032     {
1033     free (req);
1034     }
1035    
1036     #define REQ(rtype) \
1037     eio_req *req; \
1038     \
1039     req = (eio_req *)calloc (1, sizeof *req); \
1040     if (!req) \
1041     return 0; \
1042     \
1043     req->type = EIO_ ## rtype; \
1044     req->pri = EIO_DEFAULT_PRI + EIO_PRI_BIAS; \
1045     req->finish = cb; \
1046     req->destroy = eio_api_destroy;
1047    
1048     #define SEND eio_submit (req); return req
1049    
1050     #define PATH (void)0
1051    
1052     eio_req *eio_fsync (int fd, eio_cb cb)
1053     {
1054     REQ (FSYNC); req->int1 = fd; SEND;
1055     }
1056    
1057     eio_req *eio_fdatasync (int fd, eio_cb cb)
1058     {
1059     REQ (FDATASYNC); req->int1 = fd; SEND;
1060     }
1061    
1062     eio_req *eio_readahead (int fd, off_t offset, size_t length, eio_cb cb)
1063     {
1064     REQ (READAHEAD); req->int1 = fd; req->offs = offset; req->size = length; SEND;
1065     }
1066    
1067     eio_req *eio_read (int fd, off_t offset, size_t length, void *data, eio_cb cb)
1068     {
1069     REQ (READ); req->int1 = fd; req->offs = offset; req->size = length; req->ptr2 = data; SEND;
1070     }
1071    
1072     eio_req *eio_write (int fd, off_t offset, size_t length, void *data, eio_cb cb)
1073     {
1074     REQ (WRITE); req->int1 = fd; req->offs = offset; req->size = length; req->ptr2 = data; SEND;
1075     }
1076    
1077     eio_req *eio_fstat (int fd, eio_cb cb)
1078     {
1079     REQ (FSTAT); req->int1 = fd; SEND;
1080     }
1081    
1082     eio_req *eio_futime (int fd, double atime, double mtime, eio_cb cb)
1083     {
1084     REQ (FUTIME); req->int1 = fd; req->nv1 = atime; req->nv2 = mtime; SEND;
1085     }
1086    
1087     eio_req *eio_ftruncate (int fd, off_t offset, eio_cb cb)
1088     {
1089     REQ (FTRUNCATE); req->int1 = fd; req->offs = offset; SEND;
1090     }
1091    
1092     eio_req *eio_fchmod (int fd, mode_t mode, eio_cb cb)
1093     {
1094     REQ (FCHMOD); req->int1 = fd; req->int2 = (long)mode; SEND;
1095     }
1096    
1097     eio_req *eio_fchown (int fd, uid_t uid, gid_t gid, eio_cb cb)
1098     {
1099     REQ (FCHOWN); req->int1 = fd; req->int2 = (long)uid; req->int3 = (long)gid; SEND;
1100     }
1101    
1102     eio_req *eio_dup2 (int fd, int fd2, eio_cb cb)
1103     {
1104     REQ (DUP2); req->int1 = fd; req->int2 = fd2; SEND;
1105     }
1106    
1107     eio_req *eio_sendfile (int out_fd, int in_fd, off_t in_offset, size_t length, eio_cb cb)
1108     {
1109     REQ (SENDFILE); req->int1 = out_fd; req->int2 = in_fd; req->offs = in_offset; req->size = length; SEND;
1110     }
1111    
1112     eio_req *eio_open (const char *path, int flags, mode_t mode, eio_cb cb)
1113     {
1114     REQ (OPEN); PATH; req->int1 = flags; req->int2 = (long)mode; SEND;
1115     }
1116    
1117     eio_req *eio_readlink (const char *path, eio_cb cb)
1118     {
1119     REQ (READLINK); PATH; SEND;
1120     }
1121    
1122     eio_req *eio_stat (const char *path, eio_cb cb)
1123     {
1124     REQ (STAT); PATH; SEND;
1125     }
1126    
1127     eio_req *eio_lstat (const char *path, eio_cb cb)
1128     {
1129     REQ (LSTAT); PATH; SEND;
1130     }
1131    
1132     eio_req *eio_utime (const char *path, double atime, double mtime, eio_cb cb)
1133     {
1134     REQ (UTIME); PATH; req->nv1 = atime; req->nv2 = mtime; SEND;
1135     }
1136    
1137     eio_req *eio_truncate (const char *path, off_t offset, eio_cb cb)
1138     {
1139     REQ (TRUNCATE); PATH; req->offs = offset; SEND;
1140     }
1141    
1142     eio_req *eio_chown (const char *path, uid_t uid, gid_t gid, eio_cb cb)
1143     {
1144     REQ (CHOWN); PATH; req->int2 = (long)uid; req->int3 = (long)gid; SEND;
1145     }
1146    
1147     eio_req *eio_chmod (const char *path, mode_t mode, eio_cb cb)
1148     {
1149     REQ (CHMOD); PATH; req->int2 = (long)mode; SEND;
1150     }
1151    
1152     eio_req *eio_mkdir (const char *path, mode_t mode, eio_cb cb)
1153     {
1154     REQ (MKDIR); PATH; req->int2 = (long)mode; SEND;
1155     }
1156    
1157     eio_req *eio_unlink (const char *path, eio_cb cb)
1158     {
1159     REQ (UNLINK); PATH; SEND;
1160     }
1161    
1162     eio_req *eio_rmdir (const char *path, eio_cb cb)
1163     {
1164     REQ (RMDIR); PATH; SEND;
1165     }
1166    
1167     eio_req *eio_readdir (const char *path, eio_cb cb)
1168     {
1169     REQ (READDIR); PATH; SEND;
1170     }
1171    
1172     eio_req *eio_mknod (const char *path, mode_t mode, dev_t dev, eio_cb cb)
1173     {
1174     REQ (MKNOD); PATH; req->int2 = (long)mode; req->offs = (off_t)dev; SEND;
1175     }
1176    
1177     eio_req *eio_busy (double delay, eio_cb cb)
1178     {
1179     REQ (BUSY); req->nv1 = delay; SEND;
1180     }
1181    
1182     eio_req *eio_nop (eio_cb cb)
1183     {
1184     REQ (NOP); SEND;
1185     }
1186    
1187     #undef REQ
1188     #undef PATH
1189     #undef SEND
1190    
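Putting the wrappers to work only takes a finish callback and something that drives eio_poll. The stand-alone sketch below simply busy-polls, which is wasteful but keeps the example self-contained (real programs hook want_poll/done_poll into their event loop, as in the self-pipe sketch after eio_init); "some.file" is a hypothetical path, and eio_fsync is used rather than a path-taking wrapper because the PATH macro is still a stub in this revision.

#include <fcntl.h>
#include <stdio.h>
#include <unistd.h>

#include "eio.h"

static void want_poll (void) { /* see the self-pipe sketch near eio_init */ }
static void done_poll (void) { }

static int
fsync_done (eio_req *req)    /* req->result / req->errorno mirror fsync (2) */
{
  printf ("fsync: result %d, errno %d\n", (int)req->result, req->errorno);
  return 0;
}

int
main (void)
{
  int fd = open ("some.file", O_RDWR);   /* hypothetical file */

  eio_init (want_poll, done_poll);
  eio_set_min_parallel (4);    /* make sure some worker threads are wanted */

  eio_fsync (fd, fsync_done);

  while (eio_nreqs ())         /* busy-wait, for brevity only */
    eio_poll ();

  close (fd);
  return 0;
}
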
1191 root 1.1 #if 0
1192    
1193     void
1194     aio_open (SV8 *pathname, int flags, int mode, SV *callback=&PL_sv_undef)
1195     PROTOTYPE: $$$;$
1196     PPCODE:
1197     {
1198     dREQ;
1199    
1200     req->type = EIO_OPEN;
1201     req->sv1 = newSVsv (pathname);
1202     req->ptr1 = SvPVbyte_nolen (req->sv1);
1203     req->int1 = flags;
1204     req->int2 = mode;
1205    
1206     EIO_SEND;
1207     }
1208    
1209     void
1210     aio_fsync (SV *fh, SV *callback=&PL_sv_undef)
1211     PROTOTYPE: $;$
1212     ALIAS:
1213     aio_fsync = EIO_FSYNC
1214     aio_fdatasync = EIO_FDATASYNC
1215     PPCODE:
1216     {
1217     dREQ;
1218    
1219     req->type = ix;
1220     req->sv1 = newSVsv (fh);
1221     req->int1 = PerlIO_fileno (IoIFP (sv_2io (fh)));
1222    
1223     EIO_SEND (req);
1224     }
1225    
1226     void
1227     aio_close (SV *fh, SV *callback=&PL_sv_undef)
1228     PROTOTYPE: $;$
1229     PPCODE:
1230     {
1231     dREQ;
1232    
1233     req->type = EIO_CLOSE;
1234     req->sv1 = newSVsv (fh);
1235     req->int1 = PerlIO_fileno (IoIFP (sv_2io (fh)));
1236    
1237     EIO_SEND (req);
1238     }
1239    
1240     void
1241     aio_read (SV *fh, SV *offset, SV *length, SV8 *data, IV dataoffset, SV *callback=&PL_sv_undef)
1242     ALIAS:
1243     aio_read = EIO_READ
1244     aio_write = EIO_WRITE
1245     PROTOTYPE: $$$$$;$
1246     PPCODE:
1247     {
1248     STRLEN svlen;
1249     char *svptr = SvPVbyte (data, svlen);
1250     UV len = SvUV (length);
1251    
1252     SvUPGRADE (data, SVt_PV);
1253     SvPOK_on (data);
1254    
1255     if (dataoffset < 0)
1256     dataoffset += svlen;
1257    
1258     if (dataoffset < 0 || dataoffset > svlen)
1259     croak ("dataoffset outside of data scalar");
1260    
1261     if (ix == EIO_WRITE)
1262     {
1263     /* write: check length and adjust. */
1264     if (!SvOK (length) || len + dataoffset > svlen)
1265     len = svlen - dataoffset;
1266     }
1267     else
1268     {
1269     /* read: grow scalar as necessary */
1270     svptr = SvGROW (data, len + dataoffset + 1);
1271     }
1272    
1273     if (len < 0)
1274     croak ("length must not be negative");
1275    
1276     {
1277     dREQ;
1278    
1279     req->type = ix;
1280     req->sv1 = newSVsv (fh);
1281     req->int1 = PerlIO_fileno (ix == EIO_READ ? IoIFP (sv_2io (fh))
1282     : IoOFP (sv_2io (fh)));
1283     req->offs = SvOK (offset) ? SvVAL64 (offset) : -1;
1284     req->size = len;
1285     req->sv2 = SvREFCNT_inc (data);
1286     req->ptr2 = (char *)svptr + dataoffset;
1287     req->stroffset = dataoffset;
1288    
1289     if (!SvREADONLY (data))
1290     {
1291     SvREADONLY_on (data);
1292     req->flags |= FLAG_SV2_RO_OFF;
1293     }
1294    
1295     EIO_SEND;
1296     }
1297     }
1298    
1299     void
1300     aio_readlink (SV8 *path, SV *callback=&PL_sv_undef)
1301     PROTOTYPE: $$;$
1302     PPCODE:
1303     {
1304     SV *data;
1305     dREQ;
1306    
1307     data = newSV (NAME_MAX);
1308     SvPOK_on (data);
1309    
1310     req->type = EIO_READLINK;
1311     req->sv1 = newSVsv (path);
1312     req->ptr1 = SvPVbyte_nolen (req->sv1);
1313     req->sv2 = data;
1314     req->ptr2 = SvPVbyte_nolen (data);
1315    
1316     EIO_SEND;
1317     }
1318    
1319     void
1320     aio_sendfile (SV *out_fh, SV *in_fh, SV *in_offset, UV length, SV *callback=&PL_sv_undef)
1321     PROTOTYPE: $$$$;$
1322     PPCODE:
1323     {
1324     dREQ;
1325    
1326     req->type = EIO_SENDFILE;
1327     req->sv1 = newSVsv (out_fh);
1328     req->int1 = PerlIO_fileno (IoIFP (sv_2io (out_fh)));
1329     req->sv2 = newSVsv (in_fh);
1330     req->int2 = PerlIO_fileno (IoIFP (sv_2io (in_fh)));
1331     req->offs = SvVAL64 (in_offset);
1332     req->size = length;
1333    
1334     EIO_SEND;
1335     }
1336    
1337     void
1338     aio_readahead (SV *fh, SV *offset, IV length, SV *callback=&PL_sv_undef)
1339     PROTOTYPE: $$$;$
1340     PPCODE:
1341     {
1342     dREQ;
1343    
1344     req->type = EIO_READAHEAD;
1345     req->sv1 = newSVsv (fh);
1346     req->int1 = PerlIO_fileno (IoIFP (sv_2io (fh)));
1347     req->offs = SvVAL64 (offset);
1348     req->size = length;
1349    
1350     EIO_SEND;
1351     }
1352    
1353     void
1354     aio_stat (SV8 *fh_or_path, SV *callback=&PL_sv_undef)
1355     ALIAS:
1356     aio_stat = EIO_STAT
1357     aio_lstat = EIO_LSTAT
1358     PPCODE:
1359     {
1360     dREQ;
1361    
1362     req->ptr2 = malloc (sizeof (EIO_STRUCT_STAT));
1363     if (!req->ptr2)
1364     {
1365     req_destroy (req);
1366     croak ("out of memory during aio_stat statdata allocation");
1367     }
1368    
1369     req->flags |= FLAG_PTR2_FREE;
1370     req->sv1 = newSVsv (fh_or_path);
1371    
1372     if (SvPOK (fh_or_path))
1373     {
1374     req->type = ix;
1375     req->ptr1 = SvPVbyte_nolen (req->sv1);
1376     }
1377     else
1378     {
1379     req->type = EIO_FSTAT;
1380     req->int1 = PerlIO_fileno (IoIFP (sv_2io (fh_or_path)));
1381     }
1382    
1383     EIO_SEND;
1384     }
1385    
1386     void
1387     aio_utime (SV8 *fh_or_path, SV *atime, SV *mtime, SV *callback=&PL_sv_undef)
1388     PPCODE:
1389     {
1390     dREQ;
1391    
1392     req->nv1 = SvOK (atime) ? SvNV (atime) : -1.;
1393     req->nv2 = SvOK (mtime) ? SvNV (mtime) : -1.;
1394     req->sv1 = newSVsv (fh_or_path);
1395    
1396     if (SvPOK (fh_or_path))
1397     {
1398     req->type = EIO_UTIME;
1399     req->ptr1 = SvPVbyte_nolen (req->sv1);
1400     }
1401     else
1402     {
1403     req->type = EIO_FUTIME;
1404     req->int1 = PerlIO_fileno (IoIFP (sv_2io (fh_or_path)));
1405     }
1406    
1407     EIO_SEND;
1408     }
1409    
1410     void
1411     aio_truncate (SV8 *fh_or_path, SV *offset, SV *callback=&PL_sv_undef)
1412     PPCODE:
1413     {
1414     dREQ;
1415    
1416     req->sv1 = newSVsv (fh_or_path);
1417     req->offs = SvOK (offset) ? SvVAL64 (offset) : -1;
1418    
1419     if (SvPOK (fh_or_path))
1420     {
1421     req->type = EIO_TRUNCATE;
1422     req->ptr1 = SvPVbyte_nolen (req->sv1);
1423     }
1424     else
1425     {
1426     req->type = EIO_FTRUNCATE;
1427     req->int1 = PerlIO_fileno (IoIFP (sv_2io (fh_or_path)));
1428     }
1429    
1430     EIO_SEND;
1431     }
1432    
1433     void
1434     aio_chmod (SV8 *fh_or_path, int mode, SV *callback=&PL_sv_undef)
1435     ALIAS:
1436     aio_chmod = EIO_CHMOD
1437     aio_fchmod = EIO_FCHMOD
1438     aio_mkdir = EIO_MKDIR
1439     PPCODE:
1440     {
1441     dREQ;
1442    
1443     req->type = type;
1444     req->int2 = mode;
1445     req->sv1 = newSVsv (fh_or_path);
1446    
1447     if (ix == EIO_FCHMOD)
1448     req->int1 = PerlIO_fileno (IoIFP (sv_2io (fh_or_path)));
1449     else
1450     req->ptr1 = SvPVbyte_nolen (req->sv1);
1451    
1452     EIO_SEND;
1453     }
1454    
1455     void
1456     aio_chown (SV8 *fh_or_path, SV *uid, SV *gid, SV *callback=&PL_sv_undef)
1457     PPCODE:
1458     {
1459     dREQ;
1460    
1461     req->int2 = SvOK (uid) ? SvIV (uid) : -1;
1462     req->int3 = SvOK (gid) ? SvIV (gid) : -1;
1463     req->sv1 = newSVsv (fh_or_path);
1464    
1465     if (SvPOK (fh_or_path))
1466     {
1467     req->type = EIO_CHOWN;
1468     req->ptr1 = SvPVbyte_nolen (req->sv1);
1469     }
1470     else
1471     {
1472     req->type = EIO_FCHOWN;
1473     req->int1 = PerlIO_fileno (IoIFP (sv_2io (fh_or_path)));
1474     }
1475    
1476     EIO_SEND;
1477     }
1478    
1479     void
1480     aio_unlink (SV8 *pathname, SV *callback=&PL_sv_undef)
1481     ALIAS:
1482     aio_unlink = EIO_UNLINK
1483     aio_rmdir = EIO_RMDIR
1484     aio_readdir = EIO_READDIR
1485     PPCODE:
1486     {
1487     dREQ;
1488    
1489     req->type = ix;
1490     req->sv1 = newSVsv (pathname);
1491     req->ptr1 = SvPVbyte_nolen (req->sv1);
1492    
1493     EIO_SEND;
1494     }
1495    
1496     void
1497     aio_link (SV8 *oldpath, SV8 *newpath, SV *callback=&PL_sv_undef)
1498     ALIAS:
1499     aio_link = EIO_LINK
1500     aio_symlink = EIO_SYMLINK
1501     aio_rename = EIO_RENAME
1502     PPCODE:
1503     {
1504     dREQ;
1505    
1506     req->type = ix;
1507     req->sv1 = newSVsv (oldpath);
1508     req->ptr1 = SvPVbyte_nolen (req->sv1);
1509     req->sv2 = newSVsv (newpath);
1510     req->ptr2 = SvPVbyte_nolen (req->sv2);
1511    
1512     EIO_SEND;
1513     }
1514    
1515     void
1516     aio_mknod (SV8 *pathname, int mode, UV dev, SV *callback=&PL_sv_undef)
1517     PPCODE:
1518     {
1519     dREQ;
1520    
1521     req->type = EIO_MKNOD;
1522     req->sv1 = newSVsv (pathname);
1523     req->ptr1 = SvPVbyte_nolen (req->sv1);
1524     req->int2 = (mode_t)mode;
1525     req->offs = dev;
1526    
1527     EIO_SEND;
1528     }
1529    
1530     void
1531     aio_busy (double delay, SV *callback=&PL_sv_undef)
1532     PPCODE:
1533     {
1534     dREQ;
1535    
1536     req->type = EIO_BUSY;
1537     req->nv1 = delay < 0. ? 0. : delay;
1538    
1539     EIO_SEND;
1540     }
1541    
1542     void
1543     aio_group (SV *callback=&PL_sv_undef)
1544     PROTOTYPE: ;$
1545     PPCODE:
1546     {
1547     dREQ;
1548    
1549     req->type = EIO_GROUP;
1550    
1551     req_send (req);
1552     XPUSHs (req_sv (req, AIO_GRP_KLASS));
1553     }
1554    
1555     void
1556     aio_nop (SV *callback=&PL_sv_undef)
1557     ALIAS:
1558     aio_nop = EIO_NOP
1559     aio_sync = EIO_SYNC
1560     PPCODE:
1561     {
1562     dREQ;
1563    
1564     #endif
1565    
1566 root 1.2 void eio_grp_feed (eio_req *grp, void (*feed)(eio_req *req), int limit)
1567 root 1.1 {
1568     grp->int2 = limit;
1569     grp->feed = feed;
1570 root 1.2
1571     grp_try_feed (grp);
1572     }
1573    
1574     void eio_grp_limit (eio_req *grp, int limit)
1575     {
1576     grp->int2 = limit;
1577    
1578     grp_try_feed (grp);
1579 root 1.1 }
1580    
1581     void eio_grp_add (eio_req *grp, eio_req *req)
1582     {
1583     assert (("cannot add requests to IO::AIO::GRP after the group finished", grp->int1 != 2));
1584    
1585     ++grp->size;
1586     req->grp = grp;
1587    
1588     req->grp_prev = 0;
1589     req->grp_next = grp->grp_first;
1590    
1591     if (grp->grp_first)
1592     grp->grp_first->grp_prev = req;
1593    
1594     grp->grp_first = req;
1595     }
1596    
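eio_grp_feed and eio_grp_limit exist so that a group can generate its sub-requests lazily: grp_try_feed keeps calling the feeder while fewer than int2 sub-requests are outstanding, and a feeder that adds nothing signals that it is exhausted. Below is a hedged sketch of such a feeder; the fds array is hypothetical, and how the EIO_GROUP request itself is created is assumed here, since no eio_grp () wrapper exists in this revision.

#include "eio.h"

/* keep a bounded number of fsyncs in flight over a list of descriptors */
static int *fds;          /* hypothetical array of fds to sync */
static int  nfds, next_fd;

static void
feed_fsyncs (eio_req *grp)
{
  eio_req *sub;

  if (next_fd < nfds && (sub = eio_fsync (fds[next_fd], 0)))
    {
      ++next_fd;
      eio_grp_add (grp, sub);
    }
  /* adding nothing counts as "no progress", which stops further feeding */
}

/* elsewhere, once grp exists: eio_grp_feed (grp, feed_fsyncs, 4); */
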
1597