ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/IO-AIO/AIO.xs
Revision: 1.36
Committed: Tue Aug 23 01:18:04 2005 UTC (18 years, 8 months ago) by root
Branch: MAIN
Changes since 1.35: +2 -14 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.20 #define _REENTRANT 1
2 root 1.19 #include <errno.h>
3    
4 root 1.15 #include "EXTERN.h"
5 root 1.1 #include "perl.h"
6     #include "XSUB.h"
7    
8 root 1.16 #include "autoconf/config.h"
9    
10 root 1.32 #include <pthread.h>
11    
12 root 1.1 #include <sys/types.h>
13     #include <sys/stat.h>
14 root 1.10
15 root 1.1 #include <unistd.h>
16     #include <fcntl.h>
17     #include <signal.h>
18     #include <sched.h>
19    
20 root 1.32 #if HAVE_SENDFILE
21     # if __linux
22     # include <sys/sendfile.h>
23     # elif __freebsd
24     # include <sys/socket.h>
25     # include <sys/uio.h>
26     # elif __hpux
27     # include <sys/socket.h>
28 root 1.35 # elif __solaris /* not yet */
29     # include <sys/sendfile.h>
30 root 1.34 # else
31     # error sendfile support requested but not available
32 root 1.32 # endif
33     #endif
34 root 1.1
/* stack size requested for each worker thread; ia64 gets a larger one */
#if !__ia64
# define STACKSIZE 4096
#else
# define STACKSIZE 65536
#endif
40    
/* request types; the numeric values are also used as XS ALIAS indices */
enum {
  REQ_QUIT,       /* makes the worker thread that dequeues it leave its loop */
  REQ_OPEN,
  REQ_CLOSE,
  REQ_READ,
  REQ_WRITE,
  REQ_READAHEAD,
  REQ_SENDFILE,
  REQ_STAT,
  REQ_LSTAT,
  REQ_FSTAT,
  REQ_FSYNC,
  REQ_FDATASYNC,
  REQ_UNLINK,
  REQ_RMDIR,
  REQ_SYMLINK,
};
51    
52     typedef struct aio_cb {
53 root 1.3 struct aio_cb *volatile next;
54 root 1.1
55     int type;
56    
57 root 1.32 int fd, fd2;
58 root 1.1 off_t offset;
59     size_t length;
60     ssize_t result;
61     mode_t mode; /* open */
62     int errorno;
63 root 1.32 SV *data, *callback;
64     SV *fh, *fh2;
65 root 1.22 void *dataptr, *data2ptr;
66 root 1.1 STRLEN dataoffset;
67    
68     Stat_t *statdata;
69     } aio_cb;
70    
71     typedef aio_cb *aio_req;
72    
73 root 1.30 static int started, wanted;
74 root 1.11 static volatile int nreqs;
75 root 1.4 static int max_outstanding = 1<<30;
76 root 1.3 static int respipe [2];
77 root 1.1
78 root 1.3 static pthread_mutex_t reslock = PTHREAD_MUTEX_INITIALIZER;
79     static pthread_mutex_t reqlock = PTHREAD_MUTEX_INITIALIZER;
80     static pthread_cond_t reqwait = PTHREAD_COND_INITIALIZER;
81    
82     static volatile aio_req reqs, reqe; /* queue start, queue end */
83     static volatile aio_req ress, rese; /* queue start, queue end */
84 root 1.1
85 root 1.26 static void free_req (aio_req req)
86     {
87     if (req->data)
88     SvREFCNT_dec (req->data);
89    
90     if (req->fh)
91     SvREFCNT_dec (req->fh);
92    
93 root 1.32 if (req->fh2)
94     SvREFCNT_dec (req->fh2);
95    
96 root 1.27 if (req->statdata)
97 root 1.26 Safefree (req->statdata);
98    
99     if (req->callback)
100     SvREFCNT_dec (req->callback);
101    
102     Safefree (req);
103     }
104    
105 root 1.1 static void
106     poll_wait ()
107     {
108 root 1.11 if (nreqs && !ress)
109     {
110     fd_set rfd;
111     FD_ZERO(&rfd);
112     FD_SET(respipe [0], &rfd);
113 root 1.3
114 root 1.11 select (respipe [0] + 1, &rfd, 0, 0, 0);
115     }
116 root 1.1 }
117    
118     static int
119 root 1.5 poll_cb ()
120 root 1.1 {
121     dSP;
122     int count = 0;
123 root 1.24 int do_croak = 0;
124 root 1.27 aio_req req;
125    
126     for (;;)
127     {
128     pthread_mutex_lock (&reslock);
129     req = ress;
130    
131     if (req)
132     {
133     ress = req->next;
134 root 1.11
135 root 1.27 if (!ress)
136     {
137     /* read any signals sent by the worker threads */
138     char buf [32];
139     while (read (respipe [0], buf, 32) == 32)
140     ;
141 root 1.30
142     rese = 0;
143 root 1.27 }
144     }
145 root 1.1
146 root 1.27 pthread_mutex_unlock (&reslock);
147 root 1.3
148 root 1.27 if (!req)
149     break;
150 root 1.3
151 root 1.1 nreqs--;
152    
153     if (req->type == REQ_QUIT)
154     started--;
155     else
156     {
157     int errorno = errno;
158     errno = req->errorno;
159    
160     if (req->type == REQ_READ)
161 root 1.27 SvCUR_set (req->data, req->dataoffset + (req->result > 0 ? req->result : 0));
162 root 1.1
163 root 1.28 if (req->data2ptr && (req->type == REQ_READ || req->type == REQ_WRITE))
164     SvREADONLY_off (req->data);
165    
166 root 1.27 if (req->statdata)
167 root 1.1 {
168     PL_laststype = req->type == REQ_LSTAT ? OP_LSTAT : OP_STAT;
169     PL_laststatval = req->result;
170     PL_statcache = *(req->statdata);
171     }
172    
173 root 1.13 ENTER;
174 root 1.1 PUSHMARK (SP);
175     XPUSHs (sv_2mortal (newSViv (req->result)));
176 root 1.2
177     if (req->type == REQ_OPEN)
178     {
179     /* convert fd to fh */
180     SV *fh;
181    
182     PUTBACK;
183     call_pv ("IO::AIO::_fd2fh", G_SCALAR | G_EVAL);
184     SPAGAIN;
185    
186 root 1.13 fh = SvREFCNT_inc (POPs);
187 root 1.2
188     PUSHMARK (SP);
189 root 1.13 XPUSHs (sv_2mortal (fh));
190 root 1.2 }
191    
192 root 1.8 if (SvOK (req->callback))
193     {
194     PUTBACK;
195     call_sv (req->callback, G_VOID | G_EVAL);
196     SPAGAIN;
197 root 1.27
198     if (SvTRUE (ERRSV))
199     {
200     free_req (req);
201     croak (0);
202     }
203 root 1.8 }
204 root 1.13
205 root 1.26 LEAVE;
206    
207 root 1.1 errno = errorno;
208     count++;
209     }
210    
211 root 1.27 free_req (req);
212 root 1.1 }
213    
214     return count;
215     }
216    
217 root 1.4 static void *aio_proc(void *arg);
218    
219     static void
220     start_thread (void)
221     {
222     sigset_t fullsigset, oldsigset;
223     pthread_t tid;
224     pthread_attr_t attr;
225    
226     pthread_attr_init (&attr);
227     pthread_attr_setstacksize (&attr, STACKSIZE);
228 root 1.31 pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
229 root 1.4
230     sigfillset (&fullsigset);
231     sigprocmask (SIG_SETMASK, &fullsigset, &oldsigset);
232    
233     if (pthread_create (&tid, &attr, aio_proc, 0) == 0)
234     started++;
235    
236     sigprocmask (SIG_SETMASK, &oldsigset, 0);
237     }
238    
239     static void
240     send_req (aio_req req)
241     {
242 root 1.30 while (started < wanted && nreqs >= started)
243     start_thread ();
244    
245 root 1.4 nreqs++;
246    
247     pthread_mutex_lock (&reqlock);
248    
249     req->next = 0;
250    
251     if (reqe)
252     {
253     reqe->next = req;
254     reqe = req;
255     }
256     else
257     reqe = reqs = req;
258    
259     pthread_cond_signal (&reqwait);
260     pthread_mutex_unlock (&reqlock);
261    
262 root 1.27 if (nreqs > max_outstanding)
263     for (;;)
264     {
265     poll_cb ();
266    
267     if (nreqs <= max_outstanding)
268     break;
269    
270     poll_wait ();
271     }
272 root 1.4 }
273    
274     static void
275     end_thread (void)
276     {
277     aio_req req;
278 root 1.26 Newz (0, req, 1, aio_cb);
279 root 1.4 req->type = REQ_QUIT;
280    
281     send_req (req);
282     }
283    
284 root 1.22 static void min_parallel (int nthreads)
285     {
286 root 1.30 if (wanted < nthreads)
287     wanted = nthreads;
288 root 1.22 }
289    
290     static void max_parallel (int nthreads)
291     {
292     int cur = started;
293 root 1.27
294 root 1.30 if (wanted > nthreads)
295     wanted = nthreads;
296    
297     while (cur > wanted)
298     {
299 root 1.22 end_thread ();
300     cur--;
301     }
302    
303 root 1.30 while (started > wanted)
304 root 1.22 {
305     poll_wait ();
306     poll_cb ();
307     }
308     }
309    
310 root 1.26 static void create_pipe ()
311 root 1.22 {
312 root 1.26 if (pipe (respipe))
313     croak ("unable to initialize result pipe");
314 root 1.22
315 root 1.26 if (fcntl (respipe [0], F_SETFL, O_NONBLOCK))
316     croak ("cannot set result pipe to nonblocking mode");
317 root 1.22
318 root 1.26 if (fcntl (respipe [1], F_SETFL, O_NONBLOCK))
319     croak ("cannot set result pipe to nonblocking mode");
320     }
321 root 1.22
322 root 1.26 static void atfork_prepare (void)
323     {
324 root 1.28 pthread_mutex_lock (&reqlock);
325 root 1.22 pthread_mutex_lock (&reslock);
326     }
327    
328     static void atfork_parent (void)
329     {
330     pthread_mutex_unlock (&reslock);
331 root 1.26 pthread_mutex_unlock (&reqlock);
332     }
333    
334     static void atfork_child (void)
335     {
336 root 1.28 aio_req prv;
337    
338 root 1.26 started = 0;
339    
340 root 1.28 while (reqs)
341     {
342     prv = reqs;
343     reqs = prv->next;
344     free_req (prv);
345     }
346    
347     reqs = reqe = 0;
348    
349     while (ress)
350     {
351     prv = ress;
352     ress = prv->next;
353     free_req (prv);
354     }
355    
356     ress = rese = 0;
357    
358 root 1.29 close (respipe [0]);
359     close (respipe [1]);
360     create_pipe ();
361    
362 root 1.26 atfork_parent ();
363 root 1.22 }
364    
365     /*****************************************************************************/
366 root 1.17 /* work around various missing functions */
367    
#if !HAVE_PREADWRITE
# define pread aio_pread
# define pwrite aio_pwrite

/*
 * make our pread/pwrite safe against themselves, but not against
 * normal read/write by using a mutex. slows down execution a lot,
 * but that's your problem, not mine.
 */
static pthread_mutex_t iolock = PTHREAD_MUTEX_INITIALIZER;

/*
 * Emulated pread: read up to count bytes at the given offset and put the
 * file position back to where it was before the call.
 * Returns whatever read returned.
 */
static ssize_t
pread (int fd, void *buf, size_t count, off_t offset)
{
  ssize_t res;
  off_t ooffset;

  pthread_mutex_lock (&iolock);
  ooffset = lseek (fd, 0, SEEK_CUR);
  lseek (fd, offset, SEEK_SET);
  res = read (fd, buf, count);
  lseek (fd, ooffset, SEEK_SET);
  pthread_mutex_unlock (&iolock);

  return res;
}

/*
 * Emulated pwrite: write up to count bytes at the given offset and put
 * the file position back to where it was before the call.
 * Returns whatever write returned.
 */
static ssize_t
pwrite (int fd, void *buf, size_t count, off_t offset)
{
  ssize_t res;
  off_t ooffset;

  pthread_mutex_lock (&iolock);
  ooffset = lseek (fd, 0, SEEK_CUR);
  lseek (fd, offset, SEEK_SET);
  res = write (fd, buf, count);
  /* restore the saved position, not the write offset */
  lseek (fd, ooffset, SEEK_SET);
  pthread_mutex_unlock (&iolock);

  return res;
}
#endif
411    
412     #if !HAVE_FDATASYNC
413     # define fdatasync fsync
414     #endif
415    
#if !HAVE_READAHEAD
# define readahead aio_readahead

/* scratch buffer; the data read into it is never used */
static char readahead_buf[4096];

/*
 * Emulated readahead: read count bytes starting at offset in chunks of
 * at most sizeof (readahead_buf) and discard them. Results of the
 * individual reads are ignored.
 * Always sets errno to 0 and returns 0.
 */
static ssize_t
readahead (int fd, off_t offset, size_t count)
{
  while (count > 0)
    {
      size_t len = count < sizeof (readahead_buf) ? count : sizeof (readahead_buf);

      pread (fd, readahead_buf, len, offset);
      offset += len;
      count -= len;
    }

  errno = 0;

  /* the caller stores our return value, so it must be defined */
  return 0;
}
#endif
436    
/*
 * sendfile always needs emulation
 *
 * Copy up to count bytes from ifd, starting at offset, to ofd.
 * Uses the platform sendfile when HAVE_SENDFILE is set; when that is not
 * available or fails with an errno indicating that it cannot handle
 * these descriptors, the data is copied with pread and write instead.
 *
 * Returns the number of bytes transferred, 0 when count is 0, or -1 with
 * errno set when nothing could be transferred.
 */
static ssize_t
sendfile_ (int ofd, int ifd, off_t offset, size_t count)
{
  ssize_t res;

  if (!count)
    return 0;

#if HAVE_SENDFILE
# if __linux
  res = sendfile (ofd, ifd, &offset, count);

# elif __freebsd
  /*
   * Of course, the freebsd sendfile is a dire hack with no thoughts
   * wasted on making it similar to other I/O functions.
   */
  {
    off_t sbytes = 0;
    res = sendfile (ifd, ofd, offset, count, 0, &sbytes, 0);

    if (res < 0 && sbytes)
      /* maybe only on EAGAIN only: as usual, the manpage leaves you guessing */
      res = sbytes;
  }

# elif __hpux
  res = sendfile (ofd, ifd, offset, count, 0, 0);

# elif __solaris
  {
    struct sendfilevec vec;
    size_t sbytes = 0;

    vec.sfv_fd   = ifd;
    vec.sfv_flag = 0;
    vec.sfv_off  = offset;
    vec.sfv_len  = count;

    res = sendfilev (ofd, &vec, 1, &sbytes);

    if (res < 0 && sbytes)
      res = sbytes;
  }

# else
  res = -1;
  errno = ENOSYS;
# endif
#else
  /* no native sendfile configured: go straight to the emulation */
  res = -1;
  errno = ENOSYS;
#endif

  if (res < 0
      && (errno == ENOSYS || errno == EINVAL || errno == ENOTSOCK
#if __solaris
          || errno == EAFNOSUPPORT || errno == EPROTOTYPE
#endif
         )
      )
    {
      /* emulate sendfile. this is a major pain in the ass */
      const size_t chunk = 4096;
      char *buf = malloc (chunk);

      if (!buf)
        {
          errno = ENOMEM;
          return -1;
        }

      res = 0;

      /* never transfer more than the caller asked for */
      while (count > 0)
        {
          size_t want = count < chunk ? count : chunk;
          ssize_t cnt;

          cnt = pread (ifd, buf, want, offset);

          if (cnt <= 0)
            {
              /* report an error only if nothing was transferred yet */
              if (cnt && !res) res = -1;
              break;
            }

          cnt = write (ofd, buf, cnt);

          if (cnt <= 0)
            {
              if (cnt && !res) res = -1;
              break;
            }

          offset += cnt;
          res += cnt;
          count -= cnt;
        }

      {
        /* keep the errno of the failing call across free */
        int errorno = errno;
        free (buf);
        errno = errorno;
      }
    }

  return res;
}
534    
535 root 1.22 /*****************************************************************************/
536    
537 root 1.1 static void *
538     aio_proc (void *thr_arg)
539     {
540     aio_req req;
541 root 1.3 int type;
542 root 1.1
543 root 1.3 do
544 root 1.1 {
545 root 1.3 pthread_mutex_lock (&reqlock);
546    
547     for (;;)
548     {
549     req = reqs;
550    
551     if (reqs)
552     {
553     reqs = reqs->next;
554     if (!reqs) reqe = 0;
555     }
556    
557     if (req)
558     break;
559    
560     pthread_cond_wait (&reqwait, &reqlock);
561     }
562    
563     pthread_mutex_unlock (&reqlock);
564    
565 root 1.1 errno = 0; /* strictly unnecessary */
566    
567 root 1.3 type = req->type;
568    
569     switch (type)
570 root 1.1 {
571 root 1.10 case REQ_READ: req->result = pread (req->fd, req->dataptr, req->length, req->offset); break;
572     case REQ_WRITE: req->result = pwrite (req->fd, req->dataptr, req->length, req->offset); break;
573 root 1.16
574 root 1.1 case REQ_READAHEAD: req->result = readahead (req->fd, req->offset, req->length); break;
575 root 1.32 case REQ_SENDFILE: req->result = sendfile_ (req->fd, req->fd2, req->offset, req->length); break;
576 root 1.1
577     case REQ_STAT: req->result = stat (req->dataptr, req->statdata); break;
578     case REQ_LSTAT: req->result = lstat (req->dataptr, req->statdata); break;
579     case REQ_FSTAT: req->result = fstat (req->fd , req->statdata); break;
580    
581     case REQ_OPEN: req->result = open (req->dataptr, req->fd, req->mode); break;
582     case REQ_CLOSE: req->result = close (req->fd); break;
583     case REQ_UNLINK: req->result = unlink (req->dataptr); break;
584 root 1.22 case REQ_RMDIR: req->result = rmdir (req->dataptr); break;
585     case REQ_SYMLINK: req->result = symlink (req->data2ptr, req->dataptr); break;
586 root 1.1
587 root 1.17 case REQ_FDATASYNC: req->result = fdatasync (req->fd); break;
588 root 1.1 case REQ_FSYNC: req->result = fsync (req->fd); break;
589    
590     case REQ_QUIT:
591 root 1.3 break;
592 root 1.1
593     default:
594     req->result = ENOSYS;
595     break;
596     }
597    
598     req->errorno = errno;
599 root 1.3
600     pthread_mutex_lock (&reslock);
601    
602     req->next = 0;
603    
604     if (rese)
605     {
606     rese->next = req;
607     rese = req;
608     }
609     else
610     {
611     rese = ress = req;
612    
613     /* write a dummy byte to the pipe so fh becomes ready */
614     write (respipe [1], &respipe, 1);
615     }
616    
617     pthread_mutex_unlock (&reslock);
618 root 1.1 }
619 root 1.3 while (type != REQ_QUIT);
620 root 1.1
621     return 0;
622     }
623    
/*
 * Declare "req" and allocate a zeroed request for the XS functions.
 * Expects an SV named "callback" in scope; croaks unless it is undefined
 * or a reference. The request receives its own copy of the callback.
 */
#define dREQ							\
  aio_req req;							\
								\
  if (SvOK (callback) && !SvROK (callback))			\
    croak ("callback must be undef or of reference type");	\
								\
  Newz (0, req, 1, aio_cb);					\
  if (!req)							\
    croak ("out of memory during aio_req allocation");		\
								\
  req->callback = newSVsv (callback);
635 root 1.22
636 root 1.1 MODULE = IO::AIO PACKAGE = IO::AIO
637    
638 root 1.8 PROTOTYPES: ENABLE
639    
640 root 1.1 BOOT:
641     {
642 root 1.26 create_pipe ();
643 root 1.22 pthread_atfork (atfork_prepare, atfork_parent, atfork_child);
644 root 1.1 }
645    
646     void
647     min_parallel(nthreads)
648     int nthreads
649     PROTOTYPE: $
650    
651     void
652     max_parallel(nthreads)
653     int nthreads
654     PROTOTYPE: $
655    
656 root 1.4 int
657     max_outstanding(nreqs)
658     int nreqs
659     PROTOTYPE: $
660     CODE:
661     RETVAL = max_outstanding;
662     max_outstanding = nreqs;
663    
664 root 1.1 void
665 root 1.8 aio_open(pathname,flags,mode,callback=&PL_sv_undef)
666 root 1.1 SV * pathname
667     int flags
668     int mode
669     SV * callback
670 root 1.8 PROTOTYPE: $$$;$
671 root 1.1 CODE:
672     {
673 root 1.22 dREQ;
674 root 1.1
675     req->type = REQ_OPEN;
676     req->data = newSVsv (pathname);
677 root 1.22 req->dataptr = SvPVbyte_nolen (req->data);
678 root 1.1 req->fd = flags;
679     req->mode = mode;
680    
681     send_req (req);
682     }
683    
684     void
685 root 1.8 aio_close(fh,callback=&PL_sv_undef)
686 root 1.13 SV * fh
687     SV * callback
688 root 1.8 PROTOTYPE: $;$
689 root 1.1 ALIAS:
690     aio_close = REQ_CLOSE
691     aio_fsync = REQ_FSYNC
692     aio_fdatasync = REQ_FDATASYNC
693     CODE:
694     {
695 root 1.22 dREQ;
696 root 1.1
697     req->type = ix;
698 root 1.13 req->fh = newSVsv (fh);
699     req->fd = PerlIO_fileno (IoIFP (sv_2io (fh)));
700 root 1.1
701     send_req (req);
702     }
703    
704     void
705 root 1.8 aio_read(fh,offset,length,data,dataoffset,callback=&PL_sv_undef)
706 root 1.13 SV * fh
707     UV offset
708 root 1.32 UV length
709 root 1.13 SV * data
710 root 1.32 UV dataoffset
711 root 1.13 SV * callback
712     ALIAS:
713     aio_read = REQ_READ
714     aio_write = REQ_WRITE
715 root 1.8 PROTOTYPE: $$$$$;$
716 root 1.1 CODE:
717 root 1.13 {
718     aio_req req;
719     STRLEN svlen;
720 root 1.21 char *svptr = SvPVbyte (data, svlen);
721 root 1.13
722     SvUPGRADE (data, SVt_PV);
723     SvPOK_on (data);
724 root 1.1
725 root 1.13 if (dataoffset < 0)
726     dataoffset += svlen;
727    
728     if (dataoffset < 0 || dataoffset > svlen)
729     croak ("data offset outside of string");
730    
731     if (ix == REQ_WRITE)
732     {
733     /* write: check length and adjust. */
734     if (length < 0 || length + dataoffset > svlen)
735     length = svlen - dataoffset;
736     }
737     else
738     {
739     /* read: grow scalar as necessary */
740     svptr = SvGROW (data, length + dataoffset);
741     }
742    
743     if (length < 0)
744     croak ("length must not be negative");
745    
746 root 1.22 {
747     dREQ;
748 root 1.13
749 root 1.22 req->type = ix;
750     req->fh = newSVsv (fh);
751     req->fd = PerlIO_fileno (ix == REQ_READ ? IoIFP (sv_2io (fh))
752     : IoOFP (sv_2io (fh)));
753     req->offset = offset;
754     req->length = length;
755     req->data = SvREFCNT_inc (data);
756     req->dataptr = (char *)svptr + dataoffset;
757 root 1.13
758 root 1.28 if (!SvREADONLY (data))
759     {
760     SvREADONLY_on (data);
761     req->data2ptr = (void *)data;
762     }
763    
764 root 1.22 send_req (req);
765     }
766 root 1.13 }
767 root 1.1
768     void
769 root 1.32 aio_sendfile(out_fh,in_fh,in_offset,length,callback=&PL_sv_undef)
770     SV * out_fh
771     SV * in_fh
772     UV in_offset
773     UV length
774     SV * callback
775     PROTOTYPE: $$$$;$
776     CODE:
777     {
778     dREQ;
779    
780     req->type = REQ_SENDFILE;
781     req->fh = newSVsv (out_fh);
782     req->fd = PerlIO_fileno (IoIFP (sv_2io (out_fh)));
783     req->fh2 = newSVsv (in_fh);
784     req->fd2 = PerlIO_fileno (IoIFP (sv_2io (in_fh)));
785     req->offset = in_offset;
786     req->length = length;
787    
788     send_req (req);
789     }
790    
791     void
792 root 1.8 aio_readahead(fh,offset,length,callback=&PL_sv_undef)
793 root 1.13 SV * fh
794     UV offset
795     IV length
796     SV * callback
797 root 1.8 PROTOTYPE: $$$;$
798 root 1.1 CODE:
799     {
800 root 1.22 dREQ;
801 root 1.1
802     req->type = REQ_READAHEAD;
803 root 1.13 req->fh = newSVsv (fh);
804     req->fd = PerlIO_fileno (IoIFP (sv_2io (fh)));
805 root 1.1 req->offset = offset;
806     req->length = length;
807    
808     send_req (req);
809     }
810    
811     void
812 root 1.8 aio_stat(fh_or_path,callback=&PL_sv_undef)
813 root 1.1 SV * fh_or_path
814     SV * callback
815     ALIAS:
816 root 1.8 aio_stat = REQ_STAT
817     aio_lstat = REQ_LSTAT
818 root 1.1 CODE:
819     {
820 root 1.22 dREQ;
821 root 1.1
822     New (0, req->statdata, 1, Stat_t);
823     if (!req->statdata)
824 root 1.27 {
825     free_req (req);
826     croak ("out of memory during aio_req->statdata allocation");
827     }
828 root 1.1
829     if (SvPOK (fh_or_path))
830     {
831 root 1.8 req->type = ix;
832 root 1.1 req->data = newSVsv (fh_or_path);
833 root 1.22 req->dataptr = SvPVbyte_nolen (req->data);
834 root 1.1 }
835     else
836     {
837     req->type = REQ_FSTAT;
838 root 1.13 req->fh = newSVsv (fh_or_path);
839 root 1.1 req->fd = PerlIO_fileno (IoIFP (sv_2io (fh_or_path)));
840     }
841    
842     send_req (req);
843     }
844    
845     void
846 root 1.8 aio_unlink(pathname,callback=&PL_sv_undef)
847 root 1.1 SV * pathname
848     SV * callback
849 root 1.22 ALIAS:
850     aio_unlink = REQ_UNLINK
851     aio_rmdir = REQ_RMDIR
852 root 1.1 CODE:
853     {
854 root 1.22 dREQ;
855 root 1.1
856 root 1.22 req->type = ix;
857     req->data = newSVsv (pathname);
858     req->dataptr = SvPVbyte_nolen (req->data);
859 root 1.1
860 root 1.22 send_req (req);
861     }
862    
863     void
864     aio_symlink(oldpath,newpath,callback=&PL_sv_undef)
865     SV * oldpath
866     SV * newpath
867     SV * callback
868     CODE:
869     {
870     dREQ;
871 root 1.1
872 root 1.22 req->type = REQ_SYMLINK;
873     req->fh = newSVsv (oldpath);
874     req->data2ptr = SvPVbyte_nolen (req->fh);
875     req->data = newSVsv (newpath);
876     req->dataptr = SvPVbyte_nolen (req->data);
877 root 1.1
878     send_req (req);
879     }
880    
881 root 1.6 void
882     flush()
883     PROTOTYPE:
884     CODE:
885     while (nreqs)
886     {
887     poll_wait ();
888     poll_cb ();
889     }
890    
891 root 1.7 void
892     poll()
893     PROTOTYPE:
894     CODE:
895     if (nreqs)
896     {
897     poll_wait ();
898     poll_cb ();
899     }
900    
901 root 1.1 int
902     poll_fileno()
903     PROTOTYPE:
904     CODE:
905 root 1.3 RETVAL = respipe [0];
906 root 1.1 OUTPUT:
907     RETVAL
908    
909     int
910     poll_cb(...)
911     PROTOTYPE:
912     CODE:
913 root 1.5 RETVAL = poll_cb ();
914 root 1.1 OUTPUT:
915     RETVAL
916    
917     void
918     poll_wait()
919     PROTOTYPE:
920     CODE:
921 root 1.3 if (nreqs)
922     poll_wait ();
923 root 1.1
924     int
925     nreqs()
926     PROTOTYPE:
927     CODE:
928     RETVAL = nreqs;
929     OUTPUT:
930     RETVAL
931