ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/IO-AIO/AIO.xs
Revision: 1.35
Committed: Tue Aug 23 01:16:50 2005 UTC (18 years, 8 months ago) by root
Branch: MAIN
Changes since 1.34: +48 -10 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.20 #define _REENTRANT 1
2 root 1.19 #include <errno.h>
3    
4 root 1.15 #include "EXTERN.h"
5 root 1.1 #include "perl.h"
6     #include "XSUB.h"
7    
8 root 1.16 #include "autoconf/config.h"
9    
10 root 1.32 #include <pthread.h>
11    
12 root 1.1 #include <sys/types.h>
13     #include <sys/stat.h>
14 root 1.10
15 root 1.1 #include <unistd.h>
16     #include <fcntl.h>
17     #include <signal.h>
18     #include <sched.h>
19    
20 root 1.32 #if HAVE_SENDFILE
21     # if __linux
22     # include <sys/sendfile.h>
23     # elif __freebsd
24     # include <sys/socket.h>
25     # include <sys/uio.h>
26     # elif __hpux
27     # include <sys/socket.h>
28 root 1.35 # elif __solaris /* not yet */
29     # include <sys/sendfile.h>
30 root 1.34 # else
31     # error sendfile support requested but not available
32 root 1.32 # endif
33     #endif
34 root 1.1
35 root 1.4 #if __ia64
36     # define STACKSIZE 65536
37 root 1.1 #else
38 root 1.4 # define STACKSIZE 4096
39 root 1.1 #endif
40    
41     enum {
42     REQ_QUIT,
43     REQ_OPEN, REQ_CLOSE,
44     REQ_READ, REQ_WRITE, REQ_READAHEAD,
45 root 1.32 REQ_SENDFILE,
46 root 1.22 REQ_STAT, REQ_LSTAT, REQ_FSTAT,
47 root 1.1 REQ_FSYNC, REQ_FDATASYNC,
48 root 1.23 REQ_UNLINK, REQ_RMDIR,
49 root 1.22 REQ_SYMLINK,
50 root 1.1 };
51    
52     typedef struct aio_cb {
53 root 1.3 struct aio_cb *volatile next;
54 root 1.1
55     int type;
56    
57 root 1.32 int fd, fd2;
58 root 1.1 off_t offset;
59     size_t length;
60     ssize_t result;
61     mode_t mode; /* open */
62     int errorno;
63 root 1.32 SV *data, *callback;
64     SV *fh, *fh2;
65 root 1.22 void *dataptr, *data2ptr;
66 root 1.1 STRLEN dataoffset;
67    
68     Stat_t *statdata;
69     } aio_cb;
70    
71     typedef aio_cb *aio_req;
72    
73 root 1.30 static int started, wanted;
74 root 1.11 static volatile int nreqs;
75 root 1.4 static int max_outstanding = 1<<30;
76 root 1.3 static int respipe [2];
77 root 1.1
78 root 1.3 static pthread_mutex_t reslock = PTHREAD_MUTEX_INITIALIZER;
79     static pthread_mutex_t reqlock = PTHREAD_MUTEX_INITIALIZER;
80     static pthread_cond_t reqwait = PTHREAD_COND_INITIALIZER;
81    
82     static volatile aio_req reqs, reqe; /* queue start, queue end */
83     static volatile aio_req ress, rese; /* queue start, queue end */
84 root 1.1
85 root 1.26 static void free_req (aio_req req)
86     {
87     if (req->data)
88     SvREFCNT_dec (req->data);
89    
90     if (req->fh)
91     SvREFCNT_dec (req->fh);
92    
93 root 1.32 if (req->fh2)
94     SvREFCNT_dec (req->fh2);
95    
96 root 1.27 if (req->statdata)
97 root 1.26 Safefree (req->statdata);
98    
99     if (req->callback)
100     SvREFCNT_dec (req->callback);
101    
102     Safefree (req);
103     }
104    
105 root 1.1 static void
106     poll_wait ()
107     {
108 root 1.11 if (nreqs && !ress)
109     {
110     fd_set rfd;
111     FD_ZERO(&rfd);
112     FD_SET(respipe [0], &rfd);
113 root 1.3
114 root 1.11 select (respipe [0] + 1, &rfd, 0, 0, 0);
115     }
116 root 1.1 }
117    
118     static int
119 root 1.5 poll_cb ()
120 root 1.1 {
121     dSP;
122     int count = 0;
123 root 1.24 int do_croak = 0;
124 root 1.27 aio_req req;
125    
126     for (;;)
127     {
128     pthread_mutex_lock (&reslock);
129     req = ress;
130    
131     if (req)
132     {
133     ress = req->next;
134 root 1.11
135 root 1.27 if (!ress)
136     {
137     /* read any signals sent by the worker threads */
138     char buf [32];
139     while (read (respipe [0], buf, 32) == 32)
140     ;
141 root 1.30
142     rese = 0;
143 root 1.27 }
144     }
145 root 1.1
146 root 1.27 pthread_mutex_unlock (&reslock);
147 root 1.3
148 root 1.27 if (!req)
149     break;
150 root 1.3
151 root 1.1 nreqs--;
152    
153     if (req->type == REQ_QUIT)
154     started--;
155     else
156     {
157     int errorno = errno;
158     errno = req->errorno;
159    
160     if (req->type == REQ_READ)
161 root 1.27 SvCUR_set (req->data, req->dataoffset + (req->result > 0 ? req->result : 0));
162 root 1.1
163 root 1.28 if (req->data2ptr && (req->type == REQ_READ || req->type == REQ_WRITE))
164     SvREADONLY_off (req->data);
165    
166 root 1.27 if (req->statdata)
167 root 1.1 {
168     PL_laststype = req->type == REQ_LSTAT ? OP_LSTAT : OP_STAT;
169     PL_laststatval = req->result;
170     PL_statcache = *(req->statdata);
171     }
172    
173 root 1.13 ENTER;
174 root 1.1 PUSHMARK (SP);
175     XPUSHs (sv_2mortal (newSViv (req->result)));
176 root 1.2
177     if (req->type == REQ_OPEN)
178     {
179     /* convert fd to fh */
180     SV *fh;
181    
182     PUTBACK;
183     call_pv ("IO::AIO::_fd2fh", G_SCALAR | G_EVAL);
184     SPAGAIN;
185    
186 root 1.13 fh = SvREFCNT_inc (POPs);
187 root 1.2
188     PUSHMARK (SP);
189 root 1.13 XPUSHs (sv_2mortal (fh));
190 root 1.2 }
191    
192 root 1.8 if (SvOK (req->callback))
193     {
194     PUTBACK;
195     call_sv (req->callback, G_VOID | G_EVAL);
196     SPAGAIN;
197 root 1.27
198     if (SvTRUE (ERRSV))
199     {
200     free_req (req);
201     croak (0);
202     }
203 root 1.8 }
204 root 1.13
205 root 1.26 LEAVE;
206    
207 root 1.1 errno = errorno;
208     count++;
209     }
210    
211 root 1.27 free_req (req);
212 root 1.1 }
213    
214     return count;
215     }
216    
217 root 1.4 static void *aio_proc(void *arg);
218    
219     static void
220     start_thread (void)
221     {
222     sigset_t fullsigset, oldsigset;
223     pthread_t tid;
224     pthread_attr_t attr;
225    
226     pthread_attr_init (&attr);
227     pthread_attr_setstacksize (&attr, STACKSIZE);
228 root 1.31 pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
229 root 1.4
230     sigfillset (&fullsigset);
231     sigprocmask (SIG_SETMASK, &fullsigset, &oldsigset);
232    
233     if (pthread_create (&tid, &attr, aio_proc, 0) == 0)
234     started++;
235    
236     sigprocmask (SIG_SETMASK, &oldsigset, 0);
237     }
238    
239     static void
240     send_req (aio_req req)
241     {
242 root 1.30 while (started < wanted && nreqs >= started)
243     start_thread ();
244    
245 root 1.4 nreqs++;
246    
247     pthread_mutex_lock (&reqlock);
248    
249     req->next = 0;
250    
251     if (reqe)
252     {
253     reqe->next = req;
254     reqe = req;
255     }
256     else
257     reqe = reqs = req;
258    
259     pthread_cond_signal (&reqwait);
260     pthread_mutex_unlock (&reqlock);
261    
262 root 1.27 if (nreqs > max_outstanding)
263     for (;;)
264     {
265     poll_cb ();
266    
267     if (nreqs <= max_outstanding)
268     break;
269    
270     poll_wait ();
271     }
272 root 1.4 }
273    
274     static void
275     end_thread (void)
276     {
277     aio_req req;
278 root 1.26 Newz (0, req, 1, aio_cb);
279 root 1.4 req->type = REQ_QUIT;
280    
281     send_req (req);
282     }
283    
284 root 1.22 static void min_parallel (int nthreads)
285     {
286 root 1.30 if (wanted < nthreads)
287     wanted = nthreads;
288 root 1.22 }
289    
290     static void max_parallel (int nthreads)
291     {
292     int cur = started;
293 root 1.27
294 root 1.30 if (wanted > nthreads)
295     wanted = nthreads;
296    
297     while (cur > wanted)
298     {
299 root 1.22 end_thread ();
300     cur--;
301     }
302    
303 root 1.30 while (started > wanted)
304 root 1.22 {
305     poll_wait ();
306     poll_cb ();
307     }
308     }
309    
310 root 1.26 static void create_pipe ()
311 root 1.22 {
312 root 1.26 if (pipe (respipe))
313     croak ("unable to initialize result pipe");
314 root 1.22
315 root 1.26 if (fcntl (respipe [0], F_SETFL, O_NONBLOCK))
316     croak ("cannot set result pipe to nonblocking mode");
317 root 1.22
318 root 1.26 if (fcntl (respipe [1], F_SETFL, O_NONBLOCK))
319     croak ("cannot set result pipe to nonblocking mode");
320     }
321 root 1.22
322 root 1.26 static void atfork_prepare (void)
323     {
324 root 1.28 pthread_mutex_lock (&reqlock);
325 root 1.22 pthread_mutex_lock (&reslock);
326     }
327    
328     static void atfork_parent (void)
329     {
330     pthread_mutex_unlock (&reslock);
331 root 1.26 pthread_mutex_unlock (&reqlock);
332     }
333    
334     static void atfork_child (void)
335     {
336 root 1.28 aio_req prv;
337    
338 root 1.26 started = 0;
339    
340 root 1.28 while (reqs)
341     {
342     prv = reqs;
343     reqs = prv->next;
344     free_req (prv);
345     }
346    
347     reqs = reqe = 0;
348    
349     while (ress)
350     {
351     prv = ress;
352     ress = prv->next;
353     free_req (prv);
354     }
355    
356     ress = rese = 0;
357    
358 root 1.29 close (respipe [0]);
359     close (respipe [1]);
360     create_pipe ();
361    
362 root 1.26 atfork_parent ();
363 root 1.22 }
364    
365 root 1.35 /* currently noops */
366     #define LOCK_FD(fd) do { } while (0)
367     #define UNLOCK_FD(fd) do { } while (0)
368    
369 root 1.22 /*****************************************************************************/
370 root 1.17 /* work around various missing functions */
371    
372     #if !HAVE_PREADWRITE
373     # define pread aio_pread
374     # define pwrite aio_pwrite
375    
376     /*
377     * make our pread/pwrite safe against themselves, but not against
378     * normal read/write by using a mutex. slows down execution a lot,
379     * but that's your problem, not mine.
380     */
381     static pthread_mutex_t iolock = PTHREAD_MUTEX_INITIALIZER;
382    
383     static ssize_t
384     pread (int fd, void *buf, size_t count, off_t offset)
385     {
386     ssize_t res;
387     off_t ooffset;
388    
389 root 1.35 LOCK_FD (fd);
390     pthread_mutex_lock (&iolock); /* replace by LOCK_FD and private buffer */
391 root 1.17 ooffset = lseek (fd, 0, SEEK_CUR);
392     lseek (fd, offset, SEEK_SET);
393     res = read (fd, buf, count);
394     lseek (fd, ooffset, SEEK_SET);
395     pthread_mutex_unlock (&iolock);
396 root 1.35 UNLOCK_FD (d);
397 root 1.17
398     return res;
399     }
400    
401     static ssize_t
402     pwrite (int fd, void *buf, size_t count, off_t offset)
403     {
404     ssize_t res;
405     off_t ooffset;
406    
407 root 1.35 LOCK_FD (fd);
408     pthread_mutex_lock (&iolock); /* replace by LOCK_FD and private buffer */
409 root 1.17 ooffset = lseek (fd, 0, SEEK_CUR);
410     lseek (fd, offset, SEEK_SET);
411     res = write (fd, buf, count);
412     lseek (fd, offset, SEEK_SET);
413     pthread_mutex_unlock (&iolock);
414 root 1.35 UNLOCK_FD (d);
415 root 1.17
416     return res;
417     }
418     #endif
419    
420     #if !HAVE_FDATASYNC
421     # define fdatasync fsync
422     #endif
423    
424     #if !HAVE_READAHEAD
425     # define readahead aio_readahead
426    
427     static char readahead_buf[4096];
428    
429     static ssize_t
430     readahead (int fd, off_t offset, size_t count)
431     {
432     while (count > 0)
433     {
434     size_t len = count < sizeof (readahead_buf) ? count : sizeof (readahead_buf);
435    
436     pread (fd, readahead_buf, len, offset);
437     offset += len;
438     count -= len;
439     }
440    
441     errno = 0;
442     }
443     #endif
444    
445 root 1.32 /* sendfile always needs emulation */
446     static ssize_t
447     sendfile_ (int ofd, int ifd, off_t offset, size_t count)
448     {
449     ssize_t res;
450    
451     if (!count)
452     return 0;
453    
454 root 1.35 LOCK_FD (ofd);
455    
456     #if HAVE_SENDFILE
457     # if __linux
458 root 1.32 res = sendfile (ofd, ifd, &offset, count);
459    
460 root 1.35 # elif __freebsd
461 root 1.32 /*
462     * Of course, the freebsd sendfile is a dire hack with no thoughts
463 root 1.35 * wasted on making it similar to other I/O functions.
464 root 1.32 */
465     {
466     off_t sbytes;
467     res = sendfile (ifd, ofd, offset, count, 0, &sbytes, 0);
468    
469 root 1.35 if (res < 0 && sbytes)
470     /* maybe only on EAGAIN only: as usual, the manpage leaves you guessing */
471 root 1.32 res = sbytes;
472     }
473    
474 root 1.35 # elif __hpux
475 root 1.32 res = sendfile (ofd, ifd, offset, count, 0, 0);
476    
477 root 1.35 # elif __solaris
478     {
479     struct sendfilevec vec;
480     size_t sbytes;
481    
482     vec.sfv_fd = ifd;
483     vec.sfv_flag = 0;
484     vec.sfv_off = offset;
485     vec.sfv_len = count;
486    
487     res = sendfilev (ofd, &vec, 1, &sbytes);
488    
489     if (res < 0 && sbytes)
490     res = sbytes;
491     }
492    
493     # else
494 root 1.32 res = -1;
495     errno = ENOSYS;
496 root 1.35 # endif
497 root 1.32 #endif
498    
499 root 1.35 if (res < 0
500     && (errno == ENOSYS || errno == EINVAL || errno == ENOTSOCK
501     #if __solaris
502     || errno == EAFNOSUPPORT || errno == EPROTOTYPE
503     #endif
504     )
505     )
506 root 1.32 {
507     /* emulate sendfile. this is a major pain in the ass */
508     char *buf = malloc (4096);
509     res = 0;
510    
511     for (;;)
512     {
513     ssize_t cnt;
514    
515     cnt = pread (ifd, buf, 4096, offset);
516    
517     if (cnt <= 0)
518     {
519     if (cnt && !res) res = -1;
520     break;
521     }
522    
523     cnt = write (ofd, buf, cnt);
524    
525     if (cnt <= 0)
526     {
527     if (cnt && !res) res = -1;
528     break;
529     }
530    
531     offset += cnt;
532     res += cnt;
533     }
534    
535     {
536     int errorno = errno;
537     free (buf);
538     errno = errorno;
539     }
540     }
541    
542 root 1.35 UNLOCK_FD (ofd);
543    
544 root 1.32 return res;
545     }
546    
547 root 1.22 /*****************************************************************************/
548    
549 root 1.1 static void *
550     aio_proc (void *thr_arg)
551     {
552     aio_req req;
553 root 1.3 int type;
554 root 1.1
555 root 1.3 do
556 root 1.1 {
557 root 1.3 pthread_mutex_lock (&reqlock);
558    
559     for (;;)
560     {
561     req = reqs;
562    
563     if (reqs)
564     {
565     reqs = reqs->next;
566     if (!reqs) reqe = 0;
567     }
568    
569     if (req)
570     break;
571    
572     pthread_cond_wait (&reqwait, &reqlock);
573     }
574    
575     pthread_mutex_unlock (&reqlock);
576    
577 root 1.1 errno = 0; /* strictly unnecessary */
578    
579 root 1.3 type = req->type;
580    
581     switch (type)
582 root 1.1 {
583 root 1.10 case REQ_READ: req->result = pread (req->fd, req->dataptr, req->length, req->offset); break;
584     case REQ_WRITE: req->result = pwrite (req->fd, req->dataptr, req->length, req->offset); break;
585 root 1.16
586 root 1.1 case REQ_READAHEAD: req->result = readahead (req->fd, req->offset, req->length); break;
587 root 1.32 case REQ_SENDFILE: req->result = sendfile_ (req->fd, req->fd2, req->offset, req->length); break;
588 root 1.1
589     case REQ_STAT: req->result = stat (req->dataptr, req->statdata); break;
590     case REQ_LSTAT: req->result = lstat (req->dataptr, req->statdata); break;
591     case REQ_FSTAT: req->result = fstat (req->fd , req->statdata); break;
592    
593     case REQ_OPEN: req->result = open (req->dataptr, req->fd, req->mode); break;
594     case REQ_CLOSE: req->result = close (req->fd); break;
595     case REQ_UNLINK: req->result = unlink (req->dataptr); break;
596 root 1.22 case REQ_RMDIR: req->result = rmdir (req->dataptr); break;
597     case REQ_SYMLINK: req->result = symlink (req->data2ptr, req->dataptr); break;
598 root 1.1
599 root 1.17 case REQ_FDATASYNC: req->result = fdatasync (req->fd); break;
600 root 1.1 case REQ_FSYNC: req->result = fsync (req->fd); break;
601    
602     case REQ_QUIT:
603 root 1.3 break;
604 root 1.1
605     default:
606     req->result = ENOSYS;
607     break;
608     }
609    
610     req->errorno = errno;
611 root 1.3
612     pthread_mutex_lock (&reslock);
613    
614     req->next = 0;
615    
616     if (rese)
617     {
618     rese->next = req;
619     rese = req;
620     }
621     else
622     {
623     rese = ress = req;
624    
625     /* write a dummy byte to the pipe so fh becomes ready */
626     write (respipe [1], &respipe, 1);
627     }
628    
629     pthread_mutex_unlock (&reslock);
630 root 1.1 }
631 root 1.3 while (type != REQ_QUIT);
632 root 1.1
633     return 0;
634     }
635    
636 root 1.22 #define dREQ \
637     aio_req req; \
638     \
639     if (SvOK (callback) && !SvROK (callback)) \
640     croak ("clalback must be undef or of reference type"); \
641     \
642     Newz (0, req, 1, aio_cb); \
643     if (!req) \
644     croak ("out of memory during aio_req allocation"); \
645     \
646 root 1.27 req->callback = newSVsv (callback);
647 root 1.22
648 root 1.1 MODULE = IO::AIO PACKAGE = IO::AIO
649    
650 root 1.8 PROTOTYPES: ENABLE
651    
652 root 1.1 BOOT:
653     {
654 root 1.26 create_pipe ();
655 root 1.22 pthread_atfork (atfork_prepare, atfork_parent, atfork_child);
656 root 1.1 }
657    
658     void
659     min_parallel(nthreads)
660     int nthreads
661     PROTOTYPE: $
662    
663     void
664     max_parallel(nthreads)
665     int nthreads
666     PROTOTYPE: $
667    
668 root 1.4 int
669     max_outstanding(nreqs)
670     int nreqs
671     PROTOTYPE: $
672     CODE:
673     RETVAL = max_outstanding;
674     max_outstanding = nreqs;
675    
676 root 1.1 void
677 root 1.8 aio_open(pathname,flags,mode,callback=&PL_sv_undef)
678 root 1.1 SV * pathname
679     int flags
680     int mode
681     SV * callback
682 root 1.8 PROTOTYPE: $$$;$
683 root 1.1 CODE:
684     {
685 root 1.22 dREQ;
686 root 1.1
687     req->type = REQ_OPEN;
688     req->data = newSVsv (pathname);
689 root 1.22 req->dataptr = SvPVbyte_nolen (req->data);
690 root 1.1 req->fd = flags;
691     req->mode = mode;
692    
693     send_req (req);
694     }
695    
696     void
697 root 1.8 aio_close(fh,callback=&PL_sv_undef)
698 root 1.13 SV * fh
699     SV * callback
700 root 1.8 PROTOTYPE: $;$
701 root 1.1 ALIAS:
702     aio_close = REQ_CLOSE
703     aio_fsync = REQ_FSYNC
704     aio_fdatasync = REQ_FDATASYNC
705     CODE:
706     {
707 root 1.22 dREQ;
708 root 1.1
709     req->type = ix;
710 root 1.13 req->fh = newSVsv (fh);
711     req->fd = PerlIO_fileno (IoIFP (sv_2io (fh)));
712 root 1.1
713     send_req (req);
714     }
715    
716     void
717 root 1.8 aio_read(fh,offset,length,data,dataoffset,callback=&PL_sv_undef)
718 root 1.13 SV * fh
719     UV offset
720 root 1.32 UV length
721 root 1.13 SV * data
722 root 1.32 UV dataoffset
723 root 1.13 SV * callback
724     ALIAS:
725     aio_read = REQ_READ
726     aio_write = REQ_WRITE
727 root 1.8 PROTOTYPE: $$$$$;$
728 root 1.1 CODE:
729 root 1.13 {
730     aio_req req;
731     STRLEN svlen;
732 root 1.21 char *svptr = SvPVbyte (data, svlen);
733 root 1.13
734     SvUPGRADE (data, SVt_PV);
735     SvPOK_on (data);
736 root 1.1
737 root 1.13 if (dataoffset < 0)
738     dataoffset += svlen;
739    
740     if (dataoffset < 0 || dataoffset > svlen)
741     croak ("data offset outside of string");
742    
743     if (ix == REQ_WRITE)
744     {
745     /* write: check length and adjust. */
746     if (length < 0 || length + dataoffset > svlen)
747     length = svlen - dataoffset;
748     }
749     else
750     {
751     /* read: grow scalar as necessary */
752     svptr = SvGROW (data, length + dataoffset);
753     }
754    
755     if (length < 0)
756     croak ("length must not be negative");
757    
758 root 1.22 {
759     dREQ;
760 root 1.13
761 root 1.22 req->type = ix;
762     req->fh = newSVsv (fh);
763     req->fd = PerlIO_fileno (ix == REQ_READ ? IoIFP (sv_2io (fh))
764     : IoOFP (sv_2io (fh)));
765     req->offset = offset;
766     req->length = length;
767     req->data = SvREFCNT_inc (data);
768     req->dataptr = (char *)svptr + dataoffset;
769 root 1.13
770 root 1.28 if (!SvREADONLY (data))
771     {
772     SvREADONLY_on (data);
773     req->data2ptr = (void *)data;
774     }
775    
776 root 1.22 send_req (req);
777     }
778 root 1.13 }
779 root 1.1
780     void
781 root 1.32 aio_sendfile(out_fh,in_fh,in_offset,length,callback=&PL_sv_undef)
782     SV * out_fh
783     SV * in_fh
784     UV in_offset
785     UV length
786     SV * callback
787     PROTOTYPE: $$$$;$
788     CODE:
789     {
790     dREQ;
791    
792     req->type = REQ_SENDFILE;
793     req->fh = newSVsv (out_fh);
794     req->fd = PerlIO_fileno (IoIFP (sv_2io (out_fh)));
795     req->fh2 = newSVsv (in_fh);
796     req->fd2 = PerlIO_fileno (IoIFP (sv_2io (in_fh)));
797     req->offset = in_offset;
798     req->length = length;
799    
800     send_req (req);
801     }
802    
803     void
804 root 1.8 aio_readahead(fh,offset,length,callback=&PL_sv_undef)
805 root 1.13 SV * fh
806     UV offset
807     IV length
808     SV * callback
809 root 1.8 PROTOTYPE: $$$;$
810 root 1.1 CODE:
811     {
812 root 1.22 dREQ;
813 root 1.1
814     req->type = REQ_READAHEAD;
815 root 1.13 req->fh = newSVsv (fh);
816     req->fd = PerlIO_fileno (IoIFP (sv_2io (fh)));
817 root 1.1 req->offset = offset;
818     req->length = length;
819    
820     send_req (req);
821     }
822    
823     void
824 root 1.8 aio_stat(fh_or_path,callback=&PL_sv_undef)
825 root 1.1 SV * fh_or_path
826     SV * callback
827     ALIAS:
828 root 1.8 aio_stat = REQ_STAT
829     aio_lstat = REQ_LSTAT
830 root 1.1 CODE:
831     {
832 root 1.22 dREQ;
833 root 1.1
834     New (0, req->statdata, 1, Stat_t);
835     if (!req->statdata)
836 root 1.27 {
837     free_req (req);
838     croak ("out of memory during aio_req->statdata allocation");
839     }
840 root 1.1
841     if (SvPOK (fh_or_path))
842     {
843 root 1.8 req->type = ix;
844 root 1.1 req->data = newSVsv (fh_or_path);
845 root 1.22 req->dataptr = SvPVbyte_nolen (req->data);
846 root 1.1 }
847     else
848     {
849     req->type = REQ_FSTAT;
850 root 1.13 req->fh = newSVsv (fh_or_path);
851 root 1.1 req->fd = PerlIO_fileno (IoIFP (sv_2io (fh_or_path)));
852     }
853    
854     send_req (req);
855     }
856    
857     void
858 root 1.8 aio_unlink(pathname,callback=&PL_sv_undef)
859 root 1.1 SV * pathname
860     SV * callback
861 root 1.22 ALIAS:
862     aio_unlink = REQ_UNLINK
863     aio_rmdir = REQ_RMDIR
864 root 1.1 CODE:
865     {
866 root 1.22 dREQ;
867 root 1.1
868 root 1.22 req->type = ix;
869     req->data = newSVsv (pathname);
870     req->dataptr = SvPVbyte_nolen (req->data);
871 root 1.1
872 root 1.22 send_req (req);
873     }
874    
875     void
876     aio_symlink(oldpath,newpath,callback=&PL_sv_undef)
877     SV * oldpath
878     SV * newpath
879     SV * callback
880     CODE:
881     {
882     dREQ;
883 root 1.1
884 root 1.22 req->type = REQ_SYMLINK;
885     req->fh = newSVsv (oldpath);
886     req->data2ptr = SvPVbyte_nolen (req->fh);
887     req->data = newSVsv (newpath);
888     req->dataptr = SvPVbyte_nolen (req->data);
889 root 1.1
890     send_req (req);
891     }
892    
893 root 1.6 void
894     flush()
895     PROTOTYPE:
896     CODE:
897     while (nreqs)
898     {
899     poll_wait ();
900     poll_cb ();
901     }
902    
903 root 1.7 void
904     poll()
905     PROTOTYPE:
906     CODE:
907     if (nreqs)
908     {
909     poll_wait ();
910     poll_cb ();
911     }
912    
913 root 1.1 int
914     poll_fileno()
915     PROTOTYPE:
916     CODE:
917 root 1.3 RETVAL = respipe [0];
918 root 1.1 OUTPUT:
919     RETVAL
920    
921     int
922     poll_cb(...)
923     PROTOTYPE:
924     CODE:
925 root 1.5 RETVAL = poll_cb ();
926 root 1.1 OUTPUT:
927     RETVAL
928    
929     void
930     poll_wait()
931     PROTOTYPE:
932     CODE:
933 root 1.3 if (nreqs)
934     poll_wait ();
935 root 1.1
936     int
937     nreqs()
938     PROTOTYPE:
939     CODE:
940     RETVAL = nreqs;
941     OUTPUT:
942     RETVAL
943