ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/IO-AIO/AIO.xs
Revision: 1.62
Committed: Mon Oct 23 22:45:18 2006 UTC (17 years, 6 months ago) by root
Branch: MAIN
Changes since 1.61: +2 -1 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.20 #define _REENTRANT 1
2 root 1.19 #include <errno.h>
3    
4 root 1.15 #include "EXTERN.h"
5 root 1.1 #include "perl.h"
6     #include "XSUB.h"
7    
8 root 1.16 #include "autoconf/config.h"
9    
10 root 1.32 #include <pthread.h>
11    
12 root 1.37 #include <stddef.h>
13 root 1.41 #include <errno.h>
14 root 1.45 #include <sys/time.h>
15     #include <sys/select.h>
16 root 1.1 #include <sys/types.h>
17     #include <sys/stat.h>
18 root 1.37 #include <limits.h>
19 root 1.1 #include <unistd.h>
20     #include <fcntl.h>
21     #include <signal.h>
22     #include <sched.h>
23    
24 root 1.32 #if HAVE_SENDFILE
25     # if __linux
26     # include <sys/sendfile.h>
27     # elif __freebsd
28     # include <sys/socket.h>
29     # include <sys/uio.h>
30     # elif __hpux
31     # include <sys/socket.h>
32 root 1.35 # elif __solaris /* not yet */
33     # include <sys/sendfile.h>
34 root 1.34 # else
35     # error sendfile support requested but not available
36 root 1.32 # endif
37     #endif
38 root 1.1
39 root 1.39 /* used for struct dirent, AIX doesn't provide it */
40     #ifndef NAME_MAX
41     # define NAME_MAX 4096
42     #endif
43    
44 root 1.4 #if __ia64
45     # define STACKSIZE 65536
46 root 1.1 #else
47 root 1.37 # define STACKSIZE 8192
48 root 1.1 #endif
49    
/*
 * request types, stored in aio_cb.type.  the numeric values are also used
 * as ALIAS indices by the XS entry points, so the order must not change.
 */
enum {
  REQ_QUIT,       /* tells a worker thread to exit */
  REQ_OPEN,
  REQ_CLOSE,
  REQ_READ,
  REQ_WRITE,
  REQ_READAHEAD,
  REQ_SENDFILE,
  REQ_STAT,
  REQ_LSTAT,
  REQ_FSTAT,
  REQ_FSYNC,
  REQ_FDATASYNC,
  REQ_UNLINK,
  REQ_RMDIR,
  REQ_RENAME,
  REQ_READDIR,
  REQ_LINK,
  REQ_SYMLINK,
  REQ_GROUP,      /* container request, no I/O of its own */
  REQ_NOP,
  REQ_SLEEP,
};
63    
64 root 1.44 #define AIO_REQ_KLASS "IO::AIO::REQ"
65     #define AIO_GRP_KLASS "IO::AIO::GRP"
66 root 1.43
67     typedef struct aio_cb
68     {
69 root 1.49 struct aio_cb *volatile next;
70 root 1.43
71     SV *data, *callback;
72     SV *fh, *fh2;
73     void *dataptr, *data2ptr;
74     Stat_t *statdata;
75 root 1.1 off_t offset;
76     size_t length;
77     ssize_t result;
78 root 1.43
79 root 1.60 STRLEN dataoffset;
80 root 1.43 int type;
81     int fd, fd2;
82 root 1.1 int errorno;
83 root 1.43 mode_t mode; /* open */
84 root 1.60
85     unsigned char flags;
86 root 1.58 unsigned char pri;
87 root 1.60
88     SV *self; /* the perl counterpart of this request, if any */
89     struct aio_cb *grp, *grp_prev, *grp_next, *grp_first;
90 root 1.1 } aio_cb;
91    
/* bits for aio_cb.flags */
enum {
  FLAG_CANCELLED = 1 << 0, /* request was cancelled: skip execution and callback */
};
95    
96 root 1.1 typedef aio_cb *aio_req;
97 root 1.43 typedef aio_cb *aio_req_ornot;
98 root 1.1
/* request priorities; PRI_BIAS shifts them so the stored value is never negative */
enum {
  DEFAULT_PRI = 0,

  PRI_MIN     = -4,
  PRI_MAX     = 4,
  PRI_BIAS    = -PRI_MIN,
};
106    
107     static int next_pri = DEFAULT_PRI + PRI_BIAS;
108    
109 root 1.30 static int started, wanted;
110 root 1.11 static volatile int nreqs;
111 root 1.4 static int max_outstanding = 1<<30;
112 root 1.3 static int respipe [2];
113 root 1.1
114 root 1.3 static pthread_mutex_t reslock = PTHREAD_MUTEX_INITIALIZER;
115     static pthread_mutex_t reqlock = PTHREAD_MUTEX_INITIALIZER;
116     static pthread_cond_t reqwait = PTHREAD_COND_INITIALIZER;
117    
118     static volatile aio_req reqs, reqe; /* queue start, queue end */
119     static volatile aio_req ress, rese; /* queue start, queue end */
120 root 1.1
121 root 1.50 static void req_invoke (aio_req req);
122 root 1.45 static void req_free (aio_req req);
123    
124 root 1.43 /* must be called at most once */
125 root 1.44 static SV *req_sv (aio_req req, const char *klass)
126 root 1.43 {
127 root 1.49 if (!req->self)
128     {
129     req->self = (SV *)newHV ();
130     sv_magic (req->self, 0, PERL_MAGIC_ext, (char *)req, 0);
131     }
132 root 1.43
133 root 1.45 return sv_2mortal (sv_bless (newRV_inc (req->self), gv_stashpv (klass, 1)));
134 root 1.43 }
135    
136 root 1.45 static aio_req SvAIO_REQ (SV *sv)
137 root 1.26 {
138 root 1.53 MAGIC *mg;
139    
140 root 1.45 if (!sv_derived_from (sv, AIO_REQ_KLASS) || !SvROK (sv))
141     croak ("object of class " AIO_REQ_KLASS " expected");
142 root 1.43
143 root 1.53 mg = mg_find (SvRV (sv), PERL_MAGIC_ext);
144 root 1.43
145     return mg ? (aio_req)mg->mg_ptr : 0;
146     }
147    
148 root 1.49 static void aio_grp_feed (aio_req grp)
149     {
150 root 1.58 while (grp->length < grp->fd2 && !(grp->flags & FLAG_CANCELLED))
151 root 1.49 {
152     int old_len = grp->length;
153    
154     if (grp->fh2 && SvOK (grp->fh2))
155     {
156     dSP;
157    
158     ENTER;
159     SAVETMPS;
160     PUSHMARK (SP);
161     XPUSHs (req_sv (grp, AIO_GRP_KLASS));
162     PUTBACK;
163     call_sv (grp->fh2, G_VOID | G_EVAL);
164     SPAGAIN;
165     FREETMPS;
166     LEAVE;
167     }
168    
169     /* stop if no progress has been made */
170     if (old_len == grp->length)
171     {
172     SvREFCNT_dec (grp->fh2);
173     grp->fh2 = 0;
174     break;
175     }
176     }
177     }
178    
179 root 1.50 static void aio_grp_dec (aio_req grp)
180     {
181     --grp->length;
182    
183     /* call feeder, if applicable */
184     aio_grp_feed (grp);
185    
186     /* finish, if done */
187     if (!grp->length && grp->fd)
188     {
189     req_invoke (grp);
190     req_free (grp);
191     }
192     }
193    
194 root 1.45 static void poll_wait ()
195     {
196 root 1.62 fd_set rfd;
197    
198 root 1.60 while (nreqs)
199 root 1.45 {
200 root 1.60 aio_req req;
201     pthread_mutex_lock (&reslock);
202     req = ress;
203     pthread_mutex_unlock (&reslock);
204    
205     if (req)
206     return;
207    
208 root 1.45 FD_ZERO(&rfd);
209     FD_SET(respipe [0], &rfd);
210    
211     select (respipe [0] + 1, &rfd, 0, 0, 0);
212     }
213     }
214    
215     static void req_invoke (aio_req req)
216     {
217     dSP;
218     int errorno = errno;
219    
220 root 1.58 if (req->flags & FLAG_CANCELLED || !SvOK (req->callback))
221 root 1.45 return;
222    
223     errno = req->errorno;
224    
225     ENTER;
226 root 1.49 SAVETMPS;
227 root 1.45 PUSHMARK (SP);
228 root 1.48 EXTEND (SP, 1);
229 root 1.45
230     switch (req->type)
231     {
232     case REQ_READDIR:
233     {
234     SV *rv = &PL_sv_undef;
235    
236     if (req->result >= 0)
237     {
238     char *buf = req->data2ptr;
239     AV *av = newAV ();
240    
241     while (req->result)
242     {
243     SV *sv = newSVpv (buf, 0);
244    
245     av_push (av, sv);
246     buf += SvCUR (sv) + 1;
247     req->result--;
248     }
249    
250     rv = sv_2mortal (newRV_noinc ((SV *)av));
251     }
252    
253 root 1.48 PUSHs (rv);
254 root 1.45 }
255     break;
256    
257     case REQ_OPEN:
258     {
259     /* convert fd to fh */
260     SV *fh;
261    
262 root 1.48 PUSHs (sv_2mortal (newSViv (req->result)));
263 root 1.45 PUTBACK;
264     call_pv ("IO::AIO::_fd2fh", G_SCALAR | G_EVAL);
265     SPAGAIN;
266    
267     fh = SvREFCNT_inc (POPs);
268    
269     PUSHMARK (SP);
270     XPUSHs (sv_2mortal (fh));
271     }
272     break;
273    
274 root 1.48 case REQ_GROUP:
275 root 1.49 req->fd = 2; /* mark group as finished */
276    
277 root 1.48 if (req->data)
278     {
279     int i;
280     AV *av = (AV *)req->data;
281    
282     EXTEND (SP, AvFILL (av) + 1);
283     for (i = 0; i <= AvFILL (av); ++i)
284     PUSHs (*av_fetch (av, i, 0));
285     }
286     break;
287    
288 root 1.54 case REQ_NOP:
289 root 1.45 case REQ_SLEEP:
290     break;
291    
292     default:
293 root 1.48 PUSHs (sv_2mortal (newSViv (req->result)));
294 root 1.45 break;
295     }
296    
297    
298     PUTBACK;
299     call_sv (req->callback, G_VOID | G_EVAL);
300     SPAGAIN;
301    
302 root 1.51 FREETMPS;
303     LEAVE;
304    
305     errno = errorno;
306    
307 root 1.45 if (SvTRUE (ERRSV))
308     {
309     req_free (req);
310     croak (0);
311     }
312     }
313    
314 root 1.43 static void req_free (aio_req req)
315     {
316 root 1.45 if (req->grp)
317     {
318     aio_req grp = req->grp;
319    
320     /* unlink request */
321 root 1.49 if (req->grp_next) req->grp_next->grp_prev = req->grp_prev;
322     if (req->grp_prev) req->grp_prev->grp_next = req->grp_next;
323    
324     if (grp->grp_first == req)
325     grp->grp_first = req->grp_next;
326    
327 root 1.50 aio_grp_dec (grp);
328 root 1.45 }
329    
330 root 1.43 if (req->self)
331     {
332     sv_unmagic (req->self, PERL_MAGIC_ext);
333     SvREFCNT_dec (req->self);
334     }
335    
336 root 1.49 SvREFCNT_dec (req->data);
337     SvREFCNT_dec (req->fh);
338     SvREFCNT_dec (req->fh2);
339     SvREFCNT_dec (req->callback);
340     Safefree (req->statdata);
341 root 1.26
342 root 1.37 if (req->type == REQ_READDIR && req->result >= 0)
343     free (req->data2ptr);
344    
345 root 1.26 Safefree (req);
346     }
347    
348 root 1.45 static void req_cancel (aio_req req)
349 root 1.1 {
350 root 1.58 req->flags |= FLAG_CANCELLED;
351 root 1.45
352     if (req->type == REQ_GROUP)
353 root 1.11 {
354 root 1.45 aio_req sub;
355 root 1.3
356 root 1.49 for (sub = req->grp_first; sub; sub = sub->grp_next)
357 root 1.45 req_cancel (sub);
358 root 1.11 }
359 root 1.1 }
360    
361 root 1.45 static int poll_cb ()
362 root 1.1 {
363     dSP;
364     int count = 0;
365 root 1.24 int do_croak = 0;
366 root 1.27 aio_req req;
367    
368     for (;;)
369     {
370     pthread_mutex_lock (&reslock);
371     req = ress;
372    
373     if (req)
374     {
375     ress = req->next;
376 root 1.11
377 root 1.27 if (!ress)
378     {
379     /* read any signals sent by the worker threads */
380     char buf [32];
381     while (read (respipe [0], buf, 32) == 32)
382     ;
383 root 1.30
384     rese = 0;
385 root 1.27 }
386     }
387 root 1.1
388 root 1.27 pthread_mutex_unlock (&reslock);
389 root 1.3
390 root 1.27 if (!req)
391     break;
392 root 1.3
393 root 1.50 --nreqs;
394 root 1.1
395     if (req->type == REQ_QUIT)
396     started--;
397 root 1.49 else if (req->type == REQ_GROUP && req->length)
398 root 1.46 {
399     req->fd = 1; /* mark request as delayed */
400     continue;
401     }
402 root 1.1 else
403     {
404     if (req->type == REQ_READ)
405 root 1.27 SvCUR_set (req->data, req->dataoffset + (req->result > 0 ? req->result : 0));
406 root 1.1
407 root 1.28 if (req->data2ptr && (req->type == REQ_READ || req->type == REQ_WRITE))
408     SvREADONLY_off (req->data);
409    
410 root 1.27 if (req->statdata)
411 root 1.1 {
412     PL_laststype = req->type == REQ_LSTAT ? OP_LSTAT : OP_STAT;
413     PL_laststatval = req->result;
414     PL_statcache = *(req->statdata);
415     }
416    
417 root 1.45 req_invoke (req);
418 root 1.26
419 root 1.1 count++;
420     }
421    
422 root 1.43 req_free (req);
423 root 1.1 }
424    
425     return count;
426     }
427    
428 root 1.4 static void *aio_proc(void *arg);
429    
430 root 1.45 static void start_thread (void)
431 root 1.4 {
432     sigset_t fullsigset, oldsigset;
433     pthread_t tid;
434     pthread_attr_t attr;
435    
436     pthread_attr_init (&attr);
437     pthread_attr_setstacksize (&attr, STACKSIZE);
438 root 1.31 pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
439 root 1.4
440     sigfillset (&fullsigset);
441     sigprocmask (SIG_SETMASK, &fullsigset, &oldsigset);
442    
443     if (pthread_create (&tid, &attr, aio_proc, 0) == 0)
444     started++;
445    
446     sigprocmask (SIG_SETMASK, &oldsigset, 0);
447     }
448    
449 root 1.45 static void req_send (aio_req req)
450 root 1.4 {
451 root 1.30 while (started < wanted && nreqs >= started)
452     start_thread ();
453    
454 root 1.50 ++nreqs;
455 root 1.4
456     pthread_mutex_lock (&reqlock);
457    
458     req->next = 0;
459    
460     if (reqe)
461     {
462     reqe->next = req;
463     reqe = req;
464     }
465     else
466     reqe = reqs = req;
467    
468     pthread_cond_signal (&reqwait);
469     pthread_mutex_unlock (&reqlock);
470    
471 root 1.27 if (nreqs > max_outstanding)
472     for (;;)
473     {
474     poll_cb ();
475    
476     if (nreqs <= max_outstanding)
477     break;
478    
479     poll_wait ();
480     }
481 root 1.4 }
482    
483 root 1.45 static void end_thread (void)
484 root 1.4 {
485     aio_req req;
486 root 1.26 Newz (0, req, 1, aio_cb);
487 root 1.4 req->type = REQ_QUIT;
488    
489 root 1.43 req_send (req);
490 root 1.4 }
491    
492 root 1.22 static void min_parallel (int nthreads)
493     {
494 root 1.30 if (wanted < nthreads)
495     wanted = nthreads;
496 root 1.22 }
497    
498     static void max_parallel (int nthreads)
499     {
500     int cur = started;
501 root 1.27
502 root 1.30 if (wanted > nthreads)
503     wanted = nthreads;
504    
505     while (cur > wanted)
506     {
507 root 1.22 end_thread ();
508     cur--;
509     }
510    
511 root 1.30 while (started > wanted)
512 root 1.22 {
513     poll_wait ();
514     poll_cb ();
515     }
516     }
517    
518 root 1.26 static void create_pipe ()
519 root 1.22 {
520 root 1.26 if (pipe (respipe))
521     croak ("unable to initialize result pipe");
522 root 1.22
523 root 1.26 if (fcntl (respipe [0], F_SETFL, O_NONBLOCK))
524     croak ("cannot set result pipe to nonblocking mode");
525 root 1.22
526 root 1.26 if (fcntl (respipe [1], F_SETFL, O_NONBLOCK))
527     croak ("cannot set result pipe to nonblocking mode");
528     }
529 root 1.22
530     /*****************************************************************************/
531 root 1.17 /* work around various missing functions */
532    
#if !HAVE_PREADWRITE
# define pread  aio_pread
# define pwrite aio_pwrite

/*
 * make our pread/pwrite safe against themselves, but not against
 * normal read/write by using a mutex. slows down execution a lot,
 * but that's your problem, not mine.
 */
static pthread_mutex_t preadwritelock = PTHREAD_MUTEX_INITIALIZER;

/*
 * emulated pread: read count bytes at offset, then put the file position
 * back where it was.  returns whatever read returned.
 */
static ssize_t pread (int fd, void *buf, size_t count, off_t offset)
{
  ssize_t res;
  off_t ooffset;

  pthread_mutex_lock (&preadwritelock);
  ooffset = lseek (fd, 0, SEEK_CUR);
  lseek (fd, offset, SEEK_SET);
  res = read (fd, buf, count);
  lseek (fd, ooffset, SEEK_SET);
  pthread_mutex_unlock (&preadwritelock);

  return res;
}

/*
 * emulated pwrite: write count bytes at offset, then put the file position
 * back where it was.  returns whatever write returned.
 */
static ssize_t pwrite (int fd, void *buf, size_t count, off_t offset)
{
  ssize_t res;
  off_t ooffset;

  pthread_mutex_lock (&preadwritelock);
  ooffset = lseek (fd, 0, SEEK_CUR);
  lseek (fd, offset, SEEK_SET);
  res = write (fd, buf, count);
  /* restore the saved position, not the write offset */
  lseek (fd, ooffset, SEEK_SET);
  pthread_mutex_unlock (&preadwritelock);

  return res;
}
#endif
574    
575     #if !HAVE_FDATASYNC
576     # define fdatasync fsync
577     #endif
578    
#if !HAVE_READAHEAD
# define readahead aio_readahead

/*
 * emulated readahead: pull count bytes starting at offset through pread
 * in 4096 byte pieces and throw the data away.  read errors are ignored
 * on purpose; the function always clears errno and returns 0.
 */
static ssize_t readahead (int fd, off_t offset, size_t count)
{
  char readahead_buf[4096];

  while (count > 0)
    {
      size_t len = count < sizeof (readahead_buf) ? count : sizeof (readahead_buf);

      pread (fd, readahead_buf, len, offset);
      offset += len;
      count  -= len;
    }

  errno = 0;
  return 0;
}
#endif
598    
#if !HAVE_READDIR_R
# define readdir_r aio_readdir_r

static pthread_mutex_t readdirlock = PTHREAD_MUTEX_INITIALIZER;

/*
 * emulated readdir_r: serialise readdir with a mutex and copy the name of
 * the entry into ent (only d_name is copied).  *res is set to ent, or to 0
 * at the end of the directory or on error.  returns 0 on success and at
 * end of directory, the error number otherwise; errno holds the same value.
 */
static int readdir_r (DIR *dirp, struct dirent *ent, struct dirent **res)
{
  struct dirent *e;
  int errorno;

  pthread_mutex_lock (&readdirlock);

  /* readdir only sets errno on failure, so clear it to tell error from end */
  errno = 0;
  e = readdir (dirp);
  errorno = errno;

  if (e)
    {
      *res = ent;
      strcpy (ent->d_name, e->d_name);
    }
  else
    *res = 0;

  pthread_mutex_unlock (&readdirlock);

  errno = errorno;
  return e ? 0 : errorno;
}
#endif
628    
/*
 * copy count bytes from ifd (starting at offset) to ofd.  uses the native
 * sendfile where one was configured and falls back to a pread/write loop
 * when it is missing or refuses the descriptors.  returns the number of
 * bytes transferred, or -1 if nothing could be transferred due to an error.
 */
static ssize_t sendfile_ (int ofd, int ifd, off_t offset, size_t count)
{
  ssize_t res;

  if (!count)
    return 0;

#if HAVE_SENDFILE
# if __linux
  res = sendfile (ofd, ifd, &offset, count);

# elif __freebsd
  /*
   * Of course, the freebsd sendfile is a dire hack with no thoughts
   * wasted on making it similar to other I/O functions.
   */
  {
    off_t sbytes;
    res = sendfile (ifd, ofd, offset, count, 0, &sbytes, 0);

    if (res < 0 && sbytes)
      /* maybe only on EAGAIN only: as usual, the manpage leaves you guessing */
      res = sbytes;
  }

# elif __hpux
  res = sendfile (ofd, ifd, offset, count, 0, 0);

# elif __solaris
  {
    struct sendfilevec vec;
    size_t sbytes;

    vec.sfv_fd   = ifd;
    vec.sfv_flag = 0;
    vec.sfv_off  = offset;
    vec.sfv_len  = count;

    res = sendfilev (ofd, &vec, 1, &sbytes);

    if (res < 0 && sbytes)
      res = sbytes;
  }

# endif
#else
  res = -1;
  errno = ENOSYS;
#endif

  if (res < 0
      && (errno == ENOSYS || errno == EINVAL || errno == ENOTSOCK
#if __solaris
          || errno == EAFNOSUPPORT || errno == EPROTOTYPE
#endif
         )
      )
    {
      /* emulate sendfile. this is a major pain in the ass */
      char buf[4096];
      res = 0;

      while (count)
        {
          ssize_t cnt = pread (ifd, buf, count > 4096 ? 4096 : count, offset);

          /* only write when the read produced data */
          if (cnt > 0)
            cnt = write (ofd, buf, cnt);

          if (cnt <= 0)
            {
              /* report an error only if nothing was transferred so far */
              if (cnt && !res) res = -1;
              break;
            }

          offset += cnt;
          res    += cnt;
          count  -= cnt;
        }
    }

  return res;
}
720    
721     /* read a full directory */
722 root 1.45 static int scandir_ (const char *path, void **namesp)
723 root 1.37 {
724     DIR *dirp = opendir (path);
725     union
726     {
727     struct dirent d;
728     char b [offsetof (struct dirent, d_name) + NAME_MAX + 1];
729     } u;
730     struct dirent *entp;
731     char *name, *names;
732     int memlen = 4096;
733     int memofs = 0;
734     int res = 0;
735     int errorno;
736    
737     if (!dirp)
738     return -1;
739    
740     names = malloc (memlen);
741    
742     for (;;)
743     {
744     errno = 0, readdir_r (dirp, &u.d, &entp);
745    
746     if (!entp)
747     break;
748    
749     name = entp->d_name;
750    
751     if (name [0] != '.' || (name [1] && (name [1] != '.' || name [2])))
752     {
753     int len = strlen (name) + 1;
754    
755     res++;
756    
757     while (memofs + len > memlen)
758     {
759     memlen *= 2;
760     names = realloc (names, memlen);
761     if (!names)
762     break;
763     }
764    
765     memcpy (names + memofs, name, len);
766     memofs += len;
767     }
768     }
769    
770     errorno = errno;
771     closedir (dirp);
772    
773     if (errorno)
774     {
775     free (names);
776     errno = errorno;
777     res = -1;
778     }
779    
780     *namesp = (void *)names;
781     return res;
782 root 1.32 }
783    
784 root 1.22 /*****************************************************************************/
785    
786 root 1.45 static void *aio_proc (void *thr_arg)
787 root 1.1 {
788     aio_req req;
789 root 1.3 int type;
790 root 1.1
791 root 1.3 do
792 root 1.1 {
793 root 1.3 pthread_mutex_lock (&reqlock);
794    
795     for (;;)
796     {
797     req = reqs;
798    
799     if (reqs)
800     {
801     reqs = reqs->next;
802     if (!reqs) reqe = 0;
803     }
804    
805     if (req)
806     break;
807    
808     pthread_cond_wait (&reqwait, &reqlock);
809     }
810    
811     pthread_mutex_unlock (&reqlock);
812    
813 root 1.1 errno = 0; /* strictly unnecessary */
814 root 1.60 type = req->type; /* remember type for QUIT check */
815 root 1.1
816 root 1.58 if (!(req->flags & FLAG_CANCELLED))
817 root 1.60 switch (type)
818 root 1.43 {
819     case REQ_READ: req->result = pread (req->fd, req->dataptr, req->length, req->offset); break;
820     case REQ_WRITE: req->result = pwrite (req->fd, req->dataptr, req->length, req->offset); break;
821 root 1.3
822 root 1.43 case REQ_READAHEAD: req->result = readahead (req->fd, req->offset, req->length); break;
823     case REQ_SENDFILE: req->result = sendfile_ (req->fd, req->fd2, req->offset, req->length); break;
824 root 1.16
825 root 1.43 case REQ_STAT: req->result = stat (req->dataptr, req->statdata); break;
826     case REQ_LSTAT: req->result = lstat (req->dataptr, req->statdata); break;
827     case REQ_FSTAT: req->result = fstat (req->fd , req->statdata); break;
828    
829     case REQ_OPEN: req->result = open (req->dataptr, req->fd, req->mode); break;
830     case REQ_CLOSE: req->result = close (req->fd); break;
831     case REQ_UNLINK: req->result = unlink (req->dataptr); break;
832     case REQ_RMDIR: req->result = rmdir (req->dataptr); break;
833     case REQ_RENAME: req->result = rename (req->data2ptr, req->dataptr); break;
834     case REQ_LINK: req->result = link (req->data2ptr, req->dataptr); break;
835     case REQ_SYMLINK: req->result = symlink (req->data2ptr, req->dataptr); break;
836    
837     case REQ_FDATASYNC: req->result = fdatasync (req->fd); break;
838     case REQ_FSYNC: req->result = fsync (req->fd); break;
839     case REQ_READDIR: req->result = scandir_ (req->dataptr, &req->data2ptr); break;
840 root 1.1
841 root 1.45 case REQ_SLEEP:
842     {
843     struct timeval tv;
844    
845     tv.tv_sec = req->fd;
846     tv.tv_usec = req->fd2;
847    
848     req->result = select (0, 0, 0, 0, &tv);
849     }
850    
851 root 1.55 case REQ_GROUP:
852     case REQ_NOP:
853 root 1.43 case REQ_QUIT:
854     break;
855 root 1.1
856 root 1.43 default:
857     req->result = ENOSYS;
858     break;
859     }
860 root 1.1
861     req->errorno = errno;
862 root 1.3
863     pthread_mutex_lock (&reslock);
864    
865     req->next = 0;
866    
867     if (rese)
868     {
869     rese->next = req;
870     rese = req;
871     }
872     else
873     {
874     rese = ress = req;
875    
876     /* write a dummy byte to the pipe so fh becomes ready */
877     write (respipe [1], &respipe, 1);
878     }
879    
880     pthread_mutex_unlock (&reslock);
881 root 1.1 }
882 root 1.3 while (type != REQ_QUIT);
883 root 1.1
884     return 0;
885     }
886    
887 root 1.37 /*****************************************************************************/
888    
889     static void atfork_prepare (void)
890     {
891     pthread_mutex_lock (&reqlock);
892     pthread_mutex_lock (&reslock);
893     #if !HAVE_PREADWRITE
894     pthread_mutex_lock (&preadwritelock);
895     #endif
896     #if !HAVE_READDIR_R
897     pthread_mutex_lock (&readdirlock);
898     #endif
899     }
900    
901     static void atfork_parent (void)
902     {
903     #if !HAVE_READDIR_R
904     pthread_mutex_unlock (&readdirlock);
905     #endif
906     #if !HAVE_PREADWRITE
907     pthread_mutex_unlock (&preadwritelock);
908     #endif
909     pthread_mutex_unlock (&reslock);
910     pthread_mutex_unlock (&reqlock);
911     }
912    
913     static void atfork_child (void)
914     {
915     aio_req prv;
916    
917     started = 0;
918    
919     while (reqs)
920     {
921     prv = reqs;
922     reqs = prv->next;
923 root 1.43 req_free (prv);
924 root 1.37 }
925    
926     reqs = reqe = 0;
927    
928     while (ress)
929     {
930     prv = ress;
931     ress = prv->next;
932 root 1.43 req_free (prv);
933 root 1.37 }
934    
935     ress = rese = 0;
936    
937     close (respipe [0]);
938     close (respipe [1]);
939     create_pipe ();
940    
941     atfork_parent ();
942     }
943    
/*
 * declare and allocate a zeroed request "req" for an xsub that has an SV
 * named "callback" in scope.  consumes next_pri (resetting it to the
 * default), croaks on a defined non-reference callback, and stores a copy
 * of the callback and the priority in the request.
 */
#define dREQ							\
	aio_req req;						\
	int req_pri = next_pri;					\
	next_pri = DEFAULT_PRI + PRI_BIAS;			\
								\
	if (SvOK (callback) && !SvROK (callback))		\
	  croak ("callback must be undef or of reference type");	\
								\
	Newz (0, req, 1, aio_cb);				\
	if (!req)						\
	  croak ("out of memory during aio_req allocation");	\
								\
	req->callback = newSVsv (callback);			\
	req->pri = req_pri
958 root 1.43
/* submit "req" and, unless called in void context, return its perl request object */
#define REQ_SEND						\
	req_send (req);						\
								\
	if (GIMME_V != G_VOID)					\
	  XPUSHs (req_sv (req, AIO_REQ_KLASS));
964 root 1.22
965 root 1.1 MODULE = IO::AIO PACKAGE = IO::AIO
966    
967 root 1.8 PROTOTYPES: ENABLE
968    
969 root 1.1 BOOT:
970     {
971 root 1.41 HV *stash = gv_stashpv ("IO::AIO", 1);
972     newCONSTSUB (stash, "EXDEV", newSViv (EXDEV));
973     newCONSTSUB (stash, "O_RDONLY", newSViv (O_RDONLY));
974     newCONSTSUB (stash, "O_WRONLY", newSViv (O_WRONLY));
975    
976 root 1.26 create_pipe ();
977 root 1.22 pthread_atfork (atfork_prepare, atfork_parent, atfork_child);
978 root 1.1 }
979    
980     void
981 root 1.40 min_parallel (nthreads)
982 root 1.1 int nthreads
983     PROTOTYPE: $
984    
985     void
986 root 1.40 max_parallel (nthreads)
987 root 1.1 int nthreads
988     PROTOTYPE: $
989    
990 root 1.4 int
991 root 1.40 max_outstanding (nreqs)
992 root 1.4 int nreqs
993     PROTOTYPE: $
994     CODE:
995     RETVAL = max_outstanding;
996     max_outstanding = nreqs;
997    
998 root 1.1 void
999 root 1.40 aio_open (pathname,flags,mode,callback=&PL_sv_undef)
1000 root 1.1 SV * pathname
1001     int flags
1002     int mode
1003     SV * callback
1004 root 1.8 PROTOTYPE: $$$;$
1005 root 1.43 PPCODE:
1006 root 1.1 {
1007 root 1.22 dREQ;
1008 root 1.1
1009     req->type = REQ_OPEN;
1010     req->data = newSVsv (pathname);
1011 root 1.22 req->dataptr = SvPVbyte_nolen (req->data);
1012 root 1.1 req->fd = flags;
1013     req->mode = mode;
1014    
1015 root 1.43 REQ_SEND;
1016 root 1.1 }
1017    
1018     void
1019 root 1.40 aio_close (fh,callback=&PL_sv_undef)
1020 root 1.13 SV * fh
1021     SV * callback
1022 root 1.8 PROTOTYPE: $;$
1023 root 1.1 ALIAS:
1024     aio_close = REQ_CLOSE
1025     aio_fsync = REQ_FSYNC
1026     aio_fdatasync = REQ_FDATASYNC
1027 root 1.43 PPCODE:
1028 root 1.1 {
1029 root 1.22 dREQ;
1030 root 1.1
1031     req->type = ix;
1032 root 1.13 req->fh = newSVsv (fh);
1033     req->fd = PerlIO_fileno (IoIFP (sv_2io (fh)));
1034 root 1.1
1035 root 1.43 REQ_SEND (req);
1036 root 1.1 }
1037    
1038     void
1039 root 1.40 aio_read (fh,offset,length,data,dataoffset,callback=&PL_sv_undef)
1040 root 1.13 SV * fh
1041     UV offset
1042 root 1.32 UV length
1043 root 1.13 SV * data
1044 root 1.32 UV dataoffset
1045 root 1.13 SV * callback
1046     ALIAS:
1047     aio_read = REQ_READ
1048     aio_write = REQ_WRITE
1049 root 1.8 PROTOTYPE: $$$$$;$
1050 root 1.43 PPCODE:
1051 root 1.13 {
1052     aio_req req;
1053     STRLEN svlen;
1054 root 1.21 char *svptr = SvPVbyte (data, svlen);
1055 root 1.13
1056     SvUPGRADE (data, SVt_PV);
1057     SvPOK_on (data);
1058 root 1.1
1059 root 1.13 if (dataoffset < 0)
1060     dataoffset += svlen;
1061    
1062     if (dataoffset < 0 || dataoffset > svlen)
1063     croak ("data offset outside of string");
1064    
1065     if (ix == REQ_WRITE)
1066     {
1067     /* write: check length and adjust. */
1068     if (length < 0 || length + dataoffset > svlen)
1069     length = svlen - dataoffset;
1070     }
1071     else
1072     {
1073     /* read: grow scalar as necessary */
1074     svptr = SvGROW (data, length + dataoffset);
1075     }
1076    
1077     if (length < 0)
1078     croak ("length must not be negative");
1079    
1080 root 1.22 {
1081     dREQ;
1082 root 1.13
1083 root 1.22 req->type = ix;
1084     req->fh = newSVsv (fh);
1085     req->fd = PerlIO_fileno (ix == REQ_READ ? IoIFP (sv_2io (fh))
1086     : IoOFP (sv_2io (fh)));
1087     req->offset = offset;
1088     req->length = length;
1089     req->data = SvREFCNT_inc (data);
1090     req->dataptr = (char *)svptr + dataoffset;
1091 root 1.13
1092 root 1.28 if (!SvREADONLY (data))
1093     {
1094     SvREADONLY_on (data);
1095     req->data2ptr = (void *)data;
1096     }
1097    
1098 root 1.43 REQ_SEND;
1099 root 1.22 }
1100 root 1.13 }
1101 root 1.1
1102     void
1103 root 1.40 aio_sendfile (out_fh,in_fh,in_offset,length,callback=&PL_sv_undef)
1104 root 1.32 SV * out_fh
1105     SV * in_fh
1106     UV in_offset
1107     UV length
1108     SV * callback
1109     PROTOTYPE: $$$$;$
1110 root 1.43 PPCODE:
1111 root 1.32 {
1112     dREQ;
1113    
1114     req->type = REQ_SENDFILE;
1115     req->fh = newSVsv (out_fh);
1116     req->fd = PerlIO_fileno (IoIFP (sv_2io (out_fh)));
1117     req->fh2 = newSVsv (in_fh);
1118     req->fd2 = PerlIO_fileno (IoIFP (sv_2io (in_fh)));
1119     req->offset = in_offset;
1120     req->length = length;
1121    
1122 root 1.43 REQ_SEND;
1123 root 1.32 }
1124    
1125     void
1126 root 1.40 aio_readahead (fh,offset,length,callback=&PL_sv_undef)
1127 root 1.13 SV * fh
1128     UV offset
1129     IV length
1130     SV * callback
1131 root 1.8 PROTOTYPE: $$$;$
1132 root 1.43 PPCODE:
1133 root 1.1 {
1134 root 1.22 dREQ;
1135 root 1.1
1136     req->type = REQ_READAHEAD;
1137 root 1.13 req->fh = newSVsv (fh);
1138     req->fd = PerlIO_fileno (IoIFP (sv_2io (fh)));
1139 root 1.1 req->offset = offset;
1140     req->length = length;
1141    
1142 root 1.43 REQ_SEND;
1143 root 1.1 }
1144    
1145     void
1146 root 1.40 aio_stat (fh_or_path,callback=&PL_sv_undef)
1147 root 1.1 SV * fh_or_path
1148     SV * callback
1149     ALIAS:
1150 root 1.8 aio_stat = REQ_STAT
1151     aio_lstat = REQ_LSTAT
1152 root 1.43 PPCODE:
1153 root 1.1 {
1154 root 1.22 dREQ;
1155 root 1.1
1156     New (0, req->statdata, 1, Stat_t);
1157     if (!req->statdata)
1158 root 1.27 {
1159 root 1.43 req_free (req);
1160 root 1.27 croak ("out of memory during aio_req->statdata allocation");
1161     }
1162 root 1.1
1163     if (SvPOK (fh_or_path))
1164     {
1165 root 1.8 req->type = ix;
1166 root 1.1 req->data = newSVsv (fh_or_path);
1167 root 1.22 req->dataptr = SvPVbyte_nolen (req->data);
1168 root 1.1 }
1169     else
1170     {
1171     req->type = REQ_FSTAT;
1172 root 1.13 req->fh = newSVsv (fh_or_path);
1173 root 1.1 req->fd = PerlIO_fileno (IoIFP (sv_2io (fh_or_path)));
1174     }
1175    
1176 root 1.43 REQ_SEND;
1177 root 1.1 }
1178    
1179     void
1180 root 1.40 aio_unlink (pathname,callback=&PL_sv_undef)
1181 root 1.1 SV * pathname
1182     SV * callback
1183 root 1.22 ALIAS:
1184 root 1.40 aio_unlink = REQ_UNLINK
1185     aio_rmdir = REQ_RMDIR
1186     aio_readdir = REQ_READDIR
1187 root 1.43 PPCODE:
1188 root 1.1 {
1189 root 1.22 dREQ;
1190 root 1.1
1191 root 1.22 req->type = ix;
1192     req->data = newSVsv (pathname);
1193     req->dataptr = SvPVbyte_nolen (req->data);
1194 root 1.1
1195 root 1.43 REQ_SEND;
1196 root 1.22 }
1197    
1198     void
1199 root 1.40 aio_link (oldpath,newpath,callback=&PL_sv_undef)
1200 root 1.22 SV * oldpath
1201     SV * newpath
1202     SV * callback
1203 root 1.40 ALIAS:
1204     aio_link = REQ_LINK
1205     aio_symlink = REQ_SYMLINK
1206     aio_rename = REQ_RENAME
1207 root 1.43 PPCODE:
1208 root 1.22 {
/* Shared entry point for the two-path requests aio_link, aio_symlink and
 * aio_rename; the ALIAS index `ix` is used directly as the request type.
 * `callback` defaults to undef.
 * NOTE(review): dREQ and REQ_SEND are macros defined outside this chunk;
 * presumably dREQ sets up `req` and REQ_SEND submits it -- confirm. */
1209     dREQ;
1210 root 1.1
1211 root 1.40 req->type = ix;
/* the fh slot is used here to hold the copy of oldpath (not a handle);
 * data2ptr points at its bytes */
1212 root 1.22 req->fh = newSVsv (oldpath);
1213     req->data2ptr = SvPVbyte_nolen (req->fh);
/* newpath copy lives in data, with dataptr pointing at its bytes */
1214     req->data = newSVsv (newpath);
1215     req->dataptr = SvPVbyte_nolen (req->data);
1216 root 1.1
1217 root 1.43 REQ_SEND;
1218 root 1.1 }
1219    
1220 root 1.42 void
1221 root 1.45 aio_sleep (delay,callback=&PL_sv_undef)
1222     double delay
1223     SV * callback
1224     PPCODE:
1225     {
/* Build a REQ_SLEEP request from a delay given as a floating-point number.
 * Negative delays are clamped to zero. `callback` defaults to undef.
 * NOTE(review): dREQ and REQ_SEND are macros defined outside this chunk;
 * presumably dREQ sets up `req` and REQ_SEND submits it -- confirm. */
1226     dREQ;
1227    
1228     req->type = REQ_SLEEP;
/* fd receives the delay itself; assuming req->fd is an integer field this
 * truncates to whole units, and fd2 then gets 1000 times the remaining
 * fraction.
 * NOTE(review): if the REQ_SLEEP handler feeds fd2 into a struct timeval
 * tv_usec (microseconds), the factor should be 1000000 rather than 1000 --
 * verify against the handler, which is not visible here. */
1229     req->fd = delay < 0. ? 0 : delay;
1230     req->fd2 = delay < 0. ? 0 : 1000. * (delay - req->fd);
1231    
1232     REQ_SEND;
1233     }
1234    
1235     void
1236 root 1.44 aio_group (callback=&PL_sv_undef)
1237     SV * callback
1238 root 1.46 PROTOTYPE: ;$
1239 root 1.44 PPCODE:
1240 root 1.42 {
/* Create a REQ_GROUP request and return its request object.
 * `callback` is optional and defaults to undef.
 * NOTE(review): dREQ is a macro defined outside this chunk; presumably it
 * sets up `req` with the callback -- confirm. */
1241 root 1.44 dREQ;
1242 root 1.60
1243 root 1.44 req->type = REQ_GROUP;
/* submitted via req_send directly (not REQ_SEND) so that the object can
 * be pushed unconditionally below */
1244 root 1.45 req_send (req);
1245 root 1.60
/* the returned SV is built with AIO_GRP_KLASS rather than the plain
 * request class; presumably req_sv blesses into the given class -- confirm */
1246 root 1.44 XPUSHs (req_sv (req, AIO_GRP_KLASS));
1247 root 1.42 }
1248    
1249 root 1.6 void
1250 root 1.54 aio_nop (callback=&PL_sv_undef)
1251     SV * callback
1252     PPCODE:
1253     {
/* Submit a REQ_NOP request; no fields other than the type are set here.
 * `callback` defaults to undef.
 * NOTE(review): dREQ and REQ_SEND are macros defined outside this chunk;
 * presumably dREQ sets up `req` and REQ_SEND submits it -- confirm. */
1254     dREQ;
1255    
1256     req->type = REQ_NOP;
1257    
1258     REQ_SEND;
1259     }
1260    
1261 root 1.60 #if 0
1262    
1263     void
1264     aio_pri (int pri = DEFAULT_PRI)
1265     CODE:
/* Disabled by the surrounding #if 0, so this is never compiled.
 * Would clamp pri to the range [PRI_MIN, PRI_MAX] and store it, offset by
 * PRI_BIAS, in next_pri. */
1266 root 1.61 if (pri < PRI_MIN) pri = PRI_MIN;
1267     if (pri > PRI_MAX) pri = PRI_MAX;
1268 root 1.60 next_pri = pri + PRI_BIAS;
1269    
1270     #endif
1271    
1272 root 1.54 void
1273 root 1.40 flush ()
1274 root 1.6 PROTOTYPE:
1275     CODE:
/* Repeat poll_wait followed by poll_cb until nreqs drops to zero.
 * Does nothing if nreqs is already zero. */
1276     while (nreqs)
1277     {
1278     poll_wait ();
1279     poll_cb ();
1280     }
1281    
1282 root 1.7 void
1283     poll()
1284     PROTOTYPE:
1285     CODE:
/* Run a single poll_wait / poll_cb round, but only when nreqs is non-zero;
 * with nreqs at zero this does nothing. */
1286     if (nreqs)
1287     {
1288     poll_wait ();
1289     poll_cb ();
1290     }
1291    
1292 root 1.1 int
1293     poll_fileno()
1294     PROTOTYPE:
1295     CODE:
/* Return element 0 of respipe, i.e. the descriptor a caller can watch.
 * NOTE(review): presumably the read end of the result pipe -- confirm
 * where respipe is created. */
1296 root 1.3 RETVAL = respipe [0];
1297 root 1.1 OUTPUT:
1298     RETVAL
1299    
1300     int
1301     poll_cb(...)
1302     PROTOTYPE:
1303     CODE:
/* Perl-level wrapper around the C function poll_cb: any arguments are
 * accepted (the `...` signature) and ignored, and its return value is
 * passed back to the caller. */
1304 root 1.5 RETVAL = poll_cb ();
1305 root 1.1 OUTPUT:
1306     RETVAL
1307    
1308     void
1309     poll_wait()
1310     PROTOTYPE:
1311     CODE:
/* Call the C function poll_wait only when nreqs is non-zero; with nreqs at
 * zero this returns immediately. */
1312 root 1.3 if (nreqs)
1313     poll_wait ();
1314 root 1.1
1315     int
1316     nreqs()
1317     PROTOTYPE:
1318     CODE:
/* Return the current value of the file-level nreqs counter. */
1319     RETVAL = nreqs;
1320     OUTPUT:
1321     RETVAL
1322    
1323 root 1.48 PROTOTYPES: DISABLE
1324    
1325 root 1.44 MODULE = IO::AIO PACKAGE = IO::AIO::REQ
1326 root 1.43
1327     void
1328     cancel (aio_req_ornot req)
1329     PROTOTYPE:
1330     CODE:
/* Method on IO::AIO::REQ: hand the request to req_cancel.
 * NOTE(review): the aio_req_ornot typemap is not visible here; presumably
 * it skips the body when the object no longer maps to a request --
 * confirm. */
1331 root 1.45 req_cancel (req);
1332    
1333 root 1.56 void
1334 root 1.59 cb (aio_req_ornot req, SV *callback=&PL_sv_undef)
1335 root 1.56 CODE:
/* Method on IO::AIO::REQ: replace the request's callback.
 * The reference to the old callback is dropped and a fresh copy of the new
 * one (undef when omitted) is stored.
 * NOTE(review): assumes the aio_req_ornot typemap never delivers a NULL
 * req to this body -- confirm against the typemap. */
1336     SvREFCNT_dec (req->callback);
1337     req->callback = newSVsv (callback);
1338    
1339 root 1.45 MODULE = IO::AIO PACKAGE = IO::AIO::GRP
1340    
1341     void
1342     add (aio_req grp, ...)
1343     PPCODE:
1344     {
/* Method on IO::AIO::GRP: attach each remaining argument's request to the
 * group `grp`. Outside void context, a mortal copy of every argument is
 * also pushed back as the return list. */
1345     int i;
1346 root 1.53 aio_req req;
1347 root 1.45
/* fd == 2 marks a group that no longer accepts members (per the croak
 * message below, one that has finished) */
1348 root 1.49 if (grp->fd == 2)
1349     croak ("cannot add requests to IO::AIO::GRP after the group finished");
1350    
/* argument 0 is the group itself, so members start at index 1 */
1351 root 1.45 for (i = 1; i < items; ++i )
1352     {
1353 root 1.46 if (GIMME_V != G_VOID)
1354     XPUSHs (sv_2mortal (newSVsv (ST (i))));
1355    
1356 root 1.53 req = SvAIO_REQ (ST (i));
1357 root 1.45
/* arguments for which SvAIO_REQ yields no request are skipped silently */
1358 root 1.46 if (req)
1359     {
1360 root 1.49 ++grp->length;
1361     req->grp = grp;
1362    
/* insert req at the head of the group's doubly linked member list */
1363     req->grp_prev = 0;
1364     req->grp_next = grp->grp_first;
1365 root 1.45
1366 root 1.49 if (grp->grp_first)
1367     grp->grp_first->grp_prev = req;
1368    
1369     grp->grp_first = req;
1370 root 1.46 }
1371 root 1.45 }
1372     }
1373 root 1.43
1374 root 1.48 void
1375 root 1.51 result (aio_req grp, ...)
1376     CODE:
1377     {
/* Method on IO::AIO::GRP: store the remaining arguments as the group's
 * data. Copies of arguments 1..items-1 are collected into a new array,
 * which replaces whatever grp->data held before. */
1378     int i;
1379     AV *av = newAV ();
1380    
1381     for (i = 1; i < items; ++i )
1382     av_push (av, newSVsv (ST (i)));
1383    
/* drop the reference to the previous data before installing the array */
1384     SvREFCNT_dec (grp->data);
1385     grp->data = (SV *)av;
1386     }
1387    
1388     void
1389 root 1.56 feed_limit (aio_req grp, int limit)
1390 root 1.49 CODE:
/* Method on IO::AIO::GRP: store `limit` in grp->fd2 (used for groups as
 * the feed limit, per this method's name) and then call aio_grp_feed on
 * the group. */
1391     grp->fd2 = limit;
1392     aio_grp_feed (grp);
1393    
1394     void
1395 root 1.56 feed (aio_req grp, SV *callback=&PL_sv_undef)
1396 root 1.49 CODE:
1397     {
/* Method on IO::AIO::GRP: install `callback` (undef when omitted) in
 * grp->fh2, replacing and releasing any previous value, then call
 * aio_grp_feed on the group. */
1398     SvREFCNT_dec (grp->fh2);
1399     grp->fh2 = newSVsv (callback);
1400    
/* a non-positive grp->fd2 is replaced by 2 before feeding */
1401     if (grp->fd2 <= 0)
1402     grp->fd2 = 2;
1403    
1404     aio_grp_feed (grp);
1405     }
1406