ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/IO-AIO/AIO.xs
Revision: 1.47
Committed: Sun Oct 22 10:10:23 2006 UTC (17 years, 6 months ago) by root
Branch: MAIN
Changes since 1.46: +0 -1 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.20 #define _REENTRANT 1
2 root 1.19 #include <errno.h>
3    
4 root 1.15 #include "EXTERN.h"
5 root 1.1 #include "perl.h"
6     #include "XSUB.h"
7    
8 root 1.16 #include "autoconf/config.h"
9    
10 root 1.32 #include <pthread.h>
11    
12 root 1.37 #include <stddef.h>
13 root 1.41 #include <errno.h>
14 root 1.45 #include <sys/time.h>
15     #include <sys/select.h>
16 root 1.1 #include <sys/types.h>
17     #include <sys/stat.h>
18 root 1.37 #include <limits.h>
19 root 1.1 #include <unistd.h>
20     #include <fcntl.h>
21     #include <signal.h>
22     #include <sched.h>
23    
24 root 1.32 #if HAVE_SENDFILE
25     # if __linux
26     # include <sys/sendfile.h>
27     # elif __freebsd
28     # include <sys/socket.h>
29     # include <sys/uio.h>
30     # elif __hpux
31     # include <sys/socket.h>
32 root 1.35 # elif __solaris /* not yet */
33     # include <sys/sendfile.h>
34 root 1.34 # else
35     # error sendfile support requested but not available
36 root 1.32 # endif
37     #endif
38 root 1.1
39 root 1.39 /* used for struct dirent, AIX doesn't provide it */
40     #ifndef NAME_MAX
41     # define NAME_MAX 4096
42     #endif
43    
44 root 1.4 #if __ia64
45     # define STACKSIZE 65536
46 root 1.1 #else
47 root 1.37 # define STACKSIZE 8192
48 root 1.1 #endif
49    
/* request types; several XSUBs use these values as their ALIAS index */
enum {
  REQ_QUIT,        /* makes the worker thread that picks it up leave its loop */
  REQ_OPEN,
  REQ_CLOSE,
  REQ_READ,
  REQ_WRITE,
  REQ_READAHEAD,
  REQ_SENDFILE,
  REQ_STAT,
  REQ_LSTAT,
  REQ_FSTAT,
  REQ_FSYNC,
  REQ_FDATASYNC,
  REQ_UNLINK,
  REQ_RMDIR,
  REQ_RENAME,
  REQ_READDIR,
  REQ_LINK,
  REQ_SYMLINK,
  REQ_SLEEP,
  REQ_GROUP,       /* container request, see IO::AIO::GRP */
};
63    
64 root 1.44 #define AIO_REQ_KLASS "IO::AIO::REQ"
65     #define AIO_GRP_KLASS "IO::AIO::GRP"
66 root 1.43
67     typedef struct aio_cb
68     {
69 root 1.44 struct aio_cb *grp, *grp_prev, *grp_next;
70 root 1.43
71 root 1.3 struct aio_cb *volatile next;
72 root 1.1
73 root 1.43 SV *self; /* the perl counterpart of this request, if any */
74 root 1.1
75 root 1.43 SV *data, *callback;
76     SV *fh, *fh2;
77     void *dataptr, *data2ptr;
78     Stat_t *statdata;
79 root 1.1 off_t offset;
80     size_t length;
81     ssize_t result;
82 root 1.43
83     int type;
84     int fd, fd2;
85 root 1.1 int errorno;
86     STRLEN dataoffset;
87 root 1.43 mode_t mode; /* open */
88     unsigned char cancelled;
89 root 1.1 } aio_cb;
90    
91     typedef aio_cb *aio_req;
92 root 1.43 typedef aio_cb *aio_req_ornot;
93 root 1.1
94 root 1.30 static int started, wanted;
95 root 1.11 static volatile int nreqs;
96 root 1.4 static int max_outstanding = 1<<30;
97 root 1.3 static int respipe [2];
98 root 1.1
99 root 1.3 static pthread_mutex_t reslock = PTHREAD_MUTEX_INITIALIZER;
100     static pthread_mutex_t reqlock = PTHREAD_MUTEX_INITIALIZER;
101     static pthread_cond_t reqwait = PTHREAD_COND_INITIALIZER;
102    
103     static volatile aio_req reqs, reqe; /* queue start, queue end */
104     static volatile aio_req ress, rese; /* queue start, queue end */
105 root 1.1
106 root 1.45 static void req_free (aio_req req);
107    
108 root 1.43 /* must be called at most once */
109 root 1.44 static SV *req_sv (aio_req req, const char *klass)
110 root 1.43 {
111     req->self = (SV *)newHV ();
112     sv_magic (req->self, 0, PERL_MAGIC_ext, (char *)req, 0);
113    
114 root 1.45 return sv_2mortal (sv_bless (newRV_inc (req->self), gv_stashpv (klass, 1)));
115 root 1.43 }
116    
117 root 1.45 static aio_req SvAIO_REQ (SV *sv)
118 root 1.26 {
119 root 1.45 if (!sv_derived_from (sv, AIO_REQ_KLASS) || !SvROK (sv))
120     croak ("object of class " AIO_REQ_KLASS " expected");
121 root 1.43
122     MAGIC *mg = mg_find (SvRV (sv), PERL_MAGIC_ext);
123    
124     return mg ? (aio_req)mg->mg_ptr : 0;
125     }
126    
127 root 1.45 static void poll_wait ()
128     {
129     if (nreqs && !ress)
130     {
131     fd_set rfd;
132     FD_ZERO(&rfd);
133     FD_SET(respipe [0], &rfd);
134    
135     select (respipe [0] + 1, &rfd, 0, 0, 0);
136     }
137     }
138    
139     static void req_invoke (aio_req req)
140     {
141     dSP;
142     int errorno = errno;
143    
144     if (req->cancelled || !SvOK (req->callback))
145     return;
146    
147     errno = req->errorno;
148    
149     ENTER;
150     PUSHMARK (SP);
151    
152     switch (req->type)
153     {
154     case REQ_READDIR:
155     {
156     SV *rv = &PL_sv_undef;
157    
158     if (req->result >= 0)
159     {
160     char *buf = req->data2ptr;
161     AV *av = newAV ();
162    
163     while (req->result)
164     {
165     SV *sv = newSVpv (buf, 0);
166    
167     av_push (av, sv);
168     buf += SvCUR (sv) + 1;
169     req->result--;
170     }
171    
172     rv = sv_2mortal (newRV_noinc ((SV *)av));
173     }
174    
175     XPUSHs (rv);
176     }
177     break;
178    
179     case REQ_OPEN:
180     {
181     /* convert fd to fh */
182     SV *fh;
183    
184     XPUSHs (sv_2mortal (newSViv (req->result)));
185     PUTBACK;
186     call_pv ("IO::AIO::_fd2fh", G_SCALAR | G_EVAL);
187     SPAGAIN;
188    
189     fh = SvREFCNT_inc (POPs);
190    
191     PUSHMARK (SP);
192     XPUSHs (sv_2mortal (fh));
193     }
194     break;
195    
196     case REQ_SLEEP:
197     case REQ_GROUP:
198     break;
199    
200     default:
201     XPUSHs (sv_2mortal (newSViv (req->result)));
202     break;
203     }
204    
205    
206     PUTBACK;
207     call_sv (req->callback, G_VOID | G_EVAL);
208     SPAGAIN;
209    
210     if (SvTRUE (ERRSV))
211     {
212     req_free (req);
213     croak (0);
214     }
215    
216     LEAVE;
217    
218     errno = errorno;
219     }
220    
221 root 1.43 static void req_free (aio_req req)
222     {
223 root 1.45 if (req->grp)
224     {
225     aio_req grp = req->grp;
226    
227     /* unlink request */
228     req->grp_next->grp_prev = req->grp_prev;
229     req->grp_prev->grp_next = req->grp_next;
230    
231 root 1.46 if (grp->grp_next == grp && grp->fd)
232 root 1.45 {
233     req_invoke (grp);
234     req_free (grp);
235     }
236     }
237    
238 root 1.43 if (req->self)
239     {
240     sv_unmagic (req->self, PERL_MAGIC_ext);
241     SvREFCNT_dec (req->self);
242     }
243    
244 root 1.26 if (req->data)
245     SvREFCNT_dec (req->data);
246    
247     if (req->fh)
248     SvREFCNT_dec (req->fh);
249    
250 root 1.32 if (req->fh2)
251     SvREFCNT_dec (req->fh2);
252    
253 root 1.27 if (req->statdata)
254 root 1.26 Safefree (req->statdata);
255    
256     if (req->callback)
257     SvREFCNT_dec (req->callback);
258    
259 root 1.37 if (req->type == REQ_READDIR && req->result >= 0)
260     free (req->data2ptr);
261    
262 root 1.26 Safefree (req);
263     }
264    
265 root 1.45 static void req_cancel (aio_req req)
266 root 1.1 {
267 root 1.45 req->cancelled = 1;
268    
269     if (req->type == REQ_GROUP)
270 root 1.11 {
271 root 1.45 aio_req sub;
272 root 1.3
273 root 1.45 for (sub = req->grp_next; sub != req; sub = sub->grp_next)
274     req_cancel (sub);
275 root 1.11 }
276 root 1.1 }
277    
278 root 1.45 static int poll_cb ()
279 root 1.1 {
280     dSP;
281     int count = 0;
282 root 1.24 int do_croak = 0;
283 root 1.27 aio_req req;
284    
285     for (;;)
286     {
287     pthread_mutex_lock (&reslock);
288     req = ress;
289    
290     if (req)
291     {
292     ress = req->next;
293 root 1.11
294 root 1.27 if (!ress)
295     {
296     /* read any signals sent by the worker threads */
297     char buf [32];
298     while (read (respipe [0], buf, 32) == 32)
299     ;
300 root 1.30
301     rese = 0;
302 root 1.27 }
303     }
304 root 1.1
305 root 1.27 pthread_mutex_unlock (&reslock);
306 root 1.3
307 root 1.27 if (!req)
308     break;
309 root 1.3
310 root 1.1 nreqs--;
311    
312     if (req->type == REQ_QUIT)
313     started--;
314 root 1.45 else if (req->type == REQ_GROUP && req->grp_next != req)
315 root 1.46 {
316     req->fd = 1; /* mark request as delayed */
317     continue;
318     }
319 root 1.1 else
320     {
321     if (req->type == REQ_READ)
322 root 1.27 SvCUR_set (req->data, req->dataoffset + (req->result > 0 ? req->result : 0));
323 root 1.1
324 root 1.28 if (req->data2ptr && (req->type == REQ_READ || req->type == REQ_WRITE))
325     SvREADONLY_off (req->data);
326    
327 root 1.27 if (req->statdata)
328 root 1.1 {
329     PL_laststype = req->type == REQ_LSTAT ? OP_LSTAT : OP_STAT;
330     PL_laststatval = req->result;
331     PL_statcache = *(req->statdata);
332     }
333    
334 root 1.45 req_invoke (req);
335 root 1.26
336 root 1.1 count++;
337     }
338    
339 root 1.43 req_free (req);
340 root 1.1 }
341    
342     return count;
343     }
344    
345 root 1.4 static void *aio_proc(void *arg);
346    
347 root 1.45 static void start_thread (void)
348 root 1.4 {
349     sigset_t fullsigset, oldsigset;
350     pthread_t tid;
351     pthread_attr_t attr;
352    
353     pthread_attr_init (&attr);
354     pthread_attr_setstacksize (&attr, STACKSIZE);
355 root 1.31 pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
356 root 1.4
357     sigfillset (&fullsigset);
358     sigprocmask (SIG_SETMASK, &fullsigset, &oldsigset);
359    
360     if (pthread_create (&tid, &attr, aio_proc, 0) == 0)
361     started++;
362    
363     sigprocmask (SIG_SETMASK, &oldsigset, 0);
364     }
365    
366 root 1.45 static void req_send (aio_req req)
367 root 1.4 {
368 root 1.30 while (started < wanted && nreqs >= started)
369     start_thread ();
370    
371 root 1.4 nreqs++;
372    
373     pthread_mutex_lock (&reqlock);
374    
375     req->next = 0;
376    
377     if (reqe)
378     {
379     reqe->next = req;
380     reqe = req;
381     }
382     else
383     reqe = reqs = req;
384    
385     pthread_cond_signal (&reqwait);
386     pthread_mutex_unlock (&reqlock);
387    
388 root 1.27 if (nreqs > max_outstanding)
389     for (;;)
390     {
391     poll_cb ();
392    
393     if (nreqs <= max_outstanding)
394     break;
395    
396     poll_wait ();
397     }
398 root 1.4 }
399    
400 root 1.45 static void end_thread (void)
401 root 1.4 {
402     aio_req req;
403 root 1.26 Newz (0, req, 1, aio_cb);
404 root 1.4 req->type = REQ_QUIT;
405    
406 root 1.43 req_send (req);
407 root 1.4 }
408    
409 root 1.22 static void min_parallel (int nthreads)
410     {
411 root 1.30 if (wanted < nthreads)
412     wanted = nthreads;
413 root 1.22 }
414    
415     static void max_parallel (int nthreads)
416     {
417     int cur = started;
418 root 1.27
419 root 1.30 if (wanted > nthreads)
420     wanted = nthreads;
421    
422     while (cur > wanted)
423     {
424 root 1.22 end_thread ();
425     cur--;
426     }
427    
428 root 1.30 while (started > wanted)
429 root 1.22 {
430     poll_wait ();
431     poll_cb ();
432     }
433     }
434    
435 root 1.26 static void create_pipe ()
436 root 1.22 {
437 root 1.26 if (pipe (respipe))
438     croak ("unable to initialize result pipe");
439 root 1.22
440 root 1.26 if (fcntl (respipe [0], F_SETFL, O_NONBLOCK))
441     croak ("cannot set result pipe to nonblocking mode");
442 root 1.22
443 root 1.26 if (fcntl (respipe [1], F_SETFL, O_NONBLOCK))
444     croak ("cannot set result pipe to nonblocking mode");
445     }
446 root 1.22
447     /*****************************************************************************/
448 root 1.17 /* work around various missing functions */
449    
#if !HAVE_PREADWRITE
# define pread  aio_pread
# define pwrite aio_pwrite

/*
 * make our pread/pwrite safe against themselves, but not against
 * normal read/write by using a mutex. slows down execution a lot,
 * but that's your problem, not mine.
 */
static pthread_mutex_t preadwritelock = PTHREAD_MUTEX_INITIALIZER;

/*
 * Emulated pread: read up to count bytes from fd at the given offset and
 * seek back to the position the descriptor had on entry.
 * Returns whatever read() returned.
 */
static ssize_t pread (int fd, void *buf, size_t count, off_t offset)
{
  ssize_t res;
  off_t ooffset;

  pthread_mutex_lock (&preadwritelock);
  ooffset = lseek (fd, 0, SEEK_CUR);
  lseek (fd, offset, SEEK_SET);
  res = read (fd, buf, count);
  lseek (fd, ooffset, SEEK_SET);
  pthread_mutex_unlock (&preadwritelock);

  return res;
}

/*
 * Emulated pwrite: write up to count bytes to fd at the given offset and
 * seek back to the position the descriptor had on entry.
 * Returns whatever write() returned.
 */
static ssize_t pwrite (int fd, void *buf, size_t count, off_t offset)
{
  ssize_t res;
  off_t ooffset;

  pthread_mutex_lock (&preadwritelock);
  ooffset = lseek (fd, 0, SEEK_CUR);
  lseek (fd, offset, SEEK_SET);
  res = write (fd, buf, count);
  /* restore the saved position, not the write offset */
  lseek (fd, ooffset, SEEK_SET);
  pthread_mutex_unlock (&preadwritelock);

  return res;
}
#endif
491    
492     #if !HAVE_FDATASYNC
493     # define fdatasync fsync
494     #endif
495    
#if !HAVE_READAHEAD
# define readahead aio_readahead

/*
 * Emulated readahead: read the requested range in chunks into a scratch
 * buffer and discard the data. Read errors are not reported: errno is
 * cleared and 0 is returned in every case.
 */
static ssize_t readahead (int fd, off_t offset, size_t count)
{
  char readahead_buf[4096];

  while (count > 0)
    {
      size_t len = count < sizeof (readahead_buf) ? count : sizeof (readahead_buf);

      /* nothing more can be read after end of file or an error */
      if (pread (fd, readahead_buf, len, offset) <= 0)
        break;

      offset += len;
      count -= len;
    }

  errno = 0;
  return 0;
}
#endif
515    
#if !HAVE_READDIR_R
# define readdir_r aio_readdir_r

/* serialises all calls to readdir made through the emulation */
static pthread_mutex_t readdirlock = PTHREAD_MUTEX_INITIALIZER;

/*
 * Emulated readdir_r on top of readdir.
 *
 * On success the entry name is copied into ent, *res is set to ent and 0
 * is returned. When readdir returns no entry, *res is set to 0 and -1 is
 * returned. errno is left as readdir set it.
 */
static int readdir_r (DIR *dirp, struct dirent *ent, struct dirent **res)
{
  struct dirent *found;
  int saved_errno;

  pthread_mutex_lock (&readdirlock);

  found = readdir (dirp);
  saved_errno = errno;

  *res = found ? ent : 0;

  if (found)
    strcpy (ent->d_name, found->d_name);

  pthread_mutex_unlock (&readdirlock);

  errno = saved_errno;
  return found ? 0 : -1;
}
#endif
545    
/*
 * sendfile always needs emulation
 *
 * Copies up to count bytes from ifd, starting at offset, to ofd. The
 * native sendfile is tried first where available. When it is missing or
 * fails with an errno that indicates an unsupported descriptor type, the
 * data is copied with pread/write instead.
 *
 * Returns the number of bytes transferred or -1.
 */
static ssize_t sendfile_ (int ofd, int ifd, off_t offset, size_t count)
{
  ssize_t res;

  if (!count)
    return 0;

#if HAVE_SENDFILE
# if __linux
  res = sendfile (ofd, ifd, &offset, count);

# elif __freebsd
  /*
   * Of course, the freebsd sendfile is a dire hack with no thoughts
   * wasted on making it similar to other I/O functions.
   */
  {
    off_t sbytes;
    res = sendfile (ifd, ofd, offset, count, 0, &sbytes, 0);

    if (res < 0 && sbytes)
      /* maybe only on EAGAIN only: as usual, the manpage leaves you guessing */
      res = sbytes;
  }

# elif __hpux
  res = sendfile (ofd, ifd, offset, count, 0, 0);

# elif __solaris
  {
    struct sendfilevec vec;
    size_t sbytes;

    vec.sfv_fd   = ifd;
    vec.sfv_flag = 0;
    vec.sfv_off  = offset;
    vec.sfv_len  = count;

    res = sendfilev (ofd, &vec, 1, &sbytes);

    if (res < 0 && sbytes)
      res = sbytes;
  }

# endif
#else
  res = -1;
  errno = ENOSYS;
#endif

  if (res >= 0)
    return res;

  /* any other error is final */
  if (errno != ENOSYS && errno != EINVAL && errno != ENOTSOCK
#if __solaris
      && errno != EAFNOSUPPORT && errno != EPROTOTYPE
#endif
     )
    return res;

  /* emulate sendfile. this is a major pain in the ass */
  {
    char buf[4096];

    res = 0;

    while (count)
      {
        ssize_t cnt = pread (ifd, buf, count > 4096 ? 4096 : count, offset);

        if (cnt > 0)
          cnt = write (ofd, buf, cnt);

        if (cnt <= 0)
          {
            /* report an error only when nothing was transferred at all */
            if (cnt && !res)
              res = -1;

            break;
          }

        offset += cnt;
        res    += cnt;
        count  -= cnt;
      }
  }

  return res;
}
637    
638     /* read a full directory */
639 root 1.45 static int scandir_ (const char *path, void **namesp)
640 root 1.37 {
641     DIR *dirp = opendir (path);
642     union
643     {
644     struct dirent d;
645     char b [offsetof (struct dirent, d_name) + NAME_MAX + 1];
646     } u;
647     struct dirent *entp;
648     char *name, *names;
649     int memlen = 4096;
650     int memofs = 0;
651     int res = 0;
652     int errorno;
653    
654     if (!dirp)
655     return -1;
656    
657     names = malloc (memlen);
658    
659     for (;;)
660     {
661     errno = 0, readdir_r (dirp, &u.d, &entp);
662    
663     if (!entp)
664     break;
665    
666     name = entp->d_name;
667    
668     if (name [0] != '.' || (name [1] && (name [1] != '.' || name [2])))
669     {
670     int len = strlen (name) + 1;
671    
672     res++;
673    
674     while (memofs + len > memlen)
675     {
676     memlen *= 2;
677     names = realloc (names, memlen);
678     if (!names)
679     break;
680     }
681    
682     memcpy (names + memofs, name, len);
683     memofs += len;
684     }
685     }
686    
687     errorno = errno;
688     closedir (dirp);
689    
690     if (errorno)
691     {
692     free (names);
693     errno = errorno;
694     res = -1;
695     }
696    
697     *namesp = (void *)names;
698     return res;
699 root 1.32 }
700    
701 root 1.22 /*****************************************************************************/
702    
703 root 1.45 static void *aio_proc (void *thr_arg)
704 root 1.1 {
705     aio_req req;
706 root 1.3 int type;
707 root 1.1
708 root 1.3 do
709 root 1.1 {
710 root 1.3 pthread_mutex_lock (&reqlock);
711    
712     for (;;)
713     {
714     req = reqs;
715    
716     if (reqs)
717     {
718     reqs = reqs->next;
719     if (!reqs) reqe = 0;
720     }
721    
722     if (req)
723     break;
724    
725     pthread_cond_wait (&reqwait, &reqlock);
726     }
727    
728     pthread_mutex_unlock (&reqlock);
729    
730 root 1.1 errno = 0; /* strictly unnecessary */
731    
732 root 1.43 if (!req->cancelled)
733     switch (req->type)
734     {
735     case REQ_READ: req->result = pread (req->fd, req->dataptr, req->length, req->offset); break;
736     case REQ_WRITE: req->result = pwrite (req->fd, req->dataptr, req->length, req->offset); break;
737 root 1.3
738 root 1.43 case REQ_READAHEAD: req->result = readahead (req->fd, req->offset, req->length); break;
739     case REQ_SENDFILE: req->result = sendfile_ (req->fd, req->fd2, req->offset, req->length); break;
740 root 1.16
741 root 1.43 case REQ_STAT: req->result = stat (req->dataptr, req->statdata); break;
742     case REQ_LSTAT: req->result = lstat (req->dataptr, req->statdata); break;
743     case REQ_FSTAT: req->result = fstat (req->fd , req->statdata); break;
744    
745     case REQ_OPEN: req->result = open (req->dataptr, req->fd, req->mode); break;
746     case REQ_CLOSE: req->result = close (req->fd); break;
747     case REQ_UNLINK: req->result = unlink (req->dataptr); break;
748     case REQ_RMDIR: req->result = rmdir (req->dataptr); break;
749     case REQ_RENAME: req->result = rename (req->data2ptr, req->dataptr); break;
750     case REQ_LINK: req->result = link (req->data2ptr, req->dataptr); break;
751     case REQ_SYMLINK: req->result = symlink (req->data2ptr, req->dataptr); break;
752    
753     case REQ_FDATASYNC: req->result = fdatasync (req->fd); break;
754     case REQ_FSYNC: req->result = fsync (req->fd); break;
755     case REQ_READDIR: req->result = scandir_ (req->dataptr, &req->data2ptr); break;
756 root 1.1
757 root 1.45 case REQ_SLEEP:
758     {
759     struct timeval tv;
760    
761     tv.tv_sec = req->fd;
762     tv.tv_usec = req->fd2;
763    
764     req->result = select (0, 0, 0, 0, &tv);
765     }
766    
767 root 1.43 case REQ_QUIT:
768     break;
769 root 1.1
770 root 1.43 default:
771     req->result = ENOSYS;
772     break;
773     }
774 root 1.1
775     req->errorno = errno;
776 root 1.3
777     pthread_mutex_lock (&reslock);
778    
779     req->next = 0;
780    
781     if (rese)
782     {
783     rese->next = req;
784     rese = req;
785     }
786     else
787     {
788     rese = ress = req;
789    
790     /* write a dummy byte to the pipe so fh becomes ready */
791     write (respipe [1], &respipe, 1);
792     }
793    
794     pthread_mutex_unlock (&reslock);
795 root 1.1 }
796 root 1.3 while (type != REQ_QUIT);
797 root 1.1
798     return 0;
799     }
800    
801 root 1.37 /*****************************************************************************/
802    
803     static void atfork_prepare (void)
804     {
805     pthread_mutex_lock (&reqlock);
806     pthread_mutex_lock (&reslock);
807     #if !HAVE_PREADWRITE
808     pthread_mutex_lock (&preadwritelock);
809     #endif
810     #if !HAVE_READDIR_R
811     pthread_mutex_lock (&readdirlock);
812     #endif
813     }
814    
815     static void atfork_parent (void)
816     {
817     #if !HAVE_READDIR_R
818     pthread_mutex_unlock (&readdirlock);
819     #endif
820     #if !HAVE_PREADWRITE
821     pthread_mutex_unlock (&preadwritelock);
822     #endif
823     pthread_mutex_unlock (&reslock);
824     pthread_mutex_unlock (&reqlock);
825     }
826    
827     static void atfork_child (void)
828     {
829     aio_req prv;
830    
831     started = 0;
832    
833     while (reqs)
834     {
835     prv = reqs;
836     reqs = prv->next;
837 root 1.43 req_free (prv);
838 root 1.37 }
839    
840     reqs = reqe = 0;
841    
842     while (ress)
843     {
844     prv = ress;
845     ress = prv->next;
846 root 1.43 req_free (prv);
847 root 1.37 }
848    
849     ress = rese = 0;
850    
851     close (respipe [0]);
852     close (respipe [1]);
853     create_pipe ();
854    
855     atfork_parent ();
856     }
857    
/*
 * dREQ declares "req" and points it at a freshly zeroed aio_cb that owns a
 * copy of the variable "callback", which must be in scope. Croaks when
 * callback is defined but not a reference. The expansion has no trailing
 * semicolon and starts with a declaration.
 */
#define dREQ							\
	aio_req req;						\
								\
	if (SvOK (callback) && !SvROK (callback))		\
	  croak ("callback must be undef or of reference type");	\
								\
	Newz (0, req, 1, aio_cb);				\
	if (!req)						\
	  croak ("out of memory during aio_req allocation");	\
								\
	req->callback = newSVsv (callback)

/*
 * REQ_SEND queues "req" and, unless the XSUB was called in void context,
 * pushes the perl object for the request as return value. The expansion
 * already ends in a semicolon.
 */
#define REQ_SEND					\
	req_send (req);					\
							\
	if (GIMME_V != G_VOID)				\
	  XPUSHs (req_sv (req, AIO_REQ_KLASS));
875 root 1.22
876 root 1.1 MODULE = IO::AIO PACKAGE = IO::AIO
877    
878 root 1.8 PROTOTYPES: ENABLE
879    
880 root 1.1 BOOT:
881     {
882 root 1.41 HV *stash = gv_stashpv ("IO::AIO", 1);
883     newCONSTSUB (stash, "EXDEV", newSViv (EXDEV));
884     newCONSTSUB (stash, "O_RDONLY", newSViv (O_RDONLY));
885     newCONSTSUB (stash, "O_WRONLY", newSViv (O_WRONLY));
886    
887 root 1.26 create_pipe ();
888 root 1.22 pthread_atfork (atfork_prepare, atfork_parent, atfork_child);
889 root 1.1 }
890    
891     void
892 root 1.40 min_parallel (nthreads)
893 root 1.1 int nthreads
894     PROTOTYPE: $
895    
896     void
897 root 1.40 max_parallel (nthreads)
898 root 1.1 int nthreads
899     PROTOTYPE: $
900    
901 root 1.4 int
902 root 1.40 max_outstanding (nreqs)
903 root 1.4 int nreqs
904     PROTOTYPE: $
905     CODE:
906     RETVAL = max_outstanding;
907     max_outstanding = nreqs;
908    
909 root 1.1 void
910 root 1.40 aio_open (pathname,flags,mode,callback=&PL_sv_undef)
911 root 1.1 SV * pathname
912     int flags
913     int mode
914     SV * callback
915 root 1.8 PROTOTYPE: $$$;$
916 root 1.43 PPCODE:
917 root 1.1 {
918 root 1.22 dREQ;
919 root 1.1
920     req->type = REQ_OPEN;
921     req->data = newSVsv (pathname);
922 root 1.22 req->dataptr = SvPVbyte_nolen (req->data);
923 root 1.1 req->fd = flags;
924     req->mode = mode;
925    
926 root 1.43 REQ_SEND;
927 root 1.1 }
928    
929     void
930 root 1.40 aio_close (fh,callback=&PL_sv_undef)
931 root 1.13 SV * fh
932     SV * callback
933 root 1.8 PROTOTYPE: $;$
934 root 1.1 ALIAS:
935     aio_close = REQ_CLOSE
936     aio_fsync = REQ_FSYNC
937     aio_fdatasync = REQ_FDATASYNC
938 root 1.43 PPCODE:
939 root 1.1 {
940 root 1.22 dREQ;
941 root 1.1
942     req->type = ix;
943 root 1.13 req->fh = newSVsv (fh);
944     req->fd = PerlIO_fileno (IoIFP (sv_2io (fh)));
945 root 1.1
946 root 1.43 REQ_SEND (req);
947 root 1.1 }
948    
949     void
950 root 1.40 aio_read (fh,offset,length,data,dataoffset,callback=&PL_sv_undef)
951 root 1.13 SV * fh
952     UV offset
953 root 1.32 UV length
954 root 1.13 SV * data
955 root 1.32 UV dataoffset
956 root 1.13 SV * callback
957     ALIAS:
958     aio_read = REQ_READ
959     aio_write = REQ_WRITE
960 root 1.8 PROTOTYPE: $$$$$;$
961 root 1.43 PPCODE:
962 root 1.13 {
963     aio_req req;
964     STRLEN svlen;
965 root 1.21 char *svptr = SvPVbyte (data, svlen);
966 root 1.13
967     SvUPGRADE (data, SVt_PV);
968     SvPOK_on (data);
969 root 1.1
970 root 1.13 if (dataoffset < 0)
971     dataoffset += svlen;
972    
973     if (dataoffset < 0 || dataoffset > svlen)
974     croak ("data offset outside of string");
975    
976     if (ix == REQ_WRITE)
977     {
978     /* write: check length and adjust. */
979     if (length < 0 || length + dataoffset > svlen)
980     length = svlen - dataoffset;
981     }
982     else
983     {
984     /* read: grow scalar as necessary */
985     svptr = SvGROW (data, length + dataoffset);
986     }
987    
988     if (length < 0)
989     croak ("length must not be negative");
990    
991 root 1.22 {
992     dREQ;
993 root 1.13
994 root 1.22 req->type = ix;
995     req->fh = newSVsv (fh);
996     req->fd = PerlIO_fileno (ix == REQ_READ ? IoIFP (sv_2io (fh))
997     : IoOFP (sv_2io (fh)));
998     req->offset = offset;
999     req->length = length;
1000     req->data = SvREFCNT_inc (data);
1001     req->dataptr = (char *)svptr + dataoffset;
1002 root 1.13
1003 root 1.28 if (!SvREADONLY (data))
1004     {
1005     SvREADONLY_on (data);
1006     req->data2ptr = (void *)data;
1007     }
1008    
1009 root 1.43 REQ_SEND;
1010 root 1.22 }
1011 root 1.13 }
1012 root 1.1
1013     void
1014 root 1.40 aio_sendfile (out_fh,in_fh,in_offset,length,callback=&PL_sv_undef)
1015 root 1.32 SV * out_fh
1016     SV * in_fh
1017     UV in_offset
1018     UV length
1019     SV * callback
1020     PROTOTYPE: $$$$;$
1021 root 1.43 PPCODE:
1022 root 1.32 {
1023     dREQ;
1024    
1025     req->type = REQ_SENDFILE;
1026     req->fh = newSVsv (out_fh);
1027     req->fd = PerlIO_fileno (IoIFP (sv_2io (out_fh)));
1028     req->fh2 = newSVsv (in_fh);
1029     req->fd2 = PerlIO_fileno (IoIFP (sv_2io (in_fh)));
1030     req->offset = in_offset;
1031     req->length = length;
1032    
1033 root 1.43 REQ_SEND;
1034 root 1.32 }
1035    
1036     void
1037 root 1.40 aio_readahead (fh,offset,length,callback=&PL_sv_undef)
1038 root 1.13 SV * fh
1039     UV offset
1040     IV length
1041     SV * callback
1042 root 1.8 PROTOTYPE: $$$;$
1043 root 1.43 PPCODE:
1044 root 1.1 {
1045 root 1.22 dREQ;
1046 root 1.1
1047     req->type = REQ_READAHEAD;
1048 root 1.13 req->fh = newSVsv (fh);
1049     req->fd = PerlIO_fileno (IoIFP (sv_2io (fh)));
1050 root 1.1 req->offset = offset;
1051     req->length = length;
1052    
1053 root 1.43 REQ_SEND;
1054 root 1.1 }
1055    
1056     void
1057 root 1.40 aio_stat (fh_or_path,callback=&PL_sv_undef)
1058 root 1.1 SV * fh_or_path
1059     SV * callback
1060     ALIAS:
1061 root 1.8 aio_stat = REQ_STAT
1062     aio_lstat = REQ_LSTAT
1063 root 1.43 PPCODE:
1064 root 1.1 {
1065 root 1.22 dREQ;
1066 root 1.1
1067     New (0, req->statdata, 1, Stat_t);
1068     if (!req->statdata)
1069 root 1.27 {
1070 root 1.43 req_free (req);
1071 root 1.27 croak ("out of memory during aio_req->statdata allocation");
1072     }
1073 root 1.1
1074     if (SvPOK (fh_or_path))
1075     {
1076 root 1.8 req->type = ix;
1077 root 1.1 req->data = newSVsv (fh_or_path);
1078 root 1.22 req->dataptr = SvPVbyte_nolen (req->data);
1079 root 1.1 }
1080     else
1081     {
1082     req->type = REQ_FSTAT;
1083 root 1.13 req->fh = newSVsv (fh_or_path);
1084 root 1.1 req->fd = PerlIO_fileno (IoIFP (sv_2io (fh_or_path)));
1085     }
1086    
1087 root 1.43 REQ_SEND;
1088 root 1.1 }
1089    
1090     void
1091 root 1.40 aio_unlink (pathname,callback=&PL_sv_undef)
1092 root 1.1 SV * pathname
1093     SV * callback
1094 root 1.22 ALIAS:
1095 root 1.40 aio_unlink = REQ_UNLINK
1096     aio_rmdir = REQ_RMDIR
1097     aio_readdir = REQ_READDIR
1098 root 1.43 PPCODE:
1099 root 1.1 {
1100 root 1.22 dREQ;
1101 root 1.1
1102 root 1.22 req->type = ix;
1103     req->data = newSVsv (pathname);
1104     req->dataptr = SvPVbyte_nolen (req->data);
1105 root 1.1
1106 root 1.43 REQ_SEND;
1107 root 1.22 }
1108    
1109     void
1110 root 1.40 aio_link (oldpath,newpath,callback=&PL_sv_undef)
1111 root 1.22 SV * oldpath
1112     SV * newpath
1113     SV * callback
1114 root 1.40 ALIAS:
1115     aio_link = REQ_LINK
1116     aio_symlink = REQ_SYMLINK
1117     aio_rename = REQ_RENAME
1118 root 1.43 PPCODE:
1119 root 1.22 {
1120     dREQ;
1121 root 1.1
1122 root 1.40 req->type = ix;
1123 root 1.22 req->fh = newSVsv (oldpath);
1124     req->data2ptr = SvPVbyte_nolen (req->fh);
1125     req->data = newSVsv (newpath);
1126     req->dataptr = SvPVbyte_nolen (req->data);
1127 root 1.1
1128 root 1.43 REQ_SEND;
1129 root 1.1 }
1130    
1131 root 1.42 void
1132 root 1.45 aio_sleep (delay,callback=&PL_sv_undef)
1133     double delay
1134     SV * callback
1135     PPCODE:
1136     {
1137     dREQ;
1138    
1139     req->type = REQ_SLEEP;
1140     req->fd = delay < 0. ? 0 : delay;
1141     req->fd2 = delay < 0. ? 0 : 1000. * (delay - req->fd);
1142    
1143     REQ_SEND;
1144     }
1145    
1146     void
1147 root 1.44 aio_group (callback=&PL_sv_undef)
1148     SV * callback
1149 root 1.46 PROTOTYPE: ;$
1150 root 1.44 PPCODE:
1151 root 1.42 {
1152 root 1.44 dREQ;
1153     req->type = REQ_GROUP;
1154 root 1.45 req->grp_next = req;
1155     req->grp_prev = req;
1156    
1157     req_send (req);
1158 root 1.44 XPUSHs (req_sv (req, AIO_GRP_KLASS));
1159 root 1.42 }
1160    
1161 root 1.6 void
1162 root 1.40 flush ()
1163 root 1.6 PROTOTYPE:
1164     CODE:
1165     while (nreqs)
1166     {
1167     poll_wait ();
1168     poll_cb ();
1169     }
1170    
1171 root 1.7 void
1172     poll()
1173     PROTOTYPE:
1174     CODE:
1175     if (nreqs)
1176     {
1177     poll_wait ();
1178     poll_cb ();
1179     }
1180    
1181 root 1.1 int
1182     poll_fileno()
1183     PROTOTYPE:
1184     CODE:
1185 root 1.3 RETVAL = respipe [0];
1186 root 1.1 OUTPUT:
1187     RETVAL
1188    
1189     int
1190     poll_cb(...)
1191     PROTOTYPE:
1192     CODE:
1193 root 1.5 RETVAL = poll_cb ();
1194 root 1.1 OUTPUT:
1195     RETVAL
1196    
1197     void
1198     poll_wait()
1199     PROTOTYPE:
1200     CODE:
1201 root 1.3 if (nreqs)
1202     poll_wait ();
1203 root 1.1
1204     int
1205     nreqs()
1206     PROTOTYPE:
1207     CODE:
1208     RETVAL = nreqs;
1209     OUTPUT:
1210     RETVAL
1211    
1212 root 1.44 MODULE = IO::AIO PACKAGE = IO::AIO::REQ
1213 root 1.43
1214     void
1215     cancel (aio_req_ornot req)
1216     PROTOTYPE:
1217     CODE:
1218 root 1.45 req_cancel (req);
1219    
1220     MODULE = IO::AIO PACKAGE = IO::AIO::GRP
1221    
1222     void
1223     add (aio_req grp, ...)
1224     PROTOTYPE: $;@
1225     PPCODE:
1226     {
1227     int i;
1228    
1229     for (i = 1; i < items; ++i )
1230     {
1231 root 1.46 if (GIMME_V != G_VOID)
1232     XPUSHs (sv_2mortal (newSVsv (ST (i))));
1233    
1234 root 1.45 aio_req req = SvAIO_REQ (ST (i));
1235    
1236 root 1.46 if (req)
1237     {
1238     req->grp_prev = grp;
1239     req->grp_next = grp->grp_next;
1240     grp->grp_next->grp_prev = req;
1241     grp->grp_next = req;
1242 root 1.45
1243 root 1.46 req->grp = grp;
1244     }
1245 root 1.45 }
1246     }
1247 root 1.43