ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/IO-AIO/AIO.xs
Revision: 1.34
Committed: Tue Aug 23 00:03:14 2005 UTC (18 years, 8 months ago) by root
Branch: MAIN
Changes since 1.33: +2 -0 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.20 #define _REENTRANT 1
2 root 1.19 #include <errno.h>
3    
4 root 1.15 #include "EXTERN.h"
5 root 1.1 #include "perl.h"
6     #include "XSUB.h"
7    
8 root 1.16 #include "autoconf/config.h"
9    
10 root 1.32 #include <pthread.h>
11    
12 root 1.1 #include <sys/types.h>
13     #include <sys/stat.h>
14 root 1.10
15 root 1.1 #include <unistd.h>
16     #include <fcntl.h>
17     #include <signal.h>
18     #include <sched.h>
19    
20 root 1.32 #if HAVE_SENDFILE
21     # if __linux
22     # include <sys/sendfile.h>
23     # elif __freebsd
24     # include <sys/socket.h>
25     # include <sys/uio.h>
26     # elif __hpux
27     # include <sys/socket.h>
28 root 1.34 # else
29     # error sendfile support requested but not available
30 root 1.32 # endif
31     #endif
32 root 1.1
/* Stack size, in bytes, that start_thread requests for each worker thread. */
/* NOTE(review): 4096 may be below the platform's minimum thread stack size, */
/* in which case the setstacksize call presumably fails and is ignored - confirm. */
33 root 1.4 #if __ia64
34     # define STACKSIZE 65536
35 root 1.1 #else
36 root 1.4 # define STACKSIZE 4096
37 root 1.1 #endif
38    
/*
 * Request type codes stored in aio_cb.type and dispatched on by aio_proc.
 * The XS ALIAS sections also use these values, so the numbering is part
 * of the module's internal contract: keep the order stable.
 */
enum
{
  REQ_QUIT,      /* makes the worker thread that dequeues it leave its loop */
  REQ_OPEN,
  REQ_CLOSE,
  REQ_READ,
  REQ_WRITE,
  REQ_READAHEAD,
  REQ_SENDFILE,
  REQ_STAT,
  REQ_LSTAT,
  REQ_FSTAT,
  REQ_FSYNC,
  REQ_FDATASYNC,
  REQ_UNLINK,
  REQ_RMDIR,
  REQ_SYMLINK,
};
49    
50     typedef struct aio_cb {
51 root 1.3 struct aio_cb *volatile next;
52 root 1.1
53     int type;
54    
55 root 1.32 int fd, fd2;
56 root 1.1 off_t offset;
57     size_t length;
58     ssize_t result;
59     mode_t mode; /* open */
60     int errorno;
61 root 1.32 SV *data, *callback;
62     SV *fh, *fh2;
63 root 1.22 void *dataptr, *data2ptr;
64 root 1.1 STRLEN dataoffset;
65    
66     Stat_t *statdata;
67     } aio_cb;
68    
69     typedef aio_cb *aio_req;
70    
/* started: worker threads counted as running (incremented in start_thread, */
/* decremented when poll_cb sees a REQ_QUIT result); wanted: thread target */
/* maintained by min_parallel/max_parallel. */
71 root 1.30 static int started, wanted;
/* Requests submitted via send_req whose results poll_cb has not yet consumed. */
72 root 1.11 static volatile int nreqs;
/* send_req polls for results while nreqs exceeds this limit. */
73 root 1.4 static int max_outstanding = 1<<30;
/* Result pipe: workers write a byte to [1], poll_wait selects on [0]. */
74 root 1.3 static int respipe [2];
75 root 1.1
/* reslock guards the result queue; reqlock and reqwait guard the request queue. */
76 root 1.3 static pthread_mutex_t reslock = PTHREAD_MUTEX_INITIALIZER;
77     static pthread_mutex_t reqlock = PTHREAD_MUTEX_INITIALIZER;
78     static pthread_cond_t reqwait = PTHREAD_COND_INITIALIZER;
79    
/* reqs/reqe: pending requests; ress/rese: completed requests. */
80     static volatile aio_req reqs, reqe; /* queue start, queue end */
81     static volatile aio_req ress, rese; /* queue start, queue end */
82 root 1.1
83 root 1.26 static void free_req (aio_req req)
84     {
85     if (req->data)
86     SvREFCNT_dec (req->data);
87    
88     if (req->fh)
89     SvREFCNT_dec (req->fh);
90    
91 root 1.32 if (req->fh2)
92     SvREFCNT_dec (req->fh2);
93    
94 root 1.27 if (req->statdata)
95 root 1.26 Safefree (req->statdata);
96    
97     if (req->callback)
98     SvREFCNT_dec (req->callback);
99    
100     Safefree (req);
101     }
102    
103 root 1.1 static void
104     poll_wait ()
105     {
106 root 1.11 if (nreqs && !ress)
107     {
108     fd_set rfd;
109     FD_ZERO(&rfd);
110     FD_SET(respipe [0], &rfd);
111 root 1.3
112 root 1.11 select (respipe [0] + 1, &rfd, 0, 0, 0);
113     }
114 root 1.1 }
115    
/*
 * Drain the result queue. For every finished request: update the
 * bookkeeping counters, publish the result to Perl (string length, stat
 * cache, errno), invoke the callback if one is defined, then free the
 * request. Returns the number of non-QUIT requests handled. If a callback
 * dies, the request is freed and the error is re-thrown via croak.
 */
116     static int
117 root 1.5 poll_cb ()
118 root 1.1 {
119     dSP;
120     int count = 0;
/* NOTE(review): do_croak is never read or written below. */
121 root 1.24 int do_croak = 0;
122 root 1.27 aio_req req;
123    
124     for (;;)
125     {
/* Pop one request from the result queue under reslock. */
126     pthread_mutex_lock (&reslock);
127     req = ress;
128    
129     if (req)
130     {
131     ress = req->next;
132 root 1.11
/* Queue became empty: drain the wake-up bytes and reset the tail. */
133 root 1.27 if (!ress)
134     {
135     /* read any signals sent by the worker threads */
136     char buf [32];
137     while (read (respipe [0], buf, 32) == 32)
138     ;
139 root 1.30
140     rese = 0;
141 root 1.27 }
142     }
143 root 1.1
144 root 1.27 pthread_mutex_unlock (&reslock);
145 root 1.3
146 root 1.27 if (!req)
147     break;
148 root 1.3
149 root 1.1 nreqs--;
150    
/* A QUIT result means one worker thread has left its loop. */
151     if (req->type == REQ_QUIT)
152     started--;
153     else
154     {
/* Expose the worker's errno to the callback; restored after the call. */
155     int errorno = errno;
156     errno = req->errorno;
157    
/* Reads: set the buffer's string length to offset plus bytes read. */
158     if (req->type == REQ_READ)
159 root 1.27 SvCUR_set (req->data, req->dataoffset + (req->result > 0 ? req->result : 0));
160 root 1.1
/* Undo the READONLY flag that aio_read/aio_write set on the buffer. */
161 root 1.28 if (req->data2ptr && (req->type == REQ_READ || req->type == REQ_WRITE))
162     SvREADONLY_off (req->data);
163    
/* Stat requests: fill perl's stat cache with the result. */
164 root 1.27 if (req->statdata)
165 root 1.1 {
166     PL_laststype = req->type == REQ_LSTAT ? OP_LSTAT : OP_STAT;
167     PL_laststatval = req->result;
168     PL_statcache = *(req->statdata);
169     }
170    
/* NOTE(review): PUSHMARK runs even when no callback is invoked below; */
/* confirm this does not leave a stale entry on perl's mark stack. */
171 root 1.13 ENTER;
172 root 1.1 PUSHMARK (SP);
173     XPUSHs (sv_2mortal (newSViv (req->result)));
174 root 1.2
175     if (req->type == REQ_OPEN)
176     {
177     /* convert fd to fh */
178     SV *fh;
179    
180     PUTBACK;
181     call_pv ("IO::AIO::_fd2fh", G_SCALAR | G_EVAL);
182     SPAGAIN;
183    
184 root 1.13 fh = SvREFCNT_inc (POPs);
185 root 1.2
/* Start a fresh argument list holding only the converted handle. */
186     PUSHMARK (SP);
187 root 1.13 XPUSHs (sv_2mortal (fh));
188 root 1.2 }
189    
190 root 1.8 if (SvOK (req->callback))
191     {
192     PUTBACK;
193     call_sv (req->callback, G_VOID | G_EVAL);
194     SPAGAIN;
195 root 1.27
/* Callback died: free the request and propagate the error. */
/* NOTE(review): this path does not restore the saved errno. */
196     if (SvTRUE (ERRSV))
197     {
198     free_req (req);
199     croak (0);
200     }
201 root 1.8 }
202 root 1.13
203 root 1.26 LEAVE;
204    
205 root 1.1 errno = errorno;
206     count++;
207     }
208    
209 root 1.27 free_req (req);
210 root 1.1 }
211    
212     return count;
213     }
214    
215 root 1.4 static void *aio_proc(void *arg);
216    
217     static void
218     start_thread (void)
219     {
220     sigset_t fullsigset, oldsigset;
221     pthread_t tid;
222     pthread_attr_t attr;
223    
224     pthread_attr_init (&attr);
225     pthread_attr_setstacksize (&attr, STACKSIZE);
226 root 1.31 pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
227 root 1.4
228     sigfillset (&fullsigset);
229     sigprocmask (SIG_SETMASK, &fullsigset, &oldsigset);
230    
231     if (pthread_create (&tid, &attr, aio_proc, 0) == 0)
232     started++;
233    
234     sigprocmask (SIG_SETMASK, &oldsigset, 0);
235     }
236    
237     static void
238     send_req (aio_req req)
239     {
240 root 1.30 while (started < wanted && nreqs >= started)
241     start_thread ();
242    
243 root 1.4 nreqs++;
244    
245     pthread_mutex_lock (&reqlock);
246    
247     req->next = 0;
248    
249     if (reqe)
250     {
251     reqe->next = req;
252     reqe = req;
253     }
254     else
255     reqe = reqs = req;
256    
257     pthread_cond_signal (&reqwait);
258     pthread_mutex_unlock (&reqlock);
259    
260 root 1.27 if (nreqs > max_outstanding)
261     for (;;)
262     {
263     poll_cb ();
264    
265     if (nreqs <= max_outstanding)
266     break;
267    
268     poll_wait ();
269     }
270 root 1.4 }
271    
272     static void
273     end_thread (void)
274     {
275     aio_req req;
276 root 1.26 Newz (0, req, 1, aio_cb);
277 root 1.4 req->type = REQ_QUIT;
278    
279     send_req (req);
280     }
281    
282 root 1.22 static void min_parallel (int nthreads)
283     {
284 root 1.30 if (wanted < nthreads)
285     wanted = nthreads;
286 root 1.22 }
287    
288     static void max_parallel (int nthreads)
289     {
290     int cur = started;
291 root 1.27
292 root 1.30 if (wanted > nthreads)
293     wanted = nthreads;
294    
295     while (cur > wanted)
296     {
297 root 1.22 end_thread ();
298     cur--;
299     }
300    
301 root 1.30 while (started > wanted)
302 root 1.22 {
303     poll_wait ();
304     poll_cb ();
305     }
306     }
307    
308 root 1.26 static void create_pipe ()
309 root 1.22 {
310 root 1.26 if (pipe (respipe))
311     croak ("unable to initialize result pipe");
312 root 1.22
313 root 1.26 if (fcntl (respipe [0], F_SETFL, O_NONBLOCK))
314     croak ("cannot set result pipe to nonblocking mode");
315 root 1.22
316 root 1.26 if (fcntl (respipe [1], F_SETFL, O_NONBLOCK))
317     croak ("cannot set result pipe to nonblocking mode");
318     }
319 root 1.22
320 root 1.26 static void atfork_prepare (void)
321     {
322 root 1.28 pthread_mutex_lock (&reqlock);
323 root 1.22 pthread_mutex_lock (&reslock);
324     }
325    
326     static void atfork_parent (void)
327     {
328     pthread_mutex_unlock (&reslock);
329 root 1.26 pthread_mutex_unlock (&reqlock);
330     }
331    
332     static void atfork_child (void)
333     {
334 root 1.28 aio_req prv;
335    
336 root 1.26 started = 0;
337    
338 root 1.28 while (reqs)
339     {
340     prv = reqs;
341     reqs = prv->next;
342     free_req (prv);
343     }
344    
345     reqs = reqe = 0;
346    
347     while (ress)
348     {
349     prv = ress;
350     ress = prv->next;
351     free_req (prv);
352     }
353    
354     ress = rese = 0;
355    
356 root 1.29 close (respipe [0]);
357     close (respipe [1]);
358     create_pipe ();
359    
360 root 1.26 atfork_parent ();
361 root 1.22 }
362    
363     /*****************************************************************************/
364 root 1.17 /* work around various missing functions */
365    
#if !HAVE_PREADWRITE
# define pread aio_pread
# define pwrite aio_pwrite

/*
 * make our pread/pwrite safe against themselves, but not against
 * normal read/write by using a mutex. slows down execution a lot,
 * but that's your problem, not mine.
 */
static pthread_mutex_t iolock = PTHREAD_MUTEX_INITIALIZER;

/*
 * Emulated pread: read up to count bytes at the given offset, then put
 * the descriptor's file position back where it was. Returns the result
 * of read (); errno from the read is preserved across the restoring seek.
 */
static ssize_t
pread (int fd, void *buf, size_t count, off_t offset)
{
  ssize_t res;
  off_t ooffset;
  int errorno;

  pthread_mutex_lock (&iolock);
  ooffset = lseek (fd, 0, SEEK_CUR);
  lseek (fd, offset, SEEK_SET);
  res = read (fd, buf, count);
  errorno = errno;
  lseek (fd, ooffset, SEEK_SET);
  errno = errorno;
  pthread_mutex_unlock (&iolock);

  return res;
}

/*
 * Emulated pwrite: write up to count bytes at the given offset, then put
 * the descriptor's file position back where it was. Returns the result
 * of write (); errno from the write is preserved across the restoring seek.
 */
static ssize_t
pwrite (int fd, const void *buf, size_t count, off_t offset)
{
  ssize_t res;
  off_t ooffset;
  int errorno;

  pthread_mutex_lock (&iolock);
  ooffset = lseek (fd, 0, SEEK_CUR);
  lseek (fd, offset, SEEK_SET);
  res = write (fd, buf, count);
  errorno = errno;
  /* restore the saved position (ooffset), not the write offset */
  lseek (fd, ooffset, SEEK_SET);
  errno = errorno;
  pthread_mutex_unlock (&iolock);

  return res;
}
#endif
409    
/* Without a native fdatasync, REQ_FDATASYNC requests are served by fsync. */
410     #if !HAVE_FDATASYNC
411     # define fdatasync fsync
412     #endif
413    
#if !HAVE_READAHEAD
# define readahead aio_readahead

/*
 * Scratch buffer for the emulation below. It is shared by all callers;
 * the data read into it is never looked at.
 * NOTE(review): presumably static rather than on the stack because the
 * worker stacks (STACKSIZE) can be as small as this buffer - confirm.
 */
static char readahead_buf[4096];

/*
 * Emulated readahead: read count bytes starting at offset, in
 * buffer-sized pieces, and throw the data away. Stops early at end of
 * file or on a read error. Always returns 0 with errno cleared.
 */
static ssize_t
readahead (int fd, off_t offset, size_t count)
{
  while (count > 0)
    {
      size_t len = count < sizeof (readahead_buf) ? count : sizeof (readahead_buf);

      /* nothing more to read here: do not keep looping over the rest */
      if (pread (fd, readahead_buf, len, offset) <= 0)
        break;

      offset += len;
      count -= len;
    }

  errno = 0;
  return 0;
}
#endif
434    
/* sendfile always needs emulation */
/*
 * Copy up to count bytes from ifd, starting at offset, to ofd.
 *
 * The native sendfile is tried first when the build enabled it
 * (HAVE_SENDFILE). If it is unavailable or refuses this descriptor
 * combination (ENOSYS, EINVAL, ENOTSOCK), the copy is done with a
 * pread/write loop through a temporary buffer.
 *
 * Returns the number of bytes transferred, or -1 with errno set when
 * nothing could be transferred.
 */
static ssize_t
sendfile_ (int ofd, int ifd, off_t offset, size_t count)
{
  ssize_t res;

  if (!count)
    return 0;

#if HAVE_SENDFILE
# if __linux
  res = sendfile (ofd, ifd, &offset, count);

# elif __freebsd
  /*
   * Of course, the freebsd sendfile is a dire hack with no thoughts
   * wasted on making it similar to other i/o functions.
   */
  {
    off_t sbytes;
    res = sendfile (ifd, ofd, offset, count, 0, &sbytes, 0);

    if (!res && errno == EAGAIN)
      /* maybe on others, too, as usual, the manpage leaves you guessing */
      res = sbytes;
  }

# elif __hpux
  res = sendfile (ofd, ifd, offset, count, 0, 0);

# else
  res = -1;
  errno = ENOSYS;
# endif
#else
  /* headers for the native call were not included: go straight to emulation */
  res = -1;
  errno = ENOSYS;
#endif

  if (res < 0 && (errno == ENOSYS || errno == EINVAL || errno == ENOTSOCK))
    {
      /* emulate sendfile. this is a major pain in the ass */
      enum { BUFSIZE = 4096 };
      char *buf = malloc (BUFSIZE);

      if (!buf)
        {
          errno = ENOMEM;
          return -1;
        }

      res = 0;

      /* copy at most count bytes, not everything up to end of file */
      while (count)
        {
          ssize_t cnt;

          cnt = pread (ifd, buf, count > BUFSIZE ? BUFSIZE : count, offset);

          if (cnt <= 0)
            {
              if (cnt && !res) res = -1;
              break;
            }

          cnt = write (ofd, buf, cnt);

          if (cnt <= 0)
            {
              if (cnt && !res) res = -1;
              break;
            }

          offset += cnt;
          res += cnt;
          count -= cnt;
        }

      {
        /* free () may clobber errno */
        int errorno = errno;
        free (buf);
        errno = errorno;
      }
    }

  return res;
}
508    
509 root 1.22 /*****************************************************************************/
510    
511 root 1.1 static void *
512     aio_proc (void *thr_arg)
513     {
514     aio_req req;
515 root 1.3 int type;
516 root 1.1
517 root 1.3 do
518 root 1.1 {
519 root 1.3 pthread_mutex_lock (&reqlock);
520    
521     for (;;)
522     {
523     req = reqs;
524    
525     if (reqs)
526     {
527     reqs = reqs->next;
528     if (!reqs) reqe = 0;
529     }
530    
531     if (req)
532     break;
533    
534     pthread_cond_wait (&reqwait, &reqlock);
535     }
536    
537     pthread_mutex_unlock (&reqlock);
538    
539 root 1.1 errno = 0; /* strictly unnecessary */
540    
541 root 1.3 type = req->type;
542    
543     switch (type)
544 root 1.1 {
545 root 1.10 case REQ_READ: req->result = pread (req->fd, req->dataptr, req->length, req->offset); break;
546     case REQ_WRITE: req->result = pwrite (req->fd, req->dataptr, req->length, req->offset); break;
547 root 1.16
548 root 1.1 case REQ_READAHEAD: req->result = readahead (req->fd, req->offset, req->length); break;
549 root 1.32 case REQ_SENDFILE: req->result = sendfile_ (req->fd, req->fd2, req->offset, req->length); break;
550 root 1.1
551     case REQ_STAT: req->result = stat (req->dataptr, req->statdata); break;
552     case REQ_LSTAT: req->result = lstat (req->dataptr, req->statdata); break;
553     case REQ_FSTAT: req->result = fstat (req->fd , req->statdata); break;
554    
555     case REQ_OPEN: req->result = open (req->dataptr, req->fd, req->mode); break;
556     case REQ_CLOSE: req->result = close (req->fd); break;
557     case REQ_UNLINK: req->result = unlink (req->dataptr); break;
558 root 1.22 case REQ_RMDIR: req->result = rmdir (req->dataptr); break;
559     case REQ_SYMLINK: req->result = symlink (req->data2ptr, req->dataptr); break;
560 root 1.1
561 root 1.17 case REQ_FDATASYNC: req->result = fdatasync (req->fd); break;
562 root 1.1 case REQ_FSYNC: req->result = fsync (req->fd); break;
563    
564     case REQ_QUIT:
565 root 1.3 break;
566 root 1.1
567     default:
568     req->result = ENOSYS;
569     break;
570     }
571    
572     req->errorno = errno;
573 root 1.3
574     pthread_mutex_lock (&reslock);
575    
576     req->next = 0;
577    
578     if (rese)
579     {
580     rese->next = req;
581     rese = req;
582     }
583     else
584     {
585     rese = ress = req;
586    
587     /* write a dummy byte to the pipe so fh becomes ready */
588     write (respipe [1], &respipe, 1);
589     }
590    
591     pthread_mutex_unlock (&reslock);
592 root 1.1 }
593 root 1.3 while (type != REQ_QUIT);
594 root 1.1
595     return 0;
596     }
597    
/*
 * Declares `aio_req req' in the current block and initialises it.
 * Expects an SV *callback in scope: croaks unless it is undef or a
 * reference, allocates a zero-filled request and stores a copy of the
 * callback in it. Must appear where a declaration is allowed.
 */
#define dREQ							\
  aio_req req;							\
								\
  if (SvOK (callback) && !SvROK (callback))			\
    croak ("callback must be undef or of reference type");	\
								\
  Newz (0, req, 1, aio_cb);					\
  if (!req)							\
    croak ("out of memory during aio_req allocation");		\
								\
  req->callback = newSVsv (callback);
609 root 1.22
610 root 1.1 MODULE = IO::AIO PACKAGE = IO::AIO
611    
612 root 1.8 PROTOTYPES: ENABLE
613    
614 root 1.1 BOOT:
615     {
/* Module load: create the result pipe and register the fork handlers. */
616 root 1.26 create_pipe ();
617 root 1.22 pthread_atfork (atfork_prepare, atfork_parent, atfork_child);
618 root 1.1 }
619    
620     void
621     min_parallel(nthreads)
622     int nthreads
623     PROTOTYPE: $
624    
625     void
626     max_parallel(nthreads)
627     int nthreads
628     PROTOTYPE: $
629    
630 root 1.4 int
631     max_outstanding(nreqs)
632     int nreqs
633     PROTOTYPE: $
634     CODE:
635     RETVAL = max_outstanding;
636     max_outstanding = nreqs;
637    
638 root 1.1 void
639 root 1.8 aio_open(pathname,flags,mode,callback=&PL_sv_undef)
640 root 1.1 SV * pathname
641     int flags
642     int mode
643     SV * callback
644 root 1.8 PROTOTYPE: $$$;$
645 root 1.1 CODE:
646     {
/* Queue an open request. The request keeps its own copy of the path; */
/* the open flags travel in the request's fd field. */
647 root 1.22 dREQ;
648 root 1.1
649     req->type = REQ_OPEN;
650     req->data = newSVsv (pathname);
651 root 1.22 req->dataptr = SvPVbyte_nolen (req->data);
652 root 1.1 req->fd = flags;
653     req->mode = mode;
654    
655     send_req (req);
656     }
657    
658     void
659 root 1.8 aio_close(fh,callback=&PL_sv_undef)
660 root 1.13 SV * fh
661     SV * callback
662 root 1.8 PROTOTYPE: $;$
663 root 1.1 ALIAS:
664     aio_close = REQ_CLOSE
665     aio_fsync = REQ_FSYNC
666     aio_fdatasync = REQ_FDATASYNC
667     CODE:
668     {
/* Shared body of aio_close, aio_fsync and aio_fdatasync: the alias index */
/* is the request type. A copy of the handle scalar is stored in the request. */
669 root 1.22 dREQ;
670 root 1.1
671     req->type = ix;
672 root 1.13 req->fh = newSVsv (fh);
673     req->fd = PerlIO_fileno (IoIFP (sv_2io (fh)));
674 root 1.1
675     send_req (req);
676     }
677    
678     void
679 root 1.8 aio_read(fh,offset,length,data,dataoffset,callback=&PL_sv_undef)
680 root 1.13 SV * fh
681     UV offset
682 root 1.32 UV length
683 root 1.13 SV * data
684 root 1.32 UV dataoffset
685 root 1.13 SV * callback
686     ALIAS:
687     aio_read = REQ_READ
688     aio_write = REQ_WRITE
689 root 1.8 PROTOTYPE: $$$$$;$
690 root 1.1 CODE:
691 root 1.13 {
692     aio_req req;
693     STRLEN svlen;
694 root 1.21 char *svptr = SvPVbyte (data, svlen);
695 root 1.13
696     SvUPGRADE (data, SVt_PV);
697     SvPOK_on (data);
698 root 1.1
699 root 1.13 if (dataoffset < 0)
700     dataoffset += svlen;
701    
702     if (dataoffset < 0 || dataoffset > svlen)
703     croak ("data offset outside of string");
704    
705     if (ix == REQ_WRITE)
706     {
707     /* write: check length and adjust. */
708     if (length < 0 || length + dataoffset > svlen)
709     length = svlen - dataoffset;
710     }
711     else
712     {
713     /* read: grow scalar as necessary */
714     svptr = SvGROW (data, length + dataoffset);
715     }
716    
717     if (length < 0)
718     croak ("length must not be negative");
719    
720 root 1.22 {
721     dREQ;
722 root 1.13
723 root 1.22 req->type = ix;
724     req->fh = newSVsv (fh);
725     req->fd = PerlIO_fileno (ix == REQ_READ ? IoIFP (sv_2io (fh))
726     : IoOFP (sv_2io (fh)));
727     req->offset = offset;
728     req->length = length;
729     req->data = SvREFCNT_inc (data);
730     req->dataptr = (char *)svptr + dataoffset;
731 root 1.13
732 root 1.28 if (!SvREADONLY (data))
733     {
734     SvREADONLY_on (data);
735     req->data2ptr = (void *)data;
736     }
737    
738 root 1.22 send_req (req);
739     }
740 root 1.13 }
741 root 1.1
742     void
743 root 1.32 aio_sendfile(out_fh,in_fh,in_offset,length,callback=&PL_sv_undef)
744     SV * out_fh
745     SV * in_fh
746     UV in_offset
747     UV length
748     SV * callback
749     PROTOTYPE: $$$$;$
750     CODE:
751     {
/* Queue a sendfile request: fd/fh describe the output handle, fd2/fh2 */
/* the input handle. Copies of both handle scalars are kept in the request. */
/* NOTE(review): the output descriptor is taken from IoIFP, whereas */
/* aio_write uses IoOFP - confirm this is intended. */
752     dREQ;
753    
754     req->type = REQ_SENDFILE;
755     req->fh = newSVsv (out_fh);
756     req->fd = PerlIO_fileno (IoIFP (sv_2io (out_fh)));
757     req->fh2 = newSVsv (in_fh);
758     req->fd2 = PerlIO_fileno (IoIFP (sv_2io (in_fh)));
759     req->offset = in_offset;
760     req->length = length;
761    
762     send_req (req);
763     }
764    
765     void
766 root 1.8 aio_readahead(fh,offset,length,callback=&PL_sv_undef)
767 root 1.13 SV * fh
768     UV offset
769     IV length
770     SV * callback
771 root 1.8 PROTOTYPE: $$$;$
772 root 1.1 CODE:
773     {
/* Queue a readahead request for `length' bytes at `offset' of the handle. */
/* NOTE(review): length is a signed IV stored into a size_t field without */
/* a range check - confirm negative values cannot reach this point. */
774 root 1.22 dREQ;
775 root 1.1
776     req->type = REQ_READAHEAD;
777 root 1.13 req->fh = newSVsv (fh);
778     req->fd = PerlIO_fileno (IoIFP (sv_2io (fh)));
779 root 1.1 req->offset = offset;
780     req->length = length;
781    
782     send_req (req);
783     }
784    
785     void
786 root 1.8 aio_stat(fh_or_path,callback=&PL_sv_undef)
787 root 1.1 SV * fh_or_path
788     SV * callback
789     ALIAS:
790 root 1.8 aio_stat = REQ_STAT
791     aio_lstat = REQ_LSTAT
792 root 1.1 CODE:
793     {
/* Shared body of aio_stat and aio_lstat. A string argument is treated as */
/* a path and uses the alias index as request type; anything else is */
/* treated as a file handle and always becomes an fstat request. */
794 root 1.22 dREQ;
795 root 1.1
/* The stat buffer is owned by the request and released by free_req. */
796     New (0, req->statdata, 1, Stat_t);
797     if (!req->statdata)
798 root 1.27 {
799     free_req (req);
800     croak ("out of memory during aio_req->statdata allocation");
801     }
802 root 1.1
803     if (SvPOK (fh_or_path))
804     {
805 root 1.8 req->type = ix;
806 root 1.1 req->data = newSVsv (fh_or_path);
807 root 1.22 req->dataptr = SvPVbyte_nolen (req->data);
808 root 1.1 }
809     else
810     {
811     req->type = REQ_FSTAT;
812 root 1.13 req->fh = newSVsv (fh_or_path);
813 root 1.1 req->fd = PerlIO_fileno (IoIFP (sv_2io (fh_or_path)));
814     }
815    
816     send_req (req);
817     }
818    
819     void
820 root 1.8 aio_unlink(pathname,callback=&PL_sv_undef)
821 root 1.1 SV * pathname
822     SV * callback
823 root 1.22 ALIAS:
824     aio_unlink = REQ_UNLINK
825     aio_rmdir = REQ_RMDIR
826 root 1.1 CODE:
827     {
/* Shared body of aio_unlink and aio_rmdir: the alias index is the request */
/* type, and the request keeps its own copy of the path. */
828 root 1.22 dREQ;
829 root 1.1
830 root 1.22 req->type = ix;
831     req->data = newSVsv (pathname);
832     req->dataptr = SvPVbyte_nolen (req->data);
833 root 1.1
834 root 1.22 send_req (req);
835     }
836    
837     void
838     aio_symlink(oldpath,newpath,callback=&PL_sv_undef)
839     SV * oldpath
840     SV * newpath
841     SV * callback
842     CODE:
843     {
/* Queue a symlink request. The copy of the old path is parked in the */
/* request's fh slot (with data2ptr pointing at its bytes); the copy of */
/* the new path goes into data/dataptr. */
844     dREQ;
845 root 1.1
846 root 1.22 req->type = REQ_SYMLINK;
847     req->fh = newSVsv (oldpath);
848     req->data2ptr = SvPVbyte_nolen (req->fh);
849     req->data = newSVsv (newpath);
850     req->dataptr = SvPVbyte_nolen (req->data);
851 root 1.1
852     send_req (req);
853     }
854    
855 root 1.6 void
856     flush()
857     PROTOTYPE:
858     CODE:
/* Wait for and process results until no requests are outstanding. */
859     while (nreqs)
860     {
861     poll_wait ();
862     poll_cb ();
863     }
864    
865 root 1.7 void
866     poll()
867     PROTOTYPE:
868     CODE:
/* If any request is outstanding, wait once for results and process them. */
869     if (nreqs)
870     {
871     poll_wait ();
872     poll_cb ();
873     }
874    
875 root 1.1 int
876     poll_fileno()
877     PROTOTYPE:
878     CODE:
/* Return the read end of the result pipe. */
879 root 1.3 RETVAL = respipe [0];
880 root 1.1 OUTPUT:
881     RETVAL
882    
883     int
884     poll_cb(...)
885     PROTOTYPE:
886     CODE:
/* Process all queued results; returns the count reported by the C poll_cb. */
887 root 1.5 RETVAL = poll_cb ();
888 root 1.1 OUTPUT:
889     RETVAL
890    
891     void
892     poll_wait()
893     PROTOTYPE:
894     CODE:
/* Wait for results only when at least one request is outstanding. */
895 root 1.3 if (nreqs)
896     poll_wait ();
897 root 1.1
898     int
899     nreqs()
900     PROTOTYPE:
901     CODE:
/* Return the current value of the outstanding-request counter. */
902     RETVAL = nreqs;
903     OUTPUT:
904     RETVAL
905