ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/IO-AIO/AIO.xs
Revision: 1.43
Committed: Sat Oct 21 23:06:04 2006 UTC (17 years, 7 months ago) by root
Branch: MAIN
Changes since 1.42: +145 -66 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.20 #define _REENTRANT 1
2 root 1.19 #include <errno.h>
3    
4 root 1.15 #include "EXTERN.h"
5 root 1.1 #include "perl.h"
6     #include "XSUB.h"
7    
8 root 1.16 #include "autoconf/config.h"
9    
10 root 1.32 #include <pthread.h>
11    
12 root 1.37 #include <stddef.h>
13 root 1.41 #include <errno.h>
14 root 1.1 #include <sys/types.h>
15     #include <sys/stat.h>
16 root 1.37 #include <limits.h>
17 root 1.1 #include <unistd.h>
18     #include <fcntl.h>
19     #include <signal.h>
20     #include <sched.h>
21    
22 root 1.32 #if HAVE_SENDFILE
23     # if __linux
24     # include <sys/sendfile.h>
25     # elif __freebsd
26     # include <sys/socket.h>
27     # include <sys/uio.h>
28     # elif __hpux
29     # include <sys/socket.h>
30 root 1.35 # elif __solaris /* not yet */
31     # include <sys/sendfile.h>
32 root 1.34 # else
33     # error sendfile support requested but not available
34 root 1.32 # endif
35     #endif
36 root 1.1
37 root 1.39 /* used for struct dirent, AIX doesn't provide it */
38     #ifndef NAME_MAX
39     # define NAME_MAX 4096
40     #endif
41    
42 root 1.4 #if __ia64
43     # define STACKSIZE 65536
44 root 1.1 #else
45 root 1.37 # define STACKSIZE 8192
46 root 1.1 #endif
47    
48     enum {
49     REQ_QUIT,
50     REQ_OPEN, REQ_CLOSE,
51     REQ_READ, REQ_WRITE, REQ_READAHEAD,
52 root 1.32 REQ_SENDFILE,
53 root 1.22 REQ_STAT, REQ_LSTAT, REQ_FSTAT,
54 root 1.1 REQ_FSYNC, REQ_FDATASYNC,
55 root 1.40 REQ_UNLINK, REQ_RMDIR, REQ_RENAME,
56 root 1.37 REQ_READDIR,
57 root 1.40 REQ_LINK, REQ_SYMLINK,
58 root 1.1 };
59    
60 root 1.43 #define AIO_CB_KLASS "IO::AIO::CB"
61    
62     typedef struct aio_cb
63     {
64     struct aio_cb *grp_prev, *grp_next;
65     struct aio_grp *grp;
66    
67 root 1.3 struct aio_cb *volatile next;
68 root 1.1
69 root 1.43 SV *self; /* the perl counterpart of this request, if any */
70 root 1.1
71 root 1.43 SV *data, *callback;
72     SV *fh, *fh2;
73     void *dataptr, *data2ptr;
74     Stat_t *statdata;
75 root 1.1 off_t offset;
76     size_t length;
77     ssize_t result;
78 root 1.43
79     int type;
80     int fd, fd2;
81 root 1.1 int errorno;
82     STRLEN dataoffset;
83 root 1.43 mode_t mode; /* open */
84     unsigned char cancelled;
85 root 1.1 } aio_cb;
86    
87     typedef aio_cb *aio_req;
88 root 1.43 typedef aio_cb *aio_req_ornot;
89 root 1.1
90 root 1.30 static int started, wanted;
91 root 1.11 static volatile int nreqs;
92 root 1.4 static int max_outstanding = 1<<30;
93 root 1.3 static int respipe [2];
94 root 1.1
95 root 1.3 static pthread_mutex_t reslock = PTHREAD_MUTEX_INITIALIZER;
96     static pthread_mutex_t reqlock = PTHREAD_MUTEX_INITIALIZER;
97     static pthread_cond_t reqwait = PTHREAD_COND_INITIALIZER;
98    
99     static volatile aio_req reqs, reqe; /* queue start, queue end */
100     static volatile aio_req ress, rese; /* queue start, queue end */
101 root 1.1
102 root 1.43 typedef struct aio_grp
103     {
104     struct aio_cb *first, *last;
105     SV *callback;
106     int busycount;
107     } aio_grp;
108    
109     static void aio_grp_begin (aio_grp *grp)
110     {
111     ++grp->busycount;
112     }
113    
114     static void aio_grp_end (aio_grp *grp)
115     {
116     --grp->busycount;
117    
118     if (grp->busycount)
119     return;
120    
121     SvREFCNT_dec (grp->callback);
122     grp->callback = 0;
123     }
124    
125     static aio_grp *aio_grp_new ()
126     {
127     aio_grp *grp;
128    
129     Newz (0, grp, 1, aio_grp);
130     aio_grp_begin (grp);
131    
132     return grp;
133     }
134    
135     /* must be called at most once */
136     static SV *req_sv (aio_req req)
137     {
138     req->self = (SV *)newHV ();
139     sv_magic (req->self, 0, PERL_MAGIC_ext, (char *)req, 0);
140    
141     return sv_bless (newRV_noinc (req->self), gv_stashpv (AIO_CB_KLASS, 1));
142     }
143    
144     static aio_req SvAIO_REQ (SV *sv)
145 root 1.26 {
146 root 1.43 if (!sv_derived_from (sv, AIO_CB_KLASS) || !SvROK (sv))
147     croak ("object of class " AIO_CB_KLASS " expected");
148    
149     MAGIC *mg = mg_find (SvRV (sv), PERL_MAGIC_ext);
150    
151     return mg ? (aio_req)mg->mg_ptr : 0;
152     }
153    
154     static void req_free (aio_req req)
155     {
156     if (req->self)
157     {
158     sv_unmagic (req->self, PERL_MAGIC_ext);
159     SvREFCNT_dec (req->self);
160     }
161    
162 root 1.26 if (req->data)
163     SvREFCNT_dec (req->data);
164    
165     if (req->fh)
166     SvREFCNT_dec (req->fh);
167    
168 root 1.32 if (req->fh2)
169     SvREFCNT_dec (req->fh2);
170    
171 root 1.27 if (req->statdata)
172 root 1.26 Safefree (req->statdata);
173    
174     if (req->callback)
175     SvREFCNT_dec (req->callback);
176    
177 root 1.37 if (req->type == REQ_READDIR && req->result >= 0)
178     free (req->data2ptr);
179    
180 root 1.26 Safefree (req);
181     }
182    
183 root 1.1 static void
184     poll_wait ()
185     {
186 root 1.11 if (nreqs && !ress)
187     {
188     fd_set rfd;
189     FD_ZERO(&rfd);
190     FD_SET(respipe [0], &rfd);
191 root 1.3
192 root 1.11 select (respipe [0] + 1, &rfd, 0, 0, 0);
193     }
194 root 1.1 }
195    
196     static int
197 root 1.5 poll_cb ()
198 root 1.1 {
199     dSP;
200     int count = 0;
201 root 1.24 int do_croak = 0;
202 root 1.27 aio_req req;
203    
204     for (;;)
205     {
206     pthread_mutex_lock (&reslock);
207     req = ress;
208    
209     if (req)
210     {
211     ress = req->next;
212 root 1.11
213 root 1.27 if (!ress)
214     {
215     /* read any signals sent by the worker threads */
216     char buf [32];
217     while (read (respipe [0], buf, 32) == 32)
218     ;
219 root 1.30
220     rese = 0;
221 root 1.27 }
222     }
223 root 1.1
224 root 1.27 pthread_mutex_unlock (&reslock);
225 root 1.3
226 root 1.27 if (!req)
227     break;
228 root 1.3
229 root 1.1 nreqs--;
230    
231     if (req->type == REQ_QUIT)
232     started--;
233     else
234     {
235     int errorno = errno;
236     errno = req->errorno;
237    
238     if (req->type == REQ_READ)
239 root 1.27 SvCUR_set (req->data, req->dataoffset + (req->result > 0 ? req->result : 0));
240 root 1.1
241 root 1.28 if (req->data2ptr && (req->type == REQ_READ || req->type == REQ_WRITE))
242     SvREADONLY_off (req->data);
243    
244 root 1.27 if (req->statdata)
245 root 1.1 {
246     PL_laststype = req->type == REQ_LSTAT ? OP_LSTAT : OP_STAT;
247     PL_laststatval = req->result;
248     PL_statcache = *(req->statdata);
249     }
250    
251 root 1.13 ENTER;
252 root 1.1 PUSHMARK (SP);
253 root 1.2
254 root 1.37 if (req->type == REQ_READDIR)
255     {
256     SV *rv = &PL_sv_undef;
257    
258     if (req->result >= 0)
259     {
260     char *buf = req->data2ptr;
261     AV *av = newAV ();
262    
263     while (req->result)
264     {
265     SV *sv = newSVpv (buf, 0);
266    
267     av_push (av, sv);
268     buf += SvCUR (sv) + 1;
269     req->result--;
270     }
271    
272     rv = sv_2mortal (newRV_noinc ((SV *)av));
273     }
274    
275     XPUSHs (rv);
276     }
277     else
278 root 1.2 {
279 root 1.37 XPUSHs (sv_2mortal (newSViv (req->result)));
280    
281     if (req->type == REQ_OPEN)
282     {
283     /* convert fd to fh */
284     SV *fh;
285 root 1.2
286 root 1.37 PUTBACK;
287     call_pv ("IO::AIO::_fd2fh", G_SCALAR | G_EVAL);
288     SPAGAIN;
289 root 1.2
290 root 1.37 fh = SvREFCNT_inc (POPs);
291 root 1.2
292 root 1.37 PUSHMARK (SP);
293     XPUSHs (sv_2mortal (fh));
294     }
295 root 1.2 }
296    
297 root 1.43 if (SvOK (req->callback) && !req->cancelled)
298 root 1.8 {
299     PUTBACK;
300     call_sv (req->callback, G_VOID | G_EVAL);
301     SPAGAIN;
302 root 1.27
303     if (SvTRUE (ERRSV))
304     {
305 root 1.43 req_free (req);
306 root 1.27 croak (0);
307     }
308 root 1.8 }
309 root 1.13
310 root 1.26 LEAVE;
311    
312 root 1.1 errno = errorno;
313     count++;
314     }
315    
316 root 1.43 req_free (req);
317 root 1.1 }
318    
319     return count;
320     }
321    
322 root 1.4 static void *aio_proc(void *arg);
323    
324     static void
325     start_thread (void)
326     {
327     sigset_t fullsigset, oldsigset;
328     pthread_t tid;
329     pthread_attr_t attr;
330    
331     pthread_attr_init (&attr);
332     pthread_attr_setstacksize (&attr, STACKSIZE);
333 root 1.31 pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
334 root 1.4
335     sigfillset (&fullsigset);
336     sigprocmask (SIG_SETMASK, &fullsigset, &oldsigset);
337    
338     if (pthread_create (&tid, &attr, aio_proc, 0) == 0)
339     started++;
340    
341     sigprocmask (SIG_SETMASK, &oldsigset, 0);
342     }
343    
344     static void
345 root 1.43 req_send (aio_req req)
346 root 1.4 {
347 root 1.30 while (started < wanted && nreqs >= started)
348     start_thread ();
349    
350 root 1.4 nreqs++;
351    
352     pthread_mutex_lock (&reqlock);
353    
354     req->next = 0;
355    
356     if (reqe)
357     {
358     reqe->next = req;
359     reqe = req;
360     }
361     else
362     reqe = reqs = req;
363    
364     pthread_cond_signal (&reqwait);
365     pthread_mutex_unlock (&reqlock);
366    
367 root 1.27 if (nreqs > max_outstanding)
368     for (;;)
369     {
370     poll_cb ();
371    
372     if (nreqs <= max_outstanding)
373     break;
374    
375     poll_wait ();
376     }
377 root 1.4 }
378    
379     static void
380     end_thread (void)
381     {
382     aio_req req;
383 root 1.26 Newz (0, req, 1, aio_cb);
384 root 1.4 req->type = REQ_QUIT;
385    
386 root 1.43 req_send (req);
387 root 1.4 }
388    
389 root 1.22 static void min_parallel (int nthreads)
390     {
391 root 1.30 if (wanted < nthreads)
392     wanted = nthreads;
393 root 1.22 }
394    
395     static void max_parallel (int nthreads)
396     {
397     int cur = started;
398 root 1.27
399 root 1.30 if (wanted > nthreads)
400     wanted = nthreads;
401    
402     while (cur > wanted)
403     {
404 root 1.22 end_thread ();
405     cur--;
406     }
407    
408 root 1.30 while (started > wanted)
409 root 1.22 {
410     poll_wait ();
411     poll_cb ();
412     }
413     }
414    
415 root 1.26 static void create_pipe ()
416 root 1.22 {
417 root 1.26 if (pipe (respipe))
418     croak ("unable to initialize result pipe");
419 root 1.22
420 root 1.26 if (fcntl (respipe [0], F_SETFL, O_NONBLOCK))
421     croak ("cannot set result pipe to nonblocking mode");
422 root 1.22
423 root 1.26 if (fcntl (respipe [1], F_SETFL, O_NONBLOCK))
424     croak ("cannot set result pipe to nonblocking mode");
425     }
426 root 1.22
427     /*****************************************************************************/
428 root 1.17 /* work around various missing functions */
429    
430     #if !HAVE_PREADWRITE
431     # define pread aio_pread
432     # define pwrite aio_pwrite
433    
434     /*
435     * make our pread/pwrite safe against themselves, but not against
436     * normal read/write by using a mutex. slows down execution a lot,
437     * but that's your problem, not mine.
438     */
439 root 1.37 static pthread_mutex_t preadwritelock = PTHREAD_MUTEX_INITIALIZER;
440 root 1.17
441     static ssize_t
442     pread (int fd, void *buf, size_t count, off_t offset)
443     {
444     ssize_t res;
445     off_t ooffset;
446    
447 root 1.37 pthread_mutex_lock (&preadwritelock);
448 root 1.17 ooffset = lseek (fd, 0, SEEK_CUR);
449     lseek (fd, offset, SEEK_SET);
450     res = read (fd, buf, count);
451     lseek (fd, ooffset, SEEK_SET);
452 root 1.37 pthread_mutex_unlock (&preadwritelock);
453 root 1.17
454     return res;
455     }
456    
457     static ssize_t
458     pwrite (int fd, void *buf, size_t count, off_t offset)
459     {
460     ssize_t res;
461     off_t ooffset;
462    
463 root 1.37 pthread_mutex_lock (&preadwritelock);
464 root 1.17 ooffset = lseek (fd, 0, SEEK_CUR);
465     lseek (fd, offset, SEEK_SET);
466     res = write (fd, buf, count);
467     lseek (fd, offset, SEEK_SET);
468 root 1.37 pthread_mutex_unlock (&preadwritelock);
469 root 1.17
470     return res;
471     }
472     #endif
473    
474     #if !HAVE_FDATASYNC
475     # define fdatasync fsync
476     #endif
477    
478     #if !HAVE_READAHEAD
479     # define readahead aio_readahead
480    
481     static ssize_t
482     readahead (int fd, off_t offset, size_t count)
483     {
484 root 1.37 char readahead_buf[4096];
485    
486 root 1.17 while (count > 0)
487     {
488     size_t len = count < sizeof (readahead_buf) ? count : sizeof (readahead_buf);
489    
490     pread (fd, readahead_buf, len, offset);
491     offset += len;
492     count -= len;
493     }
494    
495     errno = 0;
496     }
497     #endif
498    
499 root 1.37 #if !HAVE_READDIR_R
500     # define readdir_r aio_readdir_r
501    
502     static pthread_mutex_t readdirlock = PTHREAD_MUTEX_INITIALIZER;
503    
504     static int
505     readdir_r (DIR *dirp, struct dirent *ent, struct dirent **res)
506     {
507     struct dirent *e;
508     int errorno;
509    
510     pthread_mutex_lock (&readdirlock);
511    
512     e = readdir (dirp);
513     errorno = errno;
514    
515     if (e)
516     {
517     *res = ent;
518     strcpy (ent->d_name, e->d_name);
519     }
520     else
521     *res = 0;
522    
523     pthread_mutex_unlock (&readdirlock);
524    
525     errno = errorno;
526     return e ? 0 : -1;
527     }
528     #endif
529    
530 root 1.32 /* sendfile always needs emulation */
531     static ssize_t
532     sendfile_ (int ofd, int ifd, off_t offset, size_t count)
533     {
534 root 1.37 ssize_t res;
535 root 1.32
536 root 1.37 if (!count)
537     return 0;
538 root 1.32
539 root 1.35 #if HAVE_SENDFILE
540     # if __linux
541 root 1.37 res = sendfile (ofd, ifd, &offset, count);
542 root 1.32
543 root 1.35 # elif __freebsd
544 root 1.37 /*
545     * Of course, the freebsd sendfile is a dire hack with no thoughts
546     * wasted on making it similar to other I/O functions.
547     */
548     {
549     off_t sbytes;
550     res = sendfile (ifd, ofd, offset, count, 0, &sbytes, 0);
551    
552     if (res < 0 && sbytes)
553     /* maybe only on EAGAIN only: as usual, the manpage leaves you guessing */
554     res = sbytes;
555     }
556 root 1.32
557 root 1.35 # elif __hpux
558 root 1.37 res = sendfile (ofd, ifd, offset, count, 0, 0);
559 root 1.32
560 root 1.35 # elif __solaris
561 root 1.37 {
562     struct sendfilevec vec;
563     size_t sbytes;
564    
565     vec.sfv_fd = ifd;
566     vec.sfv_flag = 0;
567     vec.sfv_off = offset;
568     vec.sfv_len = count;
569    
570     res = sendfilev (ofd, &vec, 1, &sbytes);
571    
572     if (res < 0 && sbytes)
573     res = sbytes;
574     }
575 root 1.35
576 root 1.38 # endif
577     #else
578 root 1.37 res = -1;
579     errno = ENOSYS;
580 root 1.32 #endif
581    
582 root 1.37 if (res < 0
583     && (errno == ENOSYS || errno == EINVAL || errno == ENOTSOCK
584 root 1.35 #if __solaris
585 root 1.37 || errno == EAFNOSUPPORT || errno == EPROTOTYPE
586 root 1.35 #endif
587 root 1.37 )
588     )
589     {
590     /* emulate sendfile. this is a major pain in the ass */
591     char buf[4096];
592     res = 0;
593    
594 root 1.38 while (count)
595 root 1.37 {
596     ssize_t cnt;
597    
598 root 1.38 cnt = pread (ifd, buf, count > 4096 ? 4096 : count, offset);
599 root 1.32
600 root 1.37 if (cnt <= 0)
601     {
602     if (cnt && !res) res = -1;
603     break;
604     }
605    
606     cnt = write (ofd, buf, cnt);
607    
608     if (cnt <= 0)
609     {
610     if (cnt && !res) res = -1;
611     break;
612     }
613    
614     offset += cnt;
615     res += cnt;
616 root 1.38 count -= cnt;
617 root 1.37 }
618     }
619    
620     return res;
621     }
622    
623     /* read a full directory */
624     static int
625     scandir_ (const char *path, void **namesp)
626     {
627     DIR *dirp = opendir (path);
628     union
629     {
630     struct dirent d;
631     char b [offsetof (struct dirent, d_name) + NAME_MAX + 1];
632     } u;
633     struct dirent *entp;
634     char *name, *names;
635     int memlen = 4096;
636     int memofs = 0;
637     int res = 0;
638     int errorno;
639    
640     if (!dirp)
641     return -1;
642    
643     names = malloc (memlen);
644    
645     for (;;)
646     {
647     errno = 0, readdir_r (dirp, &u.d, &entp);
648    
649     if (!entp)
650     break;
651    
652     name = entp->d_name;
653    
654     if (name [0] != '.' || (name [1] && (name [1] != '.' || name [2])))
655     {
656     int len = strlen (name) + 1;
657    
658     res++;
659    
660     while (memofs + len > memlen)
661     {
662     memlen *= 2;
663     names = realloc (names, memlen);
664     if (!names)
665     break;
666     }
667    
668     memcpy (names + memofs, name, len);
669     memofs += len;
670     }
671     }
672    
673     errorno = errno;
674     closedir (dirp);
675    
676     if (errorno)
677     {
678     free (names);
679     errno = errorno;
680     res = -1;
681     }
682    
683     *namesp = (void *)names;
684     return res;
685 root 1.32 }
686    
687 root 1.22 /*****************************************************************************/
688    
689 root 1.1 static void *
690     aio_proc (void *thr_arg)
691     {
692     aio_req req;
693 root 1.3 int type;
694 root 1.1
695 root 1.3 do
696 root 1.1 {
697 root 1.3 pthread_mutex_lock (&reqlock);
698    
699     for (;;)
700     {
701     req = reqs;
702    
703     if (reqs)
704     {
705     reqs = reqs->next;
706     if (!reqs) reqe = 0;
707     }
708    
709     if (req)
710     break;
711    
712     pthread_cond_wait (&reqwait, &reqlock);
713     }
714    
715     pthread_mutex_unlock (&reqlock);
716    
717 root 1.1 errno = 0; /* strictly unnecessary */
718    
719 root 1.43 if (!req->cancelled)
720     switch (req->type)
721     {
722     case REQ_READ: req->result = pread (req->fd, req->dataptr, req->length, req->offset); break;
723     case REQ_WRITE: req->result = pwrite (req->fd, req->dataptr, req->length, req->offset); break;
724 root 1.3
725 root 1.43 case REQ_READAHEAD: req->result = readahead (req->fd, req->offset, req->length); break;
726     case REQ_SENDFILE: req->result = sendfile_ (req->fd, req->fd2, req->offset, req->length); break;
727 root 1.16
728 root 1.43 case REQ_STAT: req->result = stat (req->dataptr, req->statdata); break;
729     case REQ_LSTAT: req->result = lstat (req->dataptr, req->statdata); break;
730     case REQ_FSTAT: req->result = fstat (req->fd , req->statdata); break;
731    
732     case REQ_OPEN: req->result = open (req->dataptr, req->fd, req->mode); break;
733     case REQ_CLOSE: req->result = close (req->fd); break;
734     case REQ_UNLINK: req->result = unlink (req->dataptr); break;
735     case REQ_RMDIR: req->result = rmdir (req->dataptr); break;
736     case REQ_RENAME: req->result = rename (req->data2ptr, req->dataptr); break;
737     case REQ_LINK: req->result = link (req->data2ptr, req->dataptr); break;
738     case REQ_SYMLINK: req->result = symlink (req->data2ptr, req->dataptr); break;
739    
740     case REQ_FDATASYNC: req->result = fdatasync (req->fd); break;
741     case REQ_FSYNC: req->result = fsync (req->fd); break;
742     case REQ_READDIR: req->result = scandir_ (req->dataptr, &req->data2ptr); break;
743 root 1.1
744 root 1.43 case REQ_QUIT:
745     break;
746 root 1.1
747 root 1.43 default:
748     req->result = ENOSYS;
749     break;
750     }
751 root 1.1
752     req->errorno = errno;
753 root 1.3
754     pthread_mutex_lock (&reslock);
755    
756     req->next = 0;
757    
758     if (rese)
759     {
760     rese->next = req;
761     rese = req;
762     }
763     else
764     {
765     rese = ress = req;
766    
767     /* write a dummy byte to the pipe so fh becomes ready */
768     write (respipe [1], &respipe, 1);
769     }
770    
771     pthread_mutex_unlock (&reslock);
772 root 1.1 }
773 root 1.3 while (type != REQ_QUIT);
774 root 1.1
775     return 0;
776     }
777    
778 root 1.37 /*****************************************************************************/
779    
780     static void atfork_prepare (void)
781     {
782     pthread_mutex_lock (&reqlock);
783     pthread_mutex_lock (&reslock);
784     #if !HAVE_PREADWRITE
785     pthread_mutex_lock (&preadwritelock);
786     #endif
787     #if !HAVE_READDIR_R
788     pthread_mutex_lock (&readdirlock);
789     #endif
790     }
791    
792     static void atfork_parent (void)
793     {
794     #if !HAVE_READDIR_R
795     pthread_mutex_unlock (&readdirlock);
796     #endif
797     #if !HAVE_PREADWRITE
798     pthread_mutex_unlock (&preadwritelock);
799     #endif
800     pthread_mutex_unlock (&reslock);
801     pthread_mutex_unlock (&reqlock);
802     }
803    
804     static void atfork_child (void)
805     {
806     aio_req prv;
807    
808     started = 0;
809    
810     while (reqs)
811     {
812     prv = reqs;
813     reqs = prv->next;
814 root 1.43 req_free (prv);
815 root 1.37 }
816    
817     reqs = reqe = 0;
818    
819     while (ress)
820     {
821     prv = ress;
822     ress = prv->next;
823 root 1.43 req_free (prv);
824 root 1.37 }
825    
826     ress = rese = 0;
827    
828     close (respipe [0]);
829     close (respipe [1]);
830     create_pipe ();
831    
832     atfork_parent ();
833     }
834    
835 root 1.22 #define dREQ \
836     aio_req req; \
837     \
838     if (SvOK (callback) && !SvROK (callback)) \
839 root 1.43 croak ("callback must be undef or of reference type"); \
840 root 1.22 \
841     Newz (0, req, 1, aio_cb); \
842     if (!req) \
843     croak ("out of memory during aio_req allocation"); \
844     \
845 root 1.43 req->callback = newSVsv (callback)
846    
847     #define REQ_SEND \
848     req_send (req); \
849     \
850     if (GIMME_V != G_VOID) \
851     XPUSHs (req_sv (req));
852 root 1.22
853 root 1.1 MODULE = IO::AIO PACKAGE = IO::AIO
854    
855 root 1.8 PROTOTYPES: ENABLE
856    
857 root 1.1 BOOT:
858     {
859 root 1.41 HV *stash = gv_stashpv ("IO::AIO", 1);
860     newCONSTSUB (stash, "EXDEV", newSViv (EXDEV));
861     newCONSTSUB (stash, "O_RDONLY", newSViv (O_RDONLY));
862     newCONSTSUB (stash, "O_WRONLY", newSViv (O_WRONLY));
863    
864 root 1.26 create_pipe ();
865 root 1.22 pthread_atfork (atfork_prepare, atfork_parent, atfork_child);
866 root 1.1 }
867    
868     void
869 root 1.40 min_parallel (nthreads)
870 root 1.1 int nthreads
871     PROTOTYPE: $
872    
873     void
874 root 1.40 max_parallel (nthreads)
875 root 1.1 int nthreads
876     PROTOTYPE: $
877    
878 root 1.4 int
879 root 1.40 max_outstanding (nreqs)
880 root 1.4 int nreqs
881     PROTOTYPE: $
882     CODE:
883     RETVAL = max_outstanding;
884     max_outstanding = nreqs;
885    
886 root 1.1 void
887 root 1.40 aio_open (pathname,flags,mode,callback=&PL_sv_undef)
888 root 1.1 SV * pathname
889     int flags
890     int mode
891     SV * callback
892 root 1.8 PROTOTYPE: $$$;$
893 root 1.43 PPCODE:
894 root 1.1 {
895 root 1.22 dREQ;
896 root 1.1
897     req->type = REQ_OPEN;
898     req->data = newSVsv (pathname);
899 root 1.22 req->dataptr = SvPVbyte_nolen (req->data);
900 root 1.1 req->fd = flags;
901     req->mode = mode;
902    
903 root 1.43 REQ_SEND;
904 root 1.1 }
905    
906     void
907 root 1.40 aio_close (fh,callback=&PL_sv_undef)
908 root 1.13 SV * fh
909     SV * callback
910 root 1.8 PROTOTYPE: $;$
911 root 1.1 ALIAS:
912     aio_close = REQ_CLOSE
913     aio_fsync = REQ_FSYNC
914     aio_fdatasync = REQ_FDATASYNC
915 root 1.43 PPCODE:
916 root 1.1 {
917 root 1.22 dREQ;
918 root 1.1
919     req->type = ix;
920 root 1.13 req->fh = newSVsv (fh);
921     req->fd = PerlIO_fileno (IoIFP (sv_2io (fh)));
922 root 1.1
923 root 1.43 REQ_SEND (req);
924 root 1.1 }
925    
926     void
927 root 1.40 aio_read (fh,offset,length,data,dataoffset,callback=&PL_sv_undef)
928 root 1.13 SV * fh
929     UV offset
930 root 1.32 UV length
931 root 1.13 SV * data
932 root 1.32 UV dataoffset
933 root 1.13 SV * callback
934     ALIAS:
935     aio_read = REQ_READ
936     aio_write = REQ_WRITE
937 root 1.8 PROTOTYPE: $$$$$;$
938 root 1.43 PPCODE:
939 root 1.13 {
940     aio_req req;
941     STRLEN svlen;
942 root 1.21 char *svptr = SvPVbyte (data, svlen);
943 root 1.13
944     SvUPGRADE (data, SVt_PV);
945     SvPOK_on (data);
946 root 1.1
947 root 1.13 if (dataoffset < 0)
948     dataoffset += svlen;
949    
950     if (dataoffset < 0 || dataoffset > svlen)
951     croak ("data offset outside of string");
952    
953     if (ix == REQ_WRITE)
954     {
955     /* write: check length and adjust. */
956     if (length < 0 || length + dataoffset > svlen)
957     length = svlen - dataoffset;
958     }
959     else
960     {
961     /* read: grow scalar as necessary */
962     svptr = SvGROW (data, length + dataoffset);
963     }
964    
965     if (length < 0)
966     croak ("length must not be negative");
967    
968 root 1.22 {
969     dREQ;
970 root 1.13
971 root 1.22 req->type = ix;
972     req->fh = newSVsv (fh);
973     req->fd = PerlIO_fileno (ix == REQ_READ ? IoIFP (sv_2io (fh))
974     : IoOFP (sv_2io (fh)));
975     req->offset = offset;
976     req->length = length;
977     req->data = SvREFCNT_inc (data);
978     req->dataptr = (char *)svptr + dataoffset;
979 root 1.13
980 root 1.28 if (!SvREADONLY (data))
981     {
982     SvREADONLY_on (data);
983     req->data2ptr = (void *)data;
984     }
985    
986 root 1.43 REQ_SEND;
987 root 1.22 }
988 root 1.13 }
989 root 1.1
990     void
991 root 1.40 aio_sendfile (out_fh,in_fh,in_offset,length,callback=&PL_sv_undef)
992 root 1.32 SV * out_fh
993     SV * in_fh
994     UV in_offset
995     UV length
996     SV * callback
997     PROTOTYPE: $$$$;$
998 root 1.43 PPCODE:
999 root 1.32 {
1000     dREQ;
1001    
1002     req->type = REQ_SENDFILE;
1003     req->fh = newSVsv (out_fh);
1004     req->fd = PerlIO_fileno (IoIFP (sv_2io (out_fh)));
1005     req->fh2 = newSVsv (in_fh);
1006     req->fd2 = PerlIO_fileno (IoIFP (sv_2io (in_fh)));
1007     req->offset = in_offset;
1008     req->length = length;
1009    
1010 root 1.43 REQ_SEND;
1011 root 1.32 }
1012    
1013     void
1014 root 1.40 aio_readahead (fh,offset,length,callback=&PL_sv_undef)
1015 root 1.13 SV * fh
1016     UV offset
1017     IV length
1018     SV * callback
1019 root 1.8 PROTOTYPE: $$$;$
1020 root 1.43 PPCODE:
1021 root 1.1 {
1022 root 1.22 dREQ;
1023 root 1.1
1024     req->type = REQ_READAHEAD;
1025 root 1.13 req->fh = newSVsv (fh);
1026     req->fd = PerlIO_fileno (IoIFP (sv_2io (fh)));
1027 root 1.1 req->offset = offset;
1028     req->length = length;
1029    
1030 root 1.43 REQ_SEND;
1031 root 1.1 }
1032    
1033     void
1034 root 1.40 aio_stat (fh_or_path,callback=&PL_sv_undef)
1035 root 1.1 SV * fh_or_path
1036     SV * callback
1037     ALIAS:
1038 root 1.8 aio_stat = REQ_STAT
1039     aio_lstat = REQ_LSTAT
1040 root 1.43 PPCODE:
1041 root 1.1 {
1042 root 1.22 dREQ;
1043 root 1.1
1044     New (0, req->statdata, 1, Stat_t);
1045     if (!req->statdata)
1046 root 1.27 {
1047 root 1.43 req_free (req);
1048 root 1.27 croak ("out of memory during aio_req->statdata allocation");
1049     }
1050 root 1.1
1051     if (SvPOK (fh_or_path))
1052     {
1053 root 1.8 req->type = ix;
1054 root 1.1 req->data = newSVsv (fh_or_path);
1055 root 1.22 req->dataptr = SvPVbyte_nolen (req->data);
1056 root 1.1 }
1057     else
1058     {
1059     req->type = REQ_FSTAT;
1060 root 1.13 req->fh = newSVsv (fh_or_path);
1061 root 1.1 req->fd = PerlIO_fileno (IoIFP (sv_2io (fh_or_path)));
1062     }
1063    
1064 root 1.43 REQ_SEND;
1065 root 1.1 }
1066    
1067     void
1068 root 1.40 aio_unlink (pathname,callback=&PL_sv_undef)
1069 root 1.1 SV * pathname
1070     SV * callback
1071 root 1.22 ALIAS:
1072 root 1.40 aio_unlink = REQ_UNLINK
1073     aio_rmdir = REQ_RMDIR
1074     aio_readdir = REQ_READDIR
1075 root 1.43 PPCODE:
1076 root 1.1 {
1077 root 1.22 dREQ;
1078 root 1.1
1079 root 1.22 req->type = ix;
1080     req->data = newSVsv (pathname);
1081     req->dataptr = SvPVbyte_nolen (req->data);
1082 root 1.1
1083 root 1.43 REQ_SEND;
1084 root 1.22 }
1085    
1086     void
1087 root 1.40 aio_link (oldpath,newpath,callback=&PL_sv_undef)
1088 root 1.22 SV * oldpath
1089     SV * newpath
1090     SV * callback
1091 root 1.40 ALIAS:
1092     aio_link = REQ_LINK
1093     aio_symlink = REQ_SYMLINK
1094     aio_rename = REQ_RENAME
1095 root 1.43 PPCODE:
1096 root 1.22 {
1097     dREQ;
1098 root 1.1
1099 root 1.40 req->type = ix;
1100 root 1.22 req->fh = newSVsv (oldpath);
1101     req->data2ptr = SvPVbyte_nolen (req->fh);
1102     req->data = newSVsv (newpath);
1103     req->dataptr = SvPVbyte_nolen (req->data);
1104 root 1.1
1105 root 1.43 REQ_SEND;
1106 root 1.1 }
1107    
1108 root 1.42 #if 0
1109    
1110     # undocumented, because it does not cancel active requests
1111     void
1112     cancel_most_requests ()
1113     PROTOTYPE:
1114     CODE:
1115     {
1116     aio_req *req;
1117    
1118     pthread_mutex_lock (&reqlock);
1119     for (req = reqs; req; req = req->next)
1120     req->flags |= 1;
1121     pthread_mutex_unlock (&reqlock);
1122    
1123     pthread_mutex_lock (&reslock);
1124     for (req = ress; req; req = req->next)
1125     req->flags |= 1;
1126     pthread_mutex_unlock (&reslock);
1127     }
1128    
1129     #endif
1130    
1131 root 1.6 void
1132 root 1.40 flush ()
1133 root 1.6 PROTOTYPE:
1134     CODE:
1135     while (nreqs)
1136     {
1137     poll_wait ();
1138     poll_cb ();
1139     }
1140    
1141 root 1.7 void
1142     poll()
1143     PROTOTYPE:
1144     CODE:
1145     if (nreqs)
1146     {
1147     poll_wait ();
1148     poll_cb ();
1149     }
1150    
1151 root 1.1 int
1152     poll_fileno()
1153     PROTOTYPE:
1154     CODE:
1155 root 1.3 RETVAL = respipe [0];
1156 root 1.1 OUTPUT:
1157     RETVAL
1158    
1159     int
1160     poll_cb(...)
1161     PROTOTYPE:
1162     CODE:
1163 root 1.5 RETVAL = poll_cb ();
1164 root 1.1 OUTPUT:
1165     RETVAL
1166    
1167     void
1168     poll_wait()
1169     PROTOTYPE:
1170     CODE:
1171 root 1.3 if (nreqs)
1172     poll_wait ();
1173 root 1.1
1174     int
1175     nreqs()
1176     PROTOTYPE:
1177     CODE:
1178     RETVAL = nreqs;
1179     OUTPUT:
1180     RETVAL
1181    
1182 root 1.43 MODULE = IO::AIO PACKAGE = IO::AIO::CB
1183    
1184     void
1185     cancel (aio_req_ornot req)
1186     PROTOTYPE:
1187     CODE:
1188     req->cancelled = 1;
1189