ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/IO-AIO/AIO.xs
Revision: 1.48
Committed: Sun Oct 22 10:33:19 2006 UTC (17 years, 6 months ago) by root
Branch: MAIN
Changes since 1.47: +34 -5 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.20 #define _REENTRANT 1
2 root 1.19 #include <errno.h>
3    
4 root 1.15 #include "EXTERN.h"
5 root 1.1 #include "perl.h"
6     #include "XSUB.h"
7    
8 root 1.16 #include "autoconf/config.h"
9    
10 root 1.32 #include <pthread.h>
11    
12 root 1.37 #include <stddef.h>
13 root 1.41 #include <errno.h>
14 root 1.45 #include <sys/time.h>
15     #include <sys/select.h>
16 root 1.1 #include <sys/types.h>
17     #include <sys/stat.h>
18 root 1.37 #include <limits.h>
19 root 1.1 #include <unistd.h>
20     #include <fcntl.h>
21     #include <signal.h>
22     #include <sched.h>
23    
24 root 1.32 #if HAVE_SENDFILE
25     # if __linux
26     # include <sys/sendfile.h>
27     # elif __freebsd
28     # include <sys/socket.h>
29     # include <sys/uio.h>
30     # elif __hpux
31     # include <sys/socket.h>
32 root 1.35 # elif __solaris /* not yet */
33     # include <sys/sendfile.h>
34 root 1.34 # else
35     # error sendfile support requested but not available
36 root 1.32 # endif
37     #endif
38 root 1.1
39 root 1.39 /* used for struct dirent, AIX doesn't provide it */
40     #ifndef NAME_MAX
41     # define NAME_MAX 4096
42     #endif
43    
44 root 1.4 #if __ia64
45     # define STACKSIZE 65536
46 root 1.1 #else
47 root 1.37 # define STACKSIZE 8192
48 root 1.1 #endif
49    
50     enum {
51     REQ_QUIT,
52     REQ_OPEN, REQ_CLOSE,
53     REQ_READ, REQ_WRITE, REQ_READAHEAD,
54 root 1.32 REQ_SENDFILE,
55 root 1.22 REQ_STAT, REQ_LSTAT, REQ_FSTAT,
56 root 1.1 REQ_FSYNC, REQ_FDATASYNC,
57 root 1.40 REQ_UNLINK, REQ_RMDIR, REQ_RENAME,
58 root 1.37 REQ_READDIR,
59 root 1.40 REQ_LINK, REQ_SYMLINK,
60 root 1.45 REQ_SLEEP,
61 root 1.44 REQ_GROUP,
62 root 1.1 };
63    
64 root 1.44 #define AIO_REQ_KLASS "IO::AIO::REQ"
65     #define AIO_GRP_KLASS "IO::AIO::GRP"
66 root 1.43
67     typedef struct aio_cb
68     {
69 root 1.44 struct aio_cb *grp, *grp_prev, *grp_next;
70 root 1.43
71 root 1.3 struct aio_cb *volatile next;
72 root 1.1
73 root 1.43 SV *self; /* the perl counterpart of this request, if any */
74 root 1.1
75 root 1.43 SV *data, *callback;
76     SV *fh, *fh2;
77     void *dataptr, *data2ptr;
78     Stat_t *statdata;
79 root 1.1 off_t offset;
80     size_t length;
81     ssize_t result;
82 root 1.43
83     int type;
84     int fd, fd2;
85 root 1.1 int errorno;
86     STRLEN dataoffset;
87 root 1.43 mode_t mode; /* open */
88     unsigned char cancelled;
89 root 1.1 } aio_cb;
90    
91     typedef aio_cb *aio_req;
92 root 1.43 typedef aio_cb *aio_req_ornot;
93 root 1.1
94 root 1.30 static int started, wanted;
95 root 1.11 static volatile int nreqs;
96 root 1.4 static int max_outstanding = 1<<30;
97 root 1.3 static int respipe [2];
98 root 1.1
99 root 1.3 static pthread_mutex_t reslock = PTHREAD_MUTEX_INITIALIZER;
100     static pthread_mutex_t reqlock = PTHREAD_MUTEX_INITIALIZER;
101     static pthread_cond_t reqwait = PTHREAD_COND_INITIALIZER;
102    
103     static volatile aio_req reqs, reqe; /* queue start, queue end */
104     static volatile aio_req ress, rese; /* queue start, queue end */
105 root 1.1
106 root 1.45 static void req_free (aio_req req);
107    
108 root 1.43 /* must be called at most once */
109 root 1.44 static SV *req_sv (aio_req req, const char *klass)
110 root 1.43 {
111     req->self = (SV *)newHV ();
112     sv_magic (req->self, 0, PERL_MAGIC_ext, (char *)req, 0);
113    
114 root 1.45 return sv_2mortal (sv_bless (newRV_inc (req->self), gv_stashpv (klass, 1)));
115 root 1.43 }
116    
117 root 1.45 static aio_req SvAIO_REQ (SV *sv)
118 root 1.26 {
119 root 1.45 if (!sv_derived_from (sv, AIO_REQ_KLASS) || !SvROK (sv))
120     croak ("object of class " AIO_REQ_KLASS " expected");
121 root 1.43
122     MAGIC *mg = mg_find (SvRV (sv), PERL_MAGIC_ext);
123    
124     return mg ? (aio_req)mg->mg_ptr : 0;
125     }
126    
127 root 1.45 static void poll_wait ()
128     {
129     if (nreqs && !ress)
130     {
131     fd_set rfd;
132     FD_ZERO(&rfd);
133     FD_SET(respipe [0], &rfd);
134    
135     select (respipe [0] + 1, &rfd, 0, 0, 0);
136     }
137     }
138    
139     static void req_invoke (aio_req req)
140     {
141     dSP;
142     int errorno = errno;
143    
144     if (req->cancelled || !SvOK (req->callback))
145     return;
146    
147     errno = req->errorno;
148    
149     ENTER;
150     PUSHMARK (SP);
151 root 1.48 EXTEND (SP, 1);
152 root 1.45
153     switch (req->type)
154     {
155     case REQ_READDIR:
156     {
157     SV *rv = &PL_sv_undef;
158    
159     if (req->result >= 0)
160     {
161     char *buf = req->data2ptr;
162     AV *av = newAV ();
163    
164     while (req->result)
165     {
166     SV *sv = newSVpv (buf, 0);
167    
168     av_push (av, sv);
169     buf += SvCUR (sv) + 1;
170     req->result--;
171     }
172    
173     rv = sv_2mortal (newRV_noinc ((SV *)av));
174     }
175    
176 root 1.48 PUSHs (rv);
177 root 1.45 }
178     break;
179    
180     case REQ_OPEN:
181     {
182     /* convert fd to fh */
183     SV *fh;
184    
185 root 1.48 PUSHs (sv_2mortal (newSViv (req->result)));
186 root 1.45 PUTBACK;
187     call_pv ("IO::AIO::_fd2fh", G_SCALAR | G_EVAL);
188     SPAGAIN;
189    
190     fh = SvREFCNT_inc (POPs);
191    
192     PUSHMARK (SP);
193     XPUSHs (sv_2mortal (fh));
194     }
195     break;
196    
197 root 1.48 case REQ_GROUP:
198     if (req->data)
199     {
200     int i;
201     AV *av = (AV *)req->data;
202    
203     EXTEND (SP, AvFILL (av) + 1);
204     for (i = 0; i <= AvFILL (av); ++i)
205     PUSHs (*av_fetch (av, i, 0));
206     }
207     break;
208    
209 root 1.45 case REQ_SLEEP:
210     break;
211    
212     default:
213 root 1.48 PUSHs (sv_2mortal (newSViv (req->result)));
214 root 1.45 break;
215     }
216    
217    
218     PUTBACK;
219     call_sv (req->callback, G_VOID | G_EVAL);
220     SPAGAIN;
221    
222     if (SvTRUE (ERRSV))
223     {
224     req_free (req);
225     croak (0);
226     }
227    
228     LEAVE;
229    
230     errno = errorno;
231     }
232    
233 root 1.43 static void req_free (aio_req req)
234     {
235 root 1.45 if (req->grp)
236     {
237     aio_req grp = req->grp;
238    
239     /* unlink request */
240     req->grp_next->grp_prev = req->grp_prev;
241     req->grp_prev->grp_next = req->grp_next;
242    
243 root 1.46 if (grp->grp_next == grp && grp->fd)
244 root 1.45 {
245     req_invoke (grp);
246     req_free (grp);
247     }
248     }
249    
250 root 1.43 if (req->self)
251     {
252     sv_unmagic (req->self, PERL_MAGIC_ext);
253     SvREFCNT_dec (req->self);
254     }
255    
256 root 1.26 if (req->data)
257     SvREFCNT_dec (req->data);
258    
259     if (req->fh)
260     SvREFCNT_dec (req->fh);
261    
262 root 1.32 if (req->fh2)
263     SvREFCNT_dec (req->fh2);
264    
265 root 1.27 if (req->statdata)
266 root 1.26 Safefree (req->statdata);
267    
268     if (req->callback)
269     SvREFCNT_dec (req->callback);
270    
271 root 1.37 if (req->type == REQ_READDIR && req->result >= 0)
272     free (req->data2ptr);
273    
274 root 1.26 Safefree (req);
275     }
276    
277 root 1.45 static void req_cancel (aio_req req)
278 root 1.1 {
279 root 1.45 req->cancelled = 1;
280    
281     if (req->type == REQ_GROUP)
282 root 1.11 {
283 root 1.45 aio_req sub;
284 root 1.3
285 root 1.45 for (sub = req->grp_next; sub != req; sub = sub->grp_next)
286     req_cancel (sub);
287 root 1.11 }
288 root 1.1 }
289    
290 root 1.45 static int poll_cb ()
291 root 1.1 {
292     dSP;
293     int count = 0;
294 root 1.24 int do_croak = 0;
295 root 1.27 aio_req req;
296    
297     for (;;)
298     {
299     pthread_mutex_lock (&reslock);
300     req = ress;
301    
302     if (req)
303     {
304     ress = req->next;
305 root 1.11
306 root 1.27 if (!ress)
307     {
308     /* read any signals sent by the worker threads */
309     char buf [32];
310     while (read (respipe [0], buf, 32) == 32)
311     ;
312 root 1.30
313     rese = 0;
314 root 1.27 }
315     }
316 root 1.1
317 root 1.27 pthread_mutex_unlock (&reslock);
318 root 1.3
319 root 1.27 if (!req)
320     break;
321 root 1.3
322 root 1.1 nreqs--;
323    
324     if (req->type == REQ_QUIT)
325     started--;
326 root 1.45 else if (req->type == REQ_GROUP && req->grp_next != req)
327 root 1.46 {
328     req->fd = 1; /* mark request as delayed */
329     continue;
330     }
331 root 1.1 else
332     {
333     if (req->type == REQ_READ)
334 root 1.27 SvCUR_set (req->data, req->dataoffset + (req->result > 0 ? req->result : 0));
335 root 1.1
336 root 1.28 if (req->data2ptr && (req->type == REQ_READ || req->type == REQ_WRITE))
337     SvREADONLY_off (req->data);
338    
339 root 1.27 if (req->statdata)
340 root 1.1 {
341     PL_laststype = req->type == REQ_LSTAT ? OP_LSTAT : OP_STAT;
342     PL_laststatval = req->result;
343     PL_statcache = *(req->statdata);
344     }
345    
346 root 1.45 req_invoke (req);
347 root 1.26
348 root 1.1 count++;
349     }
350    
351 root 1.43 req_free (req);
352 root 1.1 }
353    
354     return count;
355     }
356    
357 root 1.4 static void *aio_proc(void *arg);
358    
359 root 1.45 static void start_thread (void)
360 root 1.4 {
361     sigset_t fullsigset, oldsigset;
362     pthread_t tid;
363     pthread_attr_t attr;
364    
365     pthread_attr_init (&attr);
366     pthread_attr_setstacksize (&attr, STACKSIZE);
367 root 1.31 pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
368 root 1.4
369     sigfillset (&fullsigset);
370     sigprocmask (SIG_SETMASK, &fullsigset, &oldsigset);
371    
372     if (pthread_create (&tid, &attr, aio_proc, 0) == 0)
373     started++;
374    
375     sigprocmask (SIG_SETMASK, &oldsigset, 0);
376     }
377    
378 root 1.45 static void req_send (aio_req req)
379 root 1.4 {
380 root 1.30 while (started < wanted && nreqs >= started)
381     start_thread ();
382    
383 root 1.4 nreqs++;
384    
385     pthread_mutex_lock (&reqlock);
386    
387     req->next = 0;
388    
389     if (reqe)
390     {
391     reqe->next = req;
392     reqe = req;
393     }
394     else
395     reqe = reqs = req;
396    
397     pthread_cond_signal (&reqwait);
398     pthread_mutex_unlock (&reqlock);
399    
400 root 1.27 if (nreqs > max_outstanding)
401     for (;;)
402     {
403     poll_cb ();
404    
405     if (nreqs <= max_outstanding)
406     break;
407    
408     poll_wait ();
409     }
410 root 1.4 }
411    
412 root 1.45 static void end_thread (void)
413 root 1.4 {
414     aio_req req;
415 root 1.26 Newz (0, req, 1, aio_cb);
416 root 1.4 req->type = REQ_QUIT;
417    
418 root 1.43 req_send (req);
419 root 1.4 }
420    
421 root 1.22 static void min_parallel (int nthreads)
422     {
423 root 1.30 if (wanted < nthreads)
424     wanted = nthreads;
425 root 1.22 }
426    
427     static void max_parallel (int nthreads)
428     {
429     int cur = started;
430 root 1.27
431 root 1.30 if (wanted > nthreads)
432     wanted = nthreads;
433    
434     while (cur > wanted)
435     {
436 root 1.22 end_thread ();
437     cur--;
438     }
439    
440 root 1.30 while (started > wanted)
441 root 1.22 {
442     poll_wait ();
443     poll_cb ();
444     }
445     }
446    
447 root 1.26 static void create_pipe ()
448 root 1.22 {
449 root 1.26 if (pipe (respipe))
450     croak ("unable to initialize result pipe");
451 root 1.22
452 root 1.26 if (fcntl (respipe [0], F_SETFL, O_NONBLOCK))
453     croak ("cannot set result pipe to nonblocking mode");
454 root 1.22
455 root 1.26 if (fcntl (respipe [1], F_SETFL, O_NONBLOCK))
456     croak ("cannot set result pipe to nonblocking mode");
457     }
458 root 1.22
459     /*****************************************************************************/
460 root 1.17 /* work around various missing functions */
461    
462     #if !HAVE_PREADWRITE
463     # define pread aio_pread
464     # define pwrite aio_pwrite
465    
466     /*
467     * make our pread/pwrite safe against themselves, but not against
468     * normal read/write by using a mutex. slows down execution a lot,
469     * but that's your problem, not mine.
470     */
471 root 1.37 static pthread_mutex_t preadwritelock = PTHREAD_MUTEX_INITIALIZER;
472 root 1.17
473 root 1.45 static ssize_t pread (int fd, void *buf, size_t count, off_t offset)
474 root 1.17 {
475     ssize_t res;
476     off_t ooffset;
477    
478 root 1.37 pthread_mutex_lock (&preadwritelock);
479 root 1.17 ooffset = lseek (fd, 0, SEEK_CUR);
480     lseek (fd, offset, SEEK_SET);
481     res = read (fd, buf, count);
482     lseek (fd, ooffset, SEEK_SET);
483 root 1.37 pthread_mutex_unlock (&preadwritelock);
484 root 1.17
485     return res;
486     }
487    
488 root 1.45 static ssize_t pwrite (int fd, void *buf, size_t count, off_t offset)
489 root 1.17 {
490     ssize_t res;
491     off_t ooffset;
492    
493 root 1.37 pthread_mutex_lock (&preadwritelock);
494 root 1.17 ooffset = lseek (fd, 0, SEEK_CUR);
495     lseek (fd, offset, SEEK_SET);
496     res = write (fd, buf, count);
497     lseek (fd, offset, SEEK_SET);
498 root 1.37 pthread_mutex_unlock (&preadwritelock);
499 root 1.17
500     return res;
501     }
502     #endif
503    
504     #if !HAVE_FDATASYNC
505     # define fdatasync fsync
506     #endif
507    
508     #if !HAVE_READAHEAD
509     # define readahead aio_readahead
510    
511 root 1.45 static ssize_t readahead (int fd, off_t offset, size_t count)
512 root 1.17 {
513 root 1.37 char readahead_buf[4096];
514    
515 root 1.17 while (count > 0)
516     {
517     size_t len = count < sizeof (readahead_buf) ? count : sizeof (readahead_buf);
518    
519     pread (fd, readahead_buf, len, offset);
520     offset += len;
521     count -= len;
522     }
523    
524     errno = 0;
525     }
526     #endif
527    
528 root 1.37 #if !HAVE_READDIR_R
529     # define readdir_r aio_readdir_r
530    
531     static pthread_mutex_t readdirlock = PTHREAD_MUTEX_INITIALIZER;
532    
533 root 1.45 static int readdir_r (DIR *dirp, struct dirent *ent, struct dirent **res)
534 root 1.37 {
535     struct dirent *e;
536     int errorno;
537    
538     pthread_mutex_lock (&readdirlock);
539    
540     e = readdir (dirp);
541     errorno = errno;
542    
543     if (e)
544     {
545     *res = ent;
546     strcpy (ent->d_name, e->d_name);
547     }
548     else
549     *res = 0;
550    
551     pthread_mutex_unlock (&readdirlock);
552    
553     errno = errorno;
554     return e ? 0 : -1;
555     }
556     #endif
557    
558 root 1.32 /* sendfile always needs emulation */
559 root 1.45 static ssize_t sendfile_ (int ofd, int ifd, off_t offset, size_t count)
560 root 1.32 {
561 root 1.37 ssize_t res;
562 root 1.32
563 root 1.37 if (!count)
564     return 0;
565 root 1.32
566 root 1.35 #if HAVE_SENDFILE
567     # if __linux
568 root 1.37 res = sendfile (ofd, ifd, &offset, count);
569 root 1.32
570 root 1.35 # elif __freebsd
571 root 1.37 /*
572     * Of course, the freebsd sendfile is a dire hack with no thoughts
573     * wasted on making it similar to other I/O functions.
574     */
575     {
576     off_t sbytes;
577     res = sendfile (ifd, ofd, offset, count, 0, &sbytes, 0);
578    
579     if (res < 0 && sbytes)
580     /* maybe only on EAGAIN only: as usual, the manpage leaves you guessing */
581     res = sbytes;
582     }
583 root 1.32
584 root 1.35 # elif __hpux
585 root 1.37 res = sendfile (ofd, ifd, offset, count, 0, 0);
586 root 1.32
587 root 1.35 # elif __solaris
588 root 1.37 {
589     struct sendfilevec vec;
590     size_t sbytes;
591    
592     vec.sfv_fd = ifd;
593     vec.sfv_flag = 0;
594     vec.sfv_off = offset;
595     vec.sfv_len = count;
596    
597     res = sendfilev (ofd, &vec, 1, &sbytes);
598    
599     if (res < 0 && sbytes)
600     res = sbytes;
601     }
602 root 1.35
603 root 1.38 # endif
604     #else
605 root 1.37 res = -1;
606     errno = ENOSYS;
607 root 1.32 #endif
608    
609 root 1.37 if (res < 0
610     && (errno == ENOSYS || errno == EINVAL || errno == ENOTSOCK
611 root 1.35 #if __solaris
612 root 1.37 || errno == EAFNOSUPPORT || errno == EPROTOTYPE
613 root 1.35 #endif
614 root 1.37 )
615     )
616     {
617     /* emulate sendfile. this is a major pain in the ass */
618     char buf[4096];
619     res = 0;
620    
621 root 1.38 while (count)
622 root 1.37 {
623     ssize_t cnt;
624    
625 root 1.38 cnt = pread (ifd, buf, count > 4096 ? 4096 : count, offset);
626 root 1.32
627 root 1.37 if (cnt <= 0)
628     {
629     if (cnt && !res) res = -1;
630     break;
631     }
632    
633     cnt = write (ofd, buf, cnt);
634    
635     if (cnt <= 0)
636     {
637     if (cnt && !res) res = -1;
638     break;
639     }
640    
641     offset += cnt;
642     res += cnt;
643 root 1.38 count -= cnt;
644 root 1.37 }
645     }
646    
647     return res;
648     }
649    
650     /* read a full directory */
651 root 1.45 static int scandir_ (const char *path, void **namesp)
652 root 1.37 {
653     DIR *dirp = opendir (path);
654     union
655     {
656     struct dirent d;
657     char b [offsetof (struct dirent, d_name) + NAME_MAX + 1];
658     } u;
659     struct dirent *entp;
660     char *name, *names;
661     int memlen = 4096;
662     int memofs = 0;
663     int res = 0;
664     int errorno;
665    
666     if (!dirp)
667     return -1;
668    
669     names = malloc (memlen);
670    
671     for (;;)
672     {
673     errno = 0, readdir_r (dirp, &u.d, &entp);
674    
675     if (!entp)
676     break;
677    
678     name = entp->d_name;
679    
680     if (name [0] != '.' || (name [1] && (name [1] != '.' || name [2])))
681     {
682     int len = strlen (name) + 1;
683    
684     res++;
685    
686     while (memofs + len > memlen)
687     {
688     memlen *= 2;
689     names = realloc (names, memlen);
690     if (!names)
691     break;
692     }
693    
694     memcpy (names + memofs, name, len);
695     memofs += len;
696     }
697     }
698    
699     errorno = errno;
700     closedir (dirp);
701    
702     if (errorno)
703     {
704     free (names);
705     errno = errorno;
706     res = -1;
707     }
708    
709     *namesp = (void *)names;
710     return res;
711 root 1.32 }
712    
713 root 1.22 /*****************************************************************************/
714    
715 root 1.45 static void *aio_proc (void *thr_arg)
716 root 1.1 {
717     aio_req req;
718 root 1.3 int type;
719 root 1.1
720 root 1.3 do
721 root 1.1 {
722 root 1.3 pthread_mutex_lock (&reqlock);
723    
724     for (;;)
725     {
726     req = reqs;
727    
728     if (reqs)
729     {
730     reqs = reqs->next;
731     if (!reqs) reqe = 0;
732     }
733    
734     if (req)
735     break;
736    
737     pthread_cond_wait (&reqwait, &reqlock);
738     }
739    
740     pthread_mutex_unlock (&reqlock);
741    
742 root 1.1 errno = 0; /* strictly unnecessary */
743    
744 root 1.43 if (!req->cancelled)
745     switch (req->type)
746     {
747     case REQ_READ: req->result = pread (req->fd, req->dataptr, req->length, req->offset); break;
748     case REQ_WRITE: req->result = pwrite (req->fd, req->dataptr, req->length, req->offset); break;
749 root 1.3
750 root 1.43 case REQ_READAHEAD: req->result = readahead (req->fd, req->offset, req->length); break;
751     case REQ_SENDFILE: req->result = sendfile_ (req->fd, req->fd2, req->offset, req->length); break;
752 root 1.16
753 root 1.43 case REQ_STAT: req->result = stat (req->dataptr, req->statdata); break;
754     case REQ_LSTAT: req->result = lstat (req->dataptr, req->statdata); break;
755     case REQ_FSTAT: req->result = fstat (req->fd , req->statdata); break;
756    
757     case REQ_OPEN: req->result = open (req->dataptr, req->fd, req->mode); break;
758     case REQ_CLOSE: req->result = close (req->fd); break;
759     case REQ_UNLINK: req->result = unlink (req->dataptr); break;
760     case REQ_RMDIR: req->result = rmdir (req->dataptr); break;
761     case REQ_RENAME: req->result = rename (req->data2ptr, req->dataptr); break;
762     case REQ_LINK: req->result = link (req->data2ptr, req->dataptr); break;
763     case REQ_SYMLINK: req->result = symlink (req->data2ptr, req->dataptr); break;
764    
765     case REQ_FDATASYNC: req->result = fdatasync (req->fd); break;
766     case REQ_FSYNC: req->result = fsync (req->fd); break;
767     case REQ_READDIR: req->result = scandir_ (req->dataptr, &req->data2ptr); break;
768 root 1.1
769 root 1.45 case REQ_SLEEP:
770     {
771     struct timeval tv;
772    
773     tv.tv_sec = req->fd;
774     tv.tv_usec = req->fd2;
775    
776     req->result = select (0, 0, 0, 0, &tv);
777     }
778    
779 root 1.43 case REQ_QUIT:
780     break;
781 root 1.1
782 root 1.43 default:
783     req->result = ENOSYS;
784     break;
785     }
786 root 1.1
787     req->errorno = errno;
788 root 1.3
789     pthread_mutex_lock (&reslock);
790    
791     req->next = 0;
792    
793     if (rese)
794     {
795     rese->next = req;
796     rese = req;
797     }
798     else
799     {
800     rese = ress = req;
801    
802     /* write a dummy byte to the pipe so fh becomes ready */
803     write (respipe [1], &respipe, 1);
804     }
805    
806     pthread_mutex_unlock (&reslock);
807 root 1.1 }
808 root 1.3 while (type != REQ_QUIT);
809 root 1.1
810     return 0;
811     }
812    
813 root 1.37 /*****************************************************************************/
814    
815     static void atfork_prepare (void)
816     {
817     pthread_mutex_lock (&reqlock);
818     pthread_mutex_lock (&reslock);
819     #if !HAVE_PREADWRITE
820     pthread_mutex_lock (&preadwritelock);
821     #endif
822     #if !HAVE_READDIR_R
823     pthread_mutex_lock (&readdirlock);
824     #endif
825     }
826    
827     static void atfork_parent (void)
828     {
829     #if !HAVE_READDIR_R
830     pthread_mutex_unlock (&readdirlock);
831     #endif
832     #if !HAVE_PREADWRITE
833     pthread_mutex_unlock (&preadwritelock);
834     #endif
835     pthread_mutex_unlock (&reslock);
836     pthread_mutex_unlock (&reqlock);
837     }
838    
839     static void atfork_child (void)
840     {
841     aio_req prv;
842    
843     started = 0;
844    
845     while (reqs)
846     {
847     prv = reqs;
848     reqs = prv->next;
849 root 1.43 req_free (prv);
850 root 1.37 }
851    
852     reqs = reqe = 0;
853    
854     while (ress)
855     {
856     prv = ress;
857     ress = prv->next;
858 root 1.43 req_free (prv);
859 root 1.37 }
860    
861     ress = rese = 0;
862    
863     close (respipe [0]);
864     close (respipe [1]);
865     create_pipe ();
866    
867     atfork_parent ();
868     }
869    
870 root 1.22 #define dREQ \
871     aio_req req; \
872     \
873     if (SvOK (callback) && !SvROK (callback)) \
874 root 1.43 croak ("callback must be undef or of reference type"); \
875 root 1.22 \
876     Newz (0, req, 1, aio_cb); \
877     if (!req) \
878     croak ("out of memory during aio_req allocation"); \
879     \
880 root 1.43 req->callback = newSVsv (callback)
881    
882     #define REQ_SEND \
883     req_send (req); \
884     \
885     if (GIMME_V != G_VOID) \
886 root 1.44 XPUSHs (req_sv (req, AIO_REQ_KLASS));
887 root 1.22
888 root 1.1 MODULE = IO::AIO PACKAGE = IO::AIO
889    
890 root 1.8 PROTOTYPES: ENABLE
891    
892 root 1.1 BOOT:
893     {
894 root 1.41 HV *stash = gv_stashpv ("IO::AIO", 1);
895     newCONSTSUB (stash, "EXDEV", newSViv (EXDEV));
896     newCONSTSUB (stash, "O_RDONLY", newSViv (O_RDONLY));
897     newCONSTSUB (stash, "O_WRONLY", newSViv (O_WRONLY));
898    
899 root 1.26 create_pipe ();
900 root 1.22 pthread_atfork (atfork_prepare, atfork_parent, atfork_child);
901 root 1.1 }
902    
903     void
904 root 1.40 min_parallel (nthreads)
905 root 1.1 int nthreads
906     PROTOTYPE: $
907    
908     void
909 root 1.40 max_parallel (nthreads)
910 root 1.1 int nthreads
911     PROTOTYPE: $
912    
913 root 1.4 int
914 root 1.40 max_outstanding (nreqs)
915 root 1.4 int nreqs
916     PROTOTYPE: $
917     CODE:
918     RETVAL = max_outstanding;
919     max_outstanding = nreqs;
920    
921 root 1.1 void
922 root 1.40 aio_open (pathname,flags,mode,callback=&PL_sv_undef)
923 root 1.1 SV * pathname
924     int flags
925     int mode
926     SV * callback
927 root 1.8 PROTOTYPE: $$$;$
928 root 1.43 PPCODE:
929 root 1.1 {
930 root 1.22 dREQ;
931 root 1.1
932     req->type = REQ_OPEN;
933     req->data = newSVsv (pathname);
934 root 1.22 req->dataptr = SvPVbyte_nolen (req->data);
935 root 1.1 req->fd = flags;
936     req->mode = mode;
937    
938 root 1.43 REQ_SEND;
939 root 1.1 }
940    
941     void
942 root 1.40 aio_close (fh,callback=&PL_sv_undef)
943 root 1.13 SV * fh
944     SV * callback
945 root 1.8 PROTOTYPE: $;$
946 root 1.1 ALIAS:
947     aio_close = REQ_CLOSE
948     aio_fsync = REQ_FSYNC
949     aio_fdatasync = REQ_FDATASYNC
950 root 1.43 PPCODE:
951 root 1.1 {
952 root 1.22 dREQ;
953 root 1.1
954     req->type = ix;
955 root 1.13 req->fh = newSVsv (fh);
956     req->fd = PerlIO_fileno (IoIFP (sv_2io (fh)));
957 root 1.1
958 root 1.43 REQ_SEND (req);
959 root 1.1 }
960    
961     void
962 root 1.40 aio_read (fh,offset,length,data,dataoffset,callback=&PL_sv_undef)
963 root 1.13 SV * fh
964     UV offset
965 root 1.32 UV length
966 root 1.13 SV * data
967 root 1.32 UV dataoffset
968 root 1.13 SV * callback
969     ALIAS:
970     aio_read = REQ_READ
971     aio_write = REQ_WRITE
972 root 1.8 PROTOTYPE: $$$$$;$
973 root 1.43 PPCODE:
974 root 1.13 {
975     aio_req req;
976     STRLEN svlen;
977 root 1.21 char *svptr = SvPVbyte (data, svlen);
978 root 1.13
979     SvUPGRADE (data, SVt_PV);
980     SvPOK_on (data);
981 root 1.1
982 root 1.13 if (dataoffset < 0)
983     dataoffset += svlen;
984    
985     if (dataoffset < 0 || dataoffset > svlen)
986     croak ("data offset outside of string");
987    
988     if (ix == REQ_WRITE)
989     {
990     /* write: check length and adjust. */
991     if (length < 0 || length + dataoffset > svlen)
992     length = svlen - dataoffset;
993     }
994     else
995     {
996     /* read: grow scalar as necessary */
997     svptr = SvGROW (data, length + dataoffset);
998     }
999    
1000     if (length < 0)
1001     croak ("length must not be negative");
1002    
1003 root 1.22 {
1004     dREQ;
1005 root 1.13
1006 root 1.22 req->type = ix;
1007     req->fh = newSVsv (fh);
1008     req->fd = PerlIO_fileno (ix == REQ_READ ? IoIFP (sv_2io (fh))
1009     : IoOFP (sv_2io (fh)));
1010     req->offset = offset;
1011     req->length = length;
1012     req->data = SvREFCNT_inc (data);
1013     req->dataptr = (char *)svptr + dataoffset;
1014 root 1.13
1015 root 1.28 if (!SvREADONLY (data))
1016     {
1017     SvREADONLY_on (data);
1018     req->data2ptr = (void *)data;
1019     }
1020    
1021 root 1.43 REQ_SEND;
1022 root 1.22 }
1023 root 1.13 }
1024 root 1.1
1025     void
1026 root 1.40 aio_sendfile (out_fh,in_fh,in_offset,length,callback=&PL_sv_undef)
1027 root 1.32 SV * out_fh
1028     SV * in_fh
1029     UV in_offset
1030     UV length
1031     SV * callback
1032     PROTOTYPE: $$$$;$
1033 root 1.43 PPCODE:
1034 root 1.32 {
1035     dREQ;
1036    
1037     req->type = REQ_SENDFILE;
1038     req->fh = newSVsv (out_fh);
1039     req->fd = PerlIO_fileno (IoIFP (sv_2io (out_fh)));
1040     req->fh2 = newSVsv (in_fh);
1041     req->fd2 = PerlIO_fileno (IoIFP (sv_2io (in_fh)));
1042     req->offset = in_offset;
1043     req->length = length;
1044    
1045 root 1.43 REQ_SEND;
1046 root 1.32 }
1047    
1048     void
1049 root 1.40 aio_readahead (fh,offset,length,callback=&PL_sv_undef)
1050 root 1.13 SV * fh
1051     UV offset
1052     IV length
1053     SV * callback
1054 root 1.8 PROTOTYPE: $$$;$
1055 root 1.43 PPCODE:
1056 root 1.1 {
1057 root 1.22 dREQ;
1058 root 1.1
1059     req->type = REQ_READAHEAD;
1060 root 1.13 req->fh = newSVsv (fh);
1061     req->fd = PerlIO_fileno (IoIFP (sv_2io (fh)));
1062 root 1.1 req->offset = offset;
1063     req->length = length;
1064    
1065 root 1.43 REQ_SEND;
1066 root 1.1 }
1067    
1068     void
1069 root 1.40 aio_stat (fh_or_path,callback=&PL_sv_undef)
1070 root 1.1 SV * fh_or_path
1071     SV * callback
1072     ALIAS:
1073 root 1.8 aio_stat = REQ_STAT
1074     aio_lstat = REQ_LSTAT
1075 root 1.43 PPCODE:
1076 root 1.1 {
1077 root 1.22 dREQ;
1078 root 1.1
1079     New (0, req->statdata, 1, Stat_t);
1080     if (!req->statdata)
1081 root 1.27 {
1082 root 1.43 req_free (req);
1083 root 1.27 croak ("out of memory during aio_req->statdata allocation");
1084     }
1085 root 1.1
1086     if (SvPOK (fh_or_path))
1087     {
1088 root 1.8 req->type = ix;
1089 root 1.1 req->data = newSVsv (fh_or_path);
1090 root 1.22 req->dataptr = SvPVbyte_nolen (req->data);
1091 root 1.1 }
1092     else
1093     {
1094     req->type = REQ_FSTAT;
1095 root 1.13 req->fh = newSVsv (fh_or_path);
1096 root 1.1 req->fd = PerlIO_fileno (IoIFP (sv_2io (fh_or_path)));
1097     }
1098    
1099 root 1.43 REQ_SEND;
1100 root 1.1 }
1101    
1102     void
1103 root 1.40 aio_unlink (pathname,callback=&PL_sv_undef)
1104 root 1.1 SV * pathname
1105     SV * callback
1106 root 1.22 ALIAS:
1107 root 1.40 aio_unlink = REQ_UNLINK
1108     aio_rmdir = REQ_RMDIR
1109     aio_readdir = REQ_READDIR
1110 root 1.43 PPCODE:
1111 root 1.1 {
1112 root 1.22 dREQ;
1113 root 1.1
1114 root 1.22 req->type = ix;
1115     req->data = newSVsv (pathname);
1116     req->dataptr = SvPVbyte_nolen (req->data);
1117 root 1.1
1118 root 1.43 REQ_SEND;
1119 root 1.22 }
1120    
1121     void
1122 root 1.40 aio_link (oldpath,newpath,callback=&PL_sv_undef)
1123 root 1.22 SV * oldpath
1124     SV * newpath
1125     SV * callback
1126 root 1.40 ALIAS:
1127     aio_link = REQ_LINK
1128     aio_symlink = REQ_SYMLINK
1129     aio_rename = REQ_RENAME
1130 root 1.43 PPCODE:
1131 root 1.22 {
1132     dREQ;
1133 root 1.1
1134 root 1.40 req->type = ix;
1135 root 1.22 req->fh = newSVsv (oldpath);
1136     req->data2ptr = SvPVbyte_nolen (req->fh);
1137     req->data = newSVsv (newpath);
1138     req->dataptr = SvPVbyte_nolen (req->data);
1139 root 1.1
1140 root 1.43 REQ_SEND;
1141 root 1.1 }
1142    
1143 root 1.42 void
1144 root 1.45 aio_sleep (delay,callback=&PL_sv_undef)
1145     double delay
1146     SV * callback
1147     PPCODE:
1148     {
1149     dREQ;
1150    
1151     req->type = REQ_SLEEP;
1152     req->fd = delay < 0. ? 0 : delay;
1153     req->fd2 = delay < 0. ? 0 : 1000. * (delay - req->fd);
1154    
1155     REQ_SEND;
1156     }
1157    
1158     void
1159 root 1.44 aio_group (callback=&PL_sv_undef)
1160     SV * callback
1161 root 1.46 PROTOTYPE: ;$
1162 root 1.44 PPCODE:
1163 root 1.42 {
1164 root 1.44 dREQ;
1165     req->type = REQ_GROUP;
1166 root 1.45 req->grp_next = req;
1167     req->grp_prev = req;
1168    
1169     req_send (req);
1170 root 1.44 XPUSHs (req_sv (req, AIO_GRP_KLASS));
1171 root 1.42 }
1172    
1173 root 1.6 void
1174 root 1.40 flush ()
1175 root 1.6 PROTOTYPE:
1176     CODE:
1177     while (nreqs)
1178     {
1179     poll_wait ();
1180     poll_cb ();
1181     }
1182    
1183 root 1.7 void
1184     poll()
1185     PROTOTYPE:
1186     CODE:
1187     if (nreqs)
1188     {
1189     poll_wait ();
1190     poll_cb ();
1191     }
1192    
1193 root 1.1 int
1194     poll_fileno()
1195     PROTOTYPE:
1196     CODE:
1197 root 1.3 RETVAL = respipe [0];
1198 root 1.1 OUTPUT:
1199     RETVAL
1200    
1201     int
1202     poll_cb(...)
1203     PROTOTYPE:
1204     CODE:
1205 root 1.5 RETVAL = poll_cb ();
1206 root 1.1 OUTPUT:
1207     RETVAL
1208    
1209     void
1210     poll_wait()
1211     PROTOTYPE:
1212     CODE:
1213 root 1.3 if (nreqs)
1214     poll_wait ();
1215 root 1.1
1216     int
1217     nreqs()
1218     PROTOTYPE:
1219     CODE:
1220     RETVAL = nreqs;
1221     OUTPUT:
1222     RETVAL
1223    
1224 root 1.48 PROTOTYPES: DISABLE
1225    
1226 root 1.44 MODULE = IO::AIO PACKAGE = IO::AIO::REQ
1227 root 1.43
1228     void
1229     cancel (aio_req_ornot req)
1230     PROTOTYPE:
1231     CODE:
1232 root 1.45 req_cancel (req);
1233    
1234     MODULE = IO::AIO PACKAGE = IO::AIO::GRP
1235    
1236     void
1237     add (aio_req grp, ...)
1238     PPCODE:
1239     {
1240     int i;
1241    
1242     for (i = 1; i < items; ++i )
1243     {
1244 root 1.46 if (GIMME_V != G_VOID)
1245     XPUSHs (sv_2mortal (newSVsv (ST (i))));
1246    
1247 root 1.45 aio_req req = SvAIO_REQ (ST (i));
1248    
1249 root 1.46 if (req)
1250     {
1251     req->grp_prev = grp;
1252     req->grp_next = grp->grp_next;
1253     grp->grp_next->grp_prev = req;
1254     grp->grp_next = req;
1255 root 1.45
1256 root 1.46 req->grp = grp;
1257     }
1258 root 1.45 }
1259     }
1260 root 1.43
1261 root 1.48 void
1262     result (aio_req grp, ...)
1263     CODE:
1264     {
1265     int i;
1266     AV *av = newAV ();
1267    
1268     if (grp->data)
1269     SvREFCNT_dec (grp->data);
1270    
1271     for (i = 1; i < items; ++i )
1272     av_push (av, newSVsv (ST (i)));
1273    
1274     grp->data = (SV *)av;
1275     }
1276