ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/IO-AIO/AIO.xs
Revision: 1.41
Committed: Sat Jun 24 19:14:04 2006 UTC (17 years, 10 months ago) by root
Branch: MAIN
CVS Tags: rel-1_8
Changes since 1.40: +6 -0 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.20 #define _REENTRANT 1
2 root 1.19 #include <errno.h>
3    
4 root 1.15 #include "EXTERN.h"
5 root 1.1 #include "perl.h"
6     #include "XSUB.h"
7    
8 root 1.16 #include "autoconf/config.h"
9    
10 root 1.32 #include <pthread.h>
11    
12 root 1.37 #include <stddef.h>
13 root 1.41 #include <errno.h>
14 root 1.1 #include <sys/types.h>
15     #include <sys/stat.h>
16 root 1.37 #include <limits.h>
17 root 1.1 #include <unistd.h>
18     #include <fcntl.h>
19     #include <signal.h>
20     #include <sched.h>
21    
22 root 1.32 #if HAVE_SENDFILE
23     # if __linux
24     # include <sys/sendfile.h>
25     # elif __freebsd
26     # include <sys/socket.h>
27     # include <sys/uio.h>
28     # elif __hpux
29     # include <sys/socket.h>
30 root 1.35 # elif __solaris /* not yet */
31     # include <sys/sendfile.h>
32 root 1.34 # else
33     # error sendfile support requested but not available
34 root 1.32 # endif
35     #endif
36 root 1.1
37 root 1.39 /* used for struct dirent, AIX doesn't provide it */
38     #ifndef NAME_MAX
39     # define NAME_MAX 4096
40     #endif
41    
42 root 1.4 #if __ia64
43     # define STACKSIZE 65536
44 root 1.1 #else
45 root 1.37 # define STACKSIZE 8192
46 root 1.1 #endif
47    
48     enum {
49     REQ_QUIT,
50     REQ_OPEN, REQ_CLOSE,
51     REQ_READ, REQ_WRITE, REQ_READAHEAD,
52 root 1.32 REQ_SENDFILE,
53 root 1.22 REQ_STAT, REQ_LSTAT, REQ_FSTAT,
54 root 1.1 REQ_FSYNC, REQ_FDATASYNC,
55 root 1.40 REQ_UNLINK, REQ_RMDIR, REQ_RENAME,
56 root 1.37 REQ_READDIR,
57 root 1.40 REQ_LINK, REQ_SYMLINK,
58 root 1.1 };
59    
60     typedef struct aio_cb {
61 root 1.3 struct aio_cb *volatile next;
62 root 1.1
63     int type;
64    
65 root 1.37 /* should receive a cleanup, with unions */
66 root 1.32 int fd, fd2;
67 root 1.1 off_t offset;
68     size_t length;
69     ssize_t result;
70     mode_t mode; /* open */
71     int errorno;
72 root 1.32 SV *data, *callback;
73     SV *fh, *fh2;
74 root 1.22 void *dataptr, *data2ptr;
75 root 1.1 STRLEN dataoffset;
76    
77     Stat_t *statdata;
78     } aio_cb;
79    
80     typedef aio_cb *aio_req;
81    
82 root 1.30 static int started, wanted;
83 root 1.11 static volatile int nreqs;
84 root 1.4 static int max_outstanding = 1<<30;
85 root 1.3 static int respipe [2];
86 root 1.1
87 root 1.3 static pthread_mutex_t reslock = PTHREAD_MUTEX_INITIALIZER;
88     static pthread_mutex_t reqlock = PTHREAD_MUTEX_INITIALIZER;
89     static pthread_cond_t reqwait = PTHREAD_COND_INITIALIZER;
90    
91     static volatile aio_req reqs, reqe; /* queue start, queue end */
92     static volatile aio_req ress, rese; /* queue start, queue end */
93 root 1.1
94 root 1.26 static void free_req (aio_req req)
95     {
96     if (req->data)
97     SvREFCNT_dec (req->data);
98    
99     if (req->fh)
100     SvREFCNT_dec (req->fh);
101    
102 root 1.32 if (req->fh2)
103     SvREFCNT_dec (req->fh2);
104    
105 root 1.27 if (req->statdata)
106 root 1.26 Safefree (req->statdata);
107    
108     if (req->callback)
109     SvREFCNT_dec (req->callback);
110    
111 root 1.37 if (req->type == REQ_READDIR && req->result >= 0)
112     free (req->data2ptr);
113    
114 root 1.26 Safefree (req);
115     }
116    
117 root 1.1 static void
118     poll_wait ()
119     {
120 root 1.11 if (nreqs && !ress)
121     {
122     fd_set rfd;
123     FD_ZERO(&rfd);
124     FD_SET(respipe [0], &rfd);
125 root 1.3
126 root 1.11 select (respipe [0] + 1, &rfd, 0, 0, 0);
127     }
128 root 1.1 }
129    
130     static int
131 root 1.5 poll_cb ()
132 root 1.1 {
133     dSP;
134     int count = 0;
135 root 1.24 int do_croak = 0;
136 root 1.27 aio_req req;
137    
138     for (;;)
139     {
140     pthread_mutex_lock (&reslock);
141     req = ress;
142    
143     if (req)
144     {
145     ress = req->next;
146 root 1.11
147 root 1.27 if (!ress)
148     {
149     /* read any signals sent by the worker threads */
150     char buf [32];
151     while (read (respipe [0], buf, 32) == 32)
152     ;
153 root 1.30
154     rese = 0;
155 root 1.27 }
156     }
157 root 1.1
158 root 1.27 pthread_mutex_unlock (&reslock);
159 root 1.3
160 root 1.27 if (!req)
161     break;
162 root 1.3
163 root 1.1 nreqs--;
164    
165     if (req->type == REQ_QUIT)
166     started--;
167     else
168     {
169     int errorno = errno;
170     errno = req->errorno;
171    
172     if (req->type == REQ_READ)
173 root 1.27 SvCUR_set (req->data, req->dataoffset + (req->result > 0 ? req->result : 0));
174 root 1.1
175 root 1.28 if (req->data2ptr && (req->type == REQ_READ || req->type == REQ_WRITE))
176     SvREADONLY_off (req->data);
177    
178 root 1.27 if (req->statdata)
179 root 1.1 {
180     PL_laststype = req->type == REQ_LSTAT ? OP_LSTAT : OP_STAT;
181     PL_laststatval = req->result;
182     PL_statcache = *(req->statdata);
183     }
184    
185 root 1.13 ENTER;
186 root 1.1 PUSHMARK (SP);
187 root 1.2
188 root 1.37 if (req->type == REQ_READDIR)
189     {
190     SV *rv = &PL_sv_undef;
191    
192     if (req->result >= 0)
193     {
194     char *buf = req->data2ptr;
195     AV *av = newAV ();
196    
197     while (req->result)
198     {
199     SV *sv = newSVpv (buf, 0);
200    
201     av_push (av, sv);
202     buf += SvCUR (sv) + 1;
203     req->result--;
204     }
205    
206     rv = sv_2mortal (newRV_noinc ((SV *)av));
207     }
208    
209     XPUSHs (rv);
210     }
211     else
212 root 1.2 {
213 root 1.37 XPUSHs (sv_2mortal (newSViv (req->result)));
214    
215     if (req->type == REQ_OPEN)
216     {
217     /* convert fd to fh */
218     SV *fh;
219 root 1.2
220 root 1.37 PUTBACK;
221     call_pv ("IO::AIO::_fd2fh", G_SCALAR | G_EVAL);
222     SPAGAIN;
223 root 1.2
224 root 1.37 fh = SvREFCNT_inc (POPs);
225 root 1.2
226 root 1.37 PUSHMARK (SP);
227     XPUSHs (sv_2mortal (fh));
228     }
229 root 1.2 }
230    
231 root 1.8 if (SvOK (req->callback))
232     {
233     PUTBACK;
234     call_sv (req->callback, G_VOID | G_EVAL);
235     SPAGAIN;
236 root 1.27
237     if (SvTRUE (ERRSV))
238     {
239     free_req (req);
240     croak (0);
241     }
242 root 1.8 }
243 root 1.13
244 root 1.26 LEAVE;
245    
246 root 1.1 errno = errorno;
247     count++;
248     }
249    
250 root 1.27 free_req (req);
251 root 1.1 }
252    
253     return count;
254     }
255    
256 root 1.4 static void *aio_proc(void *arg);
257    
258     static void
259     start_thread (void)
260     {
261     sigset_t fullsigset, oldsigset;
262     pthread_t tid;
263     pthread_attr_t attr;
264    
265     pthread_attr_init (&attr);
266     pthread_attr_setstacksize (&attr, STACKSIZE);
267 root 1.31 pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
268 root 1.4
269     sigfillset (&fullsigset);
270     sigprocmask (SIG_SETMASK, &fullsigset, &oldsigset);
271    
272     if (pthread_create (&tid, &attr, aio_proc, 0) == 0)
273     started++;
274    
275     sigprocmask (SIG_SETMASK, &oldsigset, 0);
276     }
277    
278     static void
279     send_req (aio_req req)
280     {
281 root 1.30 while (started < wanted && nreqs >= started)
282     start_thread ();
283    
284 root 1.4 nreqs++;
285    
286     pthread_mutex_lock (&reqlock);
287    
288     req->next = 0;
289    
290     if (reqe)
291     {
292     reqe->next = req;
293     reqe = req;
294     }
295     else
296     reqe = reqs = req;
297    
298     pthread_cond_signal (&reqwait);
299     pthread_mutex_unlock (&reqlock);
300    
301 root 1.27 if (nreqs > max_outstanding)
302     for (;;)
303     {
304     poll_cb ();
305    
306     if (nreqs <= max_outstanding)
307     break;
308    
309     poll_wait ();
310     }
311 root 1.4 }
312    
313     static void
314     end_thread (void)
315     {
316     aio_req req;
317 root 1.26 Newz (0, req, 1, aio_cb);
318 root 1.4 req->type = REQ_QUIT;
319    
320     send_req (req);
321     }
322    
323 root 1.22 static void min_parallel (int nthreads)
324     {
325 root 1.30 if (wanted < nthreads)
326     wanted = nthreads;
327 root 1.22 }
328    
329     static void max_parallel (int nthreads)
330     {
331     int cur = started;
332 root 1.27
333 root 1.30 if (wanted > nthreads)
334     wanted = nthreads;
335    
336     while (cur > wanted)
337     {
338 root 1.22 end_thread ();
339     cur--;
340     }
341    
342 root 1.30 while (started > wanted)
343 root 1.22 {
344     poll_wait ();
345     poll_cb ();
346     }
347     }
348    
349 root 1.26 static void create_pipe ()
350 root 1.22 {
351 root 1.26 if (pipe (respipe))
352     croak ("unable to initialize result pipe");
353 root 1.22
354 root 1.26 if (fcntl (respipe [0], F_SETFL, O_NONBLOCK))
355     croak ("cannot set result pipe to nonblocking mode");
356 root 1.22
357 root 1.26 if (fcntl (respipe [1], F_SETFL, O_NONBLOCK))
358     croak ("cannot set result pipe to nonblocking mode");
359     }
360 root 1.22
361     /*****************************************************************************/
362 root 1.17 /* work around various missing functions */
363    
364     #if !HAVE_PREADWRITE
365     # define pread aio_pread
366     # define pwrite aio_pwrite
367    
368     /*
369     * make our pread/pwrite safe against themselves, but not against
370     * normal read/write by using a mutex. slows down execution a lot,
371     * but that's your problem, not mine.
372     */
373 root 1.37 static pthread_mutex_t preadwritelock = PTHREAD_MUTEX_INITIALIZER;
374 root 1.17
375     static ssize_t
376     pread (int fd, void *buf, size_t count, off_t offset)
377     {
378     ssize_t res;
379     off_t ooffset;
380    
381 root 1.37 pthread_mutex_lock (&preadwritelock);
382 root 1.17 ooffset = lseek (fd, 0, SEEK_CUR);
383     lseek (fd, offset, SEEK_SET);
384     res = read (fd, buf, count);
385     lseek (fd, ooffset, SEEK_SET);
386 root 1.37 pthread_mutex_unlock (&preadwritelock);
387 root 1.17
388     return res;
389     }
390    
391     static ssize_t
392     pwrite (int fd, void *buf, size_t count, off_t offset)
393     {
394     ssize_t res;
395     off_t ooffset;
396    
397 root 1.37 pthread_mutex_lock (&preadwritelock);
398 root 1.17 ooffset = lseek (fd, 0, SEEK_CUR);
399     lseek (fd, offset, SEEK_SET);
400     res = write (fd, buf, count);
401     lseek (fd, offset, SEEK_SET);
402 root 1.37 pthread_mutex_unlock (&preadwritelock);
403 root 1.17
404     return res;
405     }
406     #endif
407    
408     #if !HAVE_FDATASYNC
409     # define fdatasync fsync
410     #endif
411    
412     #if !HAVE_READAHEAD
413     # define readahead aio_readahead
414    
415     static ssize_t
416     readahead (int fd, off_t offset, size_t count)
417     {
418 root 1.37 char readahead_buf[4096];
419    
420 root 1.17 while (count > 0)
421     {
422     size_t len = count < sizeof (readahead_buf) ? count : sizeof (readahead_buf);
423    
424     pread (fd, readahead_buf, len, offset);
425     offset += len;
426     count -= len;
427     }
428    
429     errno = 0;
430     }
431     #endif
432    
433 root 1.37 #if !HAVE_READDIR_R
434     # define readdir_r aio_readdir_r
435    
436     static pthread_mutex_t readdirlock = PTHREAD_MUTEX_INITIALIZER;
437    
438     static int
439     readdir_r (DIR *dirp, struct dirent *ent, struct dirent **res)
440     {
441     struct dirent *e;
442     int errorno;
443    
444     pthread_mutex_lock (&readdirlock);
445    
446     e = readdir (dirp);
447     errorno = errno;
448    
449     if (e)
450     {
451     *res = ent;
452     strcpy (ent->d_name, e->d_name);
453     }
454     else
455     *res = 0;
456    
457     pthread_mutex_unlock (&readdirlock);
458    
459     errno = errorno;
460     return e ? 0 : -1;
461     }
462     #endif
463    
464 root 1.32 /* sendfile always needs emulation */
465     static ssize_t
466     sendfile_ (int ofd, int ifd, off_t offset, size_t count)
467     {
468 root 1.37 ssize_t res;
469 root 1.32
470 root 1.37 if (!count)
471     return 0;
472 root 1.32
473 root 1.35 #if HAVE_SENDFILE
474     # if __linux
475 root 1.37 res = sendfile (ofd, ifd, &offset, count);
476 root 1.32
477 root 1.35 # elif __freebsd
478 root 1.37 /*
479     * Of course, the freebsd sendfile is a dire hack with no thoughts
480     * wasted on making it similar to other I/O functions.
481     */
482     {
483     off_t sbytes;
484     res = sendfile (ifd, ofd, offset, count, 0, &sbytes, 0);
485    
486     if (res < 0 && sbytes)
487     /* maybe only on EAGAIN only: as usual, the manpage leaves you guessing */
488     res = sbytes;
489     }
490 root 1.32
491 root 1.35 # elif __hpux
492 root 1.37 res = sendfile (ofd, ifd, offset, count, 0, 0);
493 root 1.32
494 root 1.35 # elif __solaris
495 root 1.37 {
496     struct sendfilevec vec;
497     size_t sbytes;
498    
499     vec.sfv_fd = ifd;
500     vec.sfv_flag = 0;
501     vec.sfv_off = offset;
502     vec.sfv_len = count;
503    
504     res = sendfilev (ofd, &vec, 1, &sbytes);
505    
506     if (res < 0 && sbytes)
507     res = sbytes;
508     }
509 root 1.35
510 root 1.38 # endif
511     #else
512 root 1.37 res = -1;
513     errno = ENOSYS;
514 root 1.32 #endif
515    
516 root 1.37 if (res < 0
517     && (errno == ENOSYS || errno == EINVAL || errno == ENOTSOCK
518 root 1.35 #if __solaris
519 root 1.37 || errno == EAFNOSUPPORT || errno == EPROTOTYPE
520 root 1.35 #endif
521 root 1.37 )
522     )
523     {
524     /* emulate sendfile. this is a major pain in the ass */
525     char buf[4096];
526     res = 0;
527    
528 root 1.38 while (count)
529 root 1.37 {
530     ssize_t cnt;
531    
532 root 1.38 cnt = pread (ifd, buf, count > 4096 ? 4096 : count, offset);
533 root 1.32
534 root 1.37 if (cnt <= 0)
535     {
536     if (cnt && !res) res = -1;
537     break;
538     }
539    
540     cnt = write (ofd, buf, cnt);
541    
542     if (cnt <= 0)
543     {
544     if (cnt && !res) res = -1;
545     break;
546     }
547    
548     offset += cnt;
549     res += cnt;
550 root 1.38 count -= cnt;
551 root 1.37 }
552     }
553    
554     return res;
555     }
556    
557     /* read a full directory */
558     static int
559     scandir_ (const char *path, void **namesp)
560     {
561     DIR *dirp = opendir (path);
562     union
563     {
564     struct dirent d;
565     char b [offsetof (struct dirent, d_name) + NAME_MAX + 1];
566     } u;
567     struct dirent *entp;
568     char *name, *names;
569     int memlen = 4096;
570     int memofs = 0;
571     int res = 0;
572     int errorno;
573    
574     if (!dirp)
575     return -1;
576    
577     names = malloc (memlen);
578    
579     for (;;)
580     {
581     errno = 0, readdir_r (dirp, &u.d, &entp);
582    
583     if (!entp)
584     break;
585    
586     name = entp->d_name;
587    
588     if (name [0] != '.' || (name [1] && (name [1] != '.' || name [2])))
589     {
590     int len = strlen (name) + 1;
591    
592     res++;
593    
594     while (memofs + len > memlen)
595     {
596     memlen *= 2;
597     names = realloc (names, memlen);
598     if (!names)
599     break;
600     }
601    
602     memcpy (names + memofs, name, len);
603     memofs += len;
604     }
605     }
606    
607     errorno = errno;
608     closedir (dirp);
609    
610     if (errorno)
611     {
612     free (names);
613     errno = errorno;
614     res = -1;
615     }
616    
617     *namesp = (void *)names;
618     return res;
619 root 1.32 }
620    
621 root 1.22 /*****************************************************************************/
622    
623 root 1.1 static void *
624     aio_proc (void *thr_arg)
625     {
626     aio_req req;
627 root 1.3 int type;
628 root 1.1
629 root 1.3 do
630 root 1.1 {
631 root 1.3 pthread_mutex_lock (&reqlock);
632    
633     for (;;)
634     {
635     req = reqs;
636    
637     if (reqs)
638     {
639     reqs = reqs->next;
640     if (!reqs) reqe = 0;
641     }
642    
643     if (req)
644     break;
645    
646     pthread_cond_wait (&reqwait, &reqlock);
647     }
648    
649     pthread_mutex_unlock (&reqlock);
650    
651 root 1.1 errno = 0; /* strictly unnecessary */
652    
653 root 1.3 type = req->type;
654    
655     switch (type)
656 root 1.1 {
657 root 1.10 case REQ_READ: req->result = pread (req->fd, req->dataptr, req->length, req->offset); break;
658     case REQ_WRITE: req->result = pwrite (req->fd, req->dataptr, req->length, req->offset); break;
659 root 1.16
660 root 1.1 case REQ_READAHEAD: req->result = readahead (req->fd, req->offset, req->length); break;
661 root 1.32 case REQ_SENDFILE: req->result = sendfile_ (req->fd, req->fd2, req->offset, req->length); break;
662 root 1.1
663     case REQ_STAT: req->result = stat (req->dataptr, req->statdata); break;
664     case REQ_LSTAT: req->result = lstat (req->dataptr, req->statdata); break;
665     case REQ_FSTAT: req->result = fstat (req->fd , req->statdata); break;
666    
667     case REQ_OPEN: req->result = open (req->dataptr, req->fd, req->mode); break;
668     case REQ_CLOSE: req->result = close (req->fd); break;
669     case REQ_UNLINK: req->result = unlink (req->dataptr); break;
670 root 1.22 case REQ_RMDIR: req->result = rmdir (req->dataptr); break;
671 root 1.40 case REQ_RENAME: req->result = rename (req->data2ptr, req->dataptr); break;
672     case REQ_LINK: req->result = link (req->data2ptr, req->dataptr); break;
673 root 1.22 case REQ_SYMLINK: req->result = symlink (req->data2ptr, req->dataptr); break;
674 root 1.1
675 root 1.17 case REQ_FDATASYNC: req->result = fdatasync (req->fd); break;
676 root 1.1 case REQ_FSYNC: req->result = fsync (req->fd); break;
677 root 1.37 case REQ_READDIR: req->result = scandir_ (req->dataptr, &req->data2ptr); break;
678 root 1.1
679     case REQ_QUIT:
680 root 1.3 break;
681 root 1.1
682     default:
683     req->result = ENOSYS;
684     break;
685     }
686    
687     req->errorno = errno;
688 root 1.3
689     pthread_mutex_lock (&reslock);
690    
691     req->next = 0;
692    
693     if (rese)
694     {
695     rese->next = req;
696     rese = req;
697     }
698     else
699     {
700     rese = ress = req;
701    
702     /* write a dummy byte to the pipe so fh becomes ready */
703     write (respipe [1], &respipe, 1);
704     }
705    
706     pthread_mutex_unlock (&reslock);
707 root 1.1 }
708 root 1.3 while (type != REQ_QUIT);
709 root 1.1
710     return 0;
711     }
712    
713 root 1.37 /*****************************************************************************/
714    
715     static void atfork_prepare (void)
716     {
717     pthread_mutex_lock (&reqlock);
718     pthread_mutex_lock (&reslock);
719     #if !HAVE_PREADWRITE
720     pthread_mutex_lock (&preadwritelock);
721     #endif
722     #if !HAVE_READDIR_R
723     pthread_mutex_lock (&readdirlock);
724     #endif
725     }
726    
727     static void atfork_parent (void)
728     {
729     #if !HAVE_READDIR_R
730     pthread_mutex_unlock (&readdirlock);
731     #endif
732     #if !HAVE_PREADWRITE
733     pthread_mutex_unlock (&preadwritelock);
734     #endif
735     pthread_mutex_unlock (&reslock);
736     pthread_mutex_unlock (&reqlock);
737     }
738    
739     static void atfork_child (void)
740     {
741     aio_req prv;
742    
743     started = 0;
744    
745     while (reqs)
746     {
747     prv = reqs;
748     reqs = prv->next;
749     free_req (prv);
750     }
751    
752     reqs = reqe = 0;
753    
754     while (ress)
755     {
756     prv = ress;
757     ress = prv->next;
758     free_req (prv);
759     }
760    
761     ress = rese = 0;
762    
763     close (respipe [0]);
764     close (respipe [1]);
765     create_pipe ();
766    
767     atfork_parent ();
768     }
769    
770 root 1.22 #define dREQ \
771     aio_req req; \
772     \
773     if (SvOK (callback) && !SvROK (callback)) \
774     croak ("clalback must be undef or of reference type"); \
775     \
776     Newz (0, req, 1, aio_cb); \
777     if (!req) \
778     croak ("out of memory during aio_req allocation"); \
779     \
780 root 1.27 req->callback = newSVsv (callback);
781 root 1.22
782 root 1.1 MODULE = IO::AIO PACKAGE = IO::AIO
783    
784 root 1.8 PROTOTYPES: ENABLE
785    
786 root 1.1 BOOT:
787     {
788 root 1.41 HV *stash = gv_stashpv ("IO::AIO", 1);
789     newCONSTSUB (stash, "EXDEV", newSViv (EXDEV));
790     newCONSTSUB (stash, "O_RDONLY", newSViv (O_RDONLY));
791     newCONSTSUB (stash, "O_WRONLY", newSViv (O_WRONLY));
792    
793 root 1.26 create_pipe ();
794 root 1.22 pthread_atfork (atfork_prepare, atfork_parent, atfork_child);
795 root 1.1 }
796    
797     void
798 root 1.40 min_parallel (nthreads)
799 root 1.1 int nthreads
800     PROTOTYPE: $
801    
802     void
803 root 1.40 max_parallel (nthreads)
804 root 1.1 int nthreads
805     PROTOTYPE: $
806    
807 root 1.4 int
808 root 1.40 max_outstanding (nreqs)
809 root 1.4 int nreqs
810     PROTOTYPE: $
811     CODE:
812     RETVAL = max_outstanding;
813     max_outstanding = nreqs;
814    
815 root 1.1 void
816 root 1.40 aio_open (pathname,flags,mode,callback=&PL_sv_undef)
817 root 1.1 SV * pathname
818     int flags
819     int mode
820     SV * callback
821 root 1.8 PROTOTYPE: $$$;$
822 root 1.1 CODE:
823     {
824 root 1.22 dREQ;
825 root 1.1
826     req->type = REQ_OPEN;
827     req->data = newSVsv (pathname);
828 root 1.22 req->dataptr = SvPVbyte_nolen (req->data);
829 root 1.1 req->fd = flags;
830     req->mode = mode;
831    
832     send_req (req);
833     }
834    
835     void
836 root 1.40 aio_close (fh,callback=&PL_sv_undef)
837 root 1.13 SV * fh
838     SV * callback
839 root 1.8 PROTOTYPE: $;$
840 root 1.1 ALIAS:
841     aio_close = REQ_CLOSE
842     aio_fsync = REQ_FSYNC
843     aio_fdatasync = REQ_FDATASYNC
844     CODE:
845     {
846 root 1.22 dREQ;
847 root 1.1
848     req->type = ix;
849 root 1.13 req->fh = newSVsv (fh);
850     req->fd = PerlIO_fileno (IoIFP (sv_2io (fh)));
851 root 1.1
852     send_req (req);
853     }
854    
855     void
856 root 1.40 aio_read (fh,offset,length,data,dataoffset,callback=&PL_sv_undef)
857 root 1.13 SV * fh
858     UV offset
859 root 1.32 UV length
860 root 1.13 SV * data
861 root 1.32 UV dataoffset
862 root 1.13 SV * callback
863     ALIAS:
864     aio_read = REQ_READ
865     aio_write = REQ_WRITE
866 root 1.8 PROTOTYPE: $$$$$;$
867 root 1.1 CODE:
868 root 1.13 {
869     aio_req req;
870     STRLEN svlen;
871 root 1.21 char *svptr = SvPVbyte (data, svlen);
872 root 1.13
873     SvUPGRADE (data, SVt_PV);
874     SvPOK_on (data);
875 root 1.1
876 root 1.13 if (dataoffset < 0)
877     dataoffset += svlen;
878    
879     if (dataoffset < 0 || dataoffset > svlen)
880     croak ("data offset outside of string");
881    
882     if (ix == REQ_WRITE)
883     {
884     /* write: check length and adjust. */
885     if (length < 0 || length + dataoffset > svlen)
886     length = svlen - dataoffset;
887     }
888     else
889     {
890     /* read: grow scalar as necessary */
891     svptr = SvGROW (data, length + dataoffset);
892     }
893    
894     if (length < 0)
895     croak ("length must not be negative");
896    
897 root 1.22 {
898     dREQ;
899 root 1.13
900 root 1.22 req->type = ix;
901     req->fh = newSVsv (fh);
902     req->fd = PerlIO_fileno (ix == REQ_READ ? IoIFP (sv_2io (fh))
903     : IoOFP (sv_2io (fh)));
904     req->offset = offset;
905     req->length = length;
906     req->data = SvREFCNT_inc (data);
907     req->dataptr = (char *)svptr + dataoffset;
908 root 1.13
909 root 1.28 if (!SvREADONLY (data))
910     {
911     SvREADONLY_on (data);
912     req->data2ptr = (void *)data;
913     }
914    
915 root 1.22 send_req (req);
916     }
917 root 1.13 }
918 root 1.1
919     void
920 root 1.40 aio_sendfile (out_fh,in_fh,in_offset,length,callback=&PL_sv_undef)
921 root 1.32 SV * out_fh
922     SV * in_fh
923     UV in_offset
924     UV length
925     SV * callback
926     PROTOTYPE: $$$$;$
927     CODE:
928     {
929     dREQ;
930    
931     req->type = REQ_SENDFILE;
932     req->fh = newSVsv (out_fh);
933     req->fd = PerlIO_fileno (IoIFP (sv_2io (out_fh)));
934     req->fh2 = newSVsv (in_fh);
935     req->fd2 = PerlIO_fileno (IoIFP (sv_2io (in_fh)));
936     req->offset = in_offset;
937     req->length = length;
938    
939     send_req (req);
940     }
941    
942     void
943 root 1.40 aio_readahead (fh,offset,length,callback=&PL_sv_undef)
944 root 1.13 SV * fh
945     UV offset
946     IV length
947     SV * callback
948 root 1.8 PROTOTYPE: $$$;$
949 root 1.1 CODE:
950     {
951 root 1.22 dREQ;
952 root 1.1
953     req->type = REQ_READAHEAD;
954 root 1.13 req->fh = newSVsv (fh);
955     req->fd = PerlIO_fileno (IoIFP (sv_2io (fh)));
956 root 1.1 req->offset = offset;
957     req->length = length;
958    
959     send_req (req);
960     }
961    
962     void
963 root 1.40 aio_stat (fh_or_path,callback=&PL_sv_undef)
964 root 1.1 SV * fh_or_path
965     SV * callback
966     ALIAS:
967 root 1.8 aio_stat = REQ_STAT
968     aio_lstat = REQ_LSTAT
969 root 1.1 CODE:
970     {
971 root 1.22 dREQ;
972 root 1.1
973     New (0, req->statdata, 1, Stat_t);
974     if (!req->statdata)
975 root 1.27 {
976     free_req (req);
977     croak ("out of memory during aio_req->statdata allocation");
978     }
979 root 1.1
980     if (SvPOK (fh_or_path))
981     {
982 root 1.8 req->type = ix;
983 root 1.1 req->data = newSVsv (fh_or_path);
984 root 1.22 req->dataptr = SvPVbyte_nolen (req->data);
985 root 1.1 }
986     else
987     {
988     req->type = REQ_FSTAT;
989 root 1.13 req->fh = newSVsv (fh_or_path);
990 root 1.1 req->fd = PerlIO_fileno (IoIFP (sv_2io (fh_or_path)));
991     }
992    
993     send_req (req);
994     }
995    
996     void
997 root 1.40 aio_unlink (pathname,callback=&PL_sv_undef)
998 root 1.1 SV * pathname
999     SV * callback
1000 root 1.22 ALIAS:
1001 root 1.40 aio_unlink = REQ_UNLINK
1002     aio_rmdir = REQ_RMDIR
1003     aio_readdir = REQ_READDIR
1004 root 1.1 CODE:
1005     {
1006 root 1.22 dREQ;
1007 root 1.1
1008 root 1.22 req->type = ix;
1009     req->data = newSVsv (pathname);
1010     req->dataptr = SvPVbyte_nolen (req->data);
1011 root 1.1
1012 root 1.22 send_req (req);
1013     }
1014    
1015     void
1016 root 1.40 aio_link (oldpath,newpath,callback=&PL_sv_undef)
1017 root 1.22 SV * oldpath
1018     SV * newpath
1019     SV * callback
1020 root 1.40 ALIAS:
1021     aio_link = REQ_LINK
1022     aio_symlink = REQ_SYMLINK
1023     aio_rename = REQ_RENAME
1024 root 1.22 CODE:
1025     {
1026     dREQ;
1027 root 1.1
1028 root 1.40 req->type = ix;
1029 root 1.22 req->fh = newSVsv (oldpath);
1030     req->data2ptr = SvPVbyte_nolen (req->fh);
1031     req->data = newSVsv (newpath);
1032     req->dataptr = SvPVbyte_nolen (req->data);
1033 root 1.1
1034     send_req (req);
1035     }
1036    
1037 root 1.6 void
1038 root 1.40 flush ()
1039 root 1.6 PROTOTYPE:
1040     CODE:
1041     while (nreqs)
1042     {
1043     poll_wait ();
1044     poll_cb ();
1045     }
1046    
1047 root 1.7 void
1048     poll()
1049     PROTOTYPE:
1050     CODE:
1051     if (nreqs)
1052     {
1053     poll_wait ();
1054     poll_cb ();
1055     }
1056    
1057 root 1.1 int
1058     poll_fileno()
1059     PROTOTYPE:
1060     CODE:
1061 root 1.3 RETVAL = respipe [0];
1062 root 1.1 OUTPUT:
1063     RETVAL
1064    
1065     int
1066     poll_cb(...)
1067     PROTOTYPE:
1068     CODE:
1069 root 1.5 RETVAL = poll_cb ();
1070 root 1.1 OUTPUT:
1071     RETVAL
1072    
1073     void
1074     poll_wait()
1075     PROTOTYPE:
1076     CODE:
1077 root 1.3 if (nreqs)
1078     poll_wait ();
1079 root 1.1
1080     int
1081     nreqs()
1082     PROTOTYPE:
1083     CODE:
1084     RETVAL = nreqs;
1085     OUTPUT:
1086     RETVAL
1087