ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/IO-AIO/AIO.xs
Revision: 1.64
Committed: Mon Oct 23 23:54:41 2006 UTC (17 years, 6 months ago) by root
Branch: MAIN
Changes since 1.63: +4 -0 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.63 #if __linux
2     # define _GNU_SOURCE
3     #endif
4    
5 root 1.20 #define _REENTRANT 1
6 root 1.63
7 root 1.19 #include <errno.h>
8    
9 root 1.15 #include "EXTERN.h"
10 root 1.1 #include "perl.h"
11     #include "XSUB.h"
12    
13 root 1.16 #include "autoconf/config.h"
14    
15 root 1.32 #include <pthread.h>
16    
17 root 1.37 #include <stddef.h>
18 root 1.41 #include <errno.h>
19 root 1.45 #include <sys/time.h>
20     #include <sys/select.h>
21 root 1.1 #include <sys/types.h>
22     #include <sys/stat.h>
23 root 1.37 #include <limits.h>
24 root 1.1 #include <unistd.h>
25     #include <fcntl.h>
26     #include <signal.h>
27     #include <sched.h>
28    
29 root 1.32 #if HAVE_SENDFILE
30     # if __linux
31     # include <sys/sendfile.h>
32     # elif __freebsd
33     # include <sys/socket.h>
34     # include <sys/uio.h>
35     # elif __hpux
36     # include <sys/socket.h>
37 root 1.35 # elif __solaris /* not yet */
38     # include <sys/sendfile.h>
39 root 1.34 # else
40     # error sendfile support requested but not available
41 root 1.32 # endif
42     #endif
43 root 1.1
44 root 1.39 /* used for struct dirent, AIX doesn't provide it */
45     #ifndef NAME_MAX
46     # define NAME_MAX 4096
47     #endif
48    
49 root 1.4 #if __ia64
50     # define STACKSIZE 65536
51 root 1.1 #else
52 root 1.37 # define STACKSIZE 8192
53 root 1.1 #endif
54    
55     enum {
56     REQ_QUIT,
57     REQ_OPEN, REQ_CLOSE,
58     REQ_READ, REQ_WRITE, REQ_READAHEAD,
59 root 1.32 REQ_SENDFILE,
60 root 1.22 REQ_STAT, REQ_LSTAT, REQ_FSTAT,
61 root 1.1 REQ_FSYNC, REQ_FDATASYNC,
62 root 1.40 REQ_UNLINK, REQ_RMDIR, REQ_RENAME,
63 root 1.37 REQ_READDIR,
64 root 1.40 REQ_LINK, REQ_SYMLINK,
65 root 1.54 REQ_GROUP, REQ_NOP,
66 root 1.45 REQ_SLEEP,
67 root 1.1 };
68    
69 root 1.44 #define AIO_REQ_KLASS "IO::AIO::REQ"
70     #define AIO_GRP_KLASS "IO::AIO::GRP"
71 root 1.43
72     typedef struct aio_cb
73     {
74 root 1.49 struct aio_cb *volatile next;
75 root 1.43
76     SV *data, *callback;
77     SV *fh, *fh2;
78     void *dataptr, *data2ptr;
79     Stat_t *statdata;
80 root 1.1 off_t offset;
81     size_t length;
82     ssize_t result;
83 root 1.43
84 root 1.60 STRLEN dataoffset;
85 root 1.43 int type;
86     int fd, fd2;
87 root 1.1 int errorno;
88 root 1.43 mode_t mode; /* open */
89 root 1.60
90     unsigned char flags;
91 root 1.58 unsigned char pri;
92 root 1.60
93     SV *self; /* the perl counterpart of this request, if any */
94     struct aio_cb *grp, *grp_prev, *grp_next, *grp_first;
95 root 1.1 } aio_cb;
96    
97 root 1.58 enum {
98     FLAG_CANCELLED = 0x01,
99     };
100    
101 root 1.1 typedef aio_cb *aio_req;
102 root 1.43 typedef aio_cb *aio_req_ornot;
103 root 1.1
104 root 1.60 enum {
105 root 1.61 PRI_MIN = -4,
106     PRI_MAX = 4,
107 root 1.60
108     DEFAULT_PRI = 0,
109 root 1.61 PRI_BIAS = -PRI_MIN,
110 root 1.60 };
111    
112     static int next_pri = DEFAULT_PRI + PRI_BIAS;
113    
114 root 1.30 static int started, wanted;
115 root 1.11 static volatile int nreqs;
116 root 1.4 static int max_outstanding = 1<<30;
117 root 1.3 static int respipe [2];
118 root 1.1
119 root 1.63 #if __linux && defined (PTHREAD_ADAPTIVE_MUTEX_INITIALIZER_NP)
120     # define AIO_MUTEX_INIT PTHREAD_ADAPTIVE_MUTEX_INITIALIZER_NP
121     #else
122     # define AIO_MUTEX_INIT PTHREAD_MUTEX_INITIALIZER
123     #endif
124    
125     static pthread_mutex_t reslock = AIO_MUTEX_INIT;
126     static pthread_mutex_t reqlock = AIO_MUTEX_INIT;
127 root 1.3 static pthread_cond_t reqwait = PTHREAD_COND_INITIALIZER;
128    
129     static volatile aio_req reqs, reqe; /* queue start, queue end */
130     static volatile aio_req ress, rese; /* queue start, queue end */
131 root 1.1
132 root 1.50 static void req_invoke (aio_req req);
133 root 1.45 static void req_free (aio_req req);
134    
135 root 1.43 /* must be called at most once */
136 root 1.44 static SV *req_sv (aio_req req, const char *klass)
137 root 1.43 {
138 root 1.49 if (!req->self)
139     {
140     req->self = (SV *)newHV ();
141     sv_magic (req->self, 0, PERL_MAGIC_ext, (char *)req, 0);
142     }
143 root 1.43
144 root 1.45 return sv_2mortal (sv_bless (newRV_inc (req->self), gv_stashpv (klass, 1)));
145 root 1.43 }
146    
147 root 1.45 static aio_req SvAIO_REQ (SV *sv)
148 root 1.26 {
149 root 1.53 MAGIC *mg;
150    
151 root 1.45 if (!sv_derived_from (sv, AIO_REQ_KLASS) || !SvROK (sv))
152     croak ("object of class " AIO_REQ_KLASS " expected");
153 root 1.43
154 root 1.53 mg = mg_find (SvRV (sv), PERL_MAGIC_ext);
155 root 1.43
156     return mg ? (aio_req)mg->mg_ptr : 0;
157     }
158    
159 root 1.49 static void aio_grp_feed (aio_req grp)
160     {
161 root 1.58 while (grp->length < grp->fd2 && !(grp->flags & FLAG_CANCELLED))
162 root 1.49 {
163     int old_len = grp->length;
164    
165     if (grp->fh2 && SvOK (grp->fh2))
166     {
167     dSP;
168    
169     ENTER;
170     SAVETMPS;
171     PUSHMARK (SP);
172     XPUSHs (req_sv (grp, AIO_GRP_KLASS));
173     PUTBACK;
174     call_sv (grp->fh2, G_VOID | G_EVAL);
175     SPAGAIN;
176     FREETMPS;
177     LEAVE;
178     }
179    
180     /* stop if no progress has been made */
181     if (old_len == grp->length)
182     {
183     SvREFCNT_dec (grp->fh2);
184     grp->fh2 = 0;
185     break;
186     }
187     }
188     }
189    
190 root 1.50 static void aio_grp_dec (aio_req grp)
191     {
192     --grp->length;
193    
194     /* call feeder, if applicable */
195     aio_grp_feed (grp);
196    
197     /* finish, if done */
198     if (!grp->length && grp->fd)
199     {
200     req_invoke (grp);
201     req_free (grp);
202     }
203     }
204    
205 root 1.45 static void poll_wait ()
206     {
207 root 1.62 fd_set rfd;
208    
209 root 1.60 while (nreqs)
210 root 1.45 {
211 root 1.60 aio_req req;
212 root 1.64 #if !(__x86 || __x86_64) /* safe without sempahore on this archs */
213 root 1.60 pthread_mutex_lock (&reslock);
214 root 1.64 #endif
215 root 1.60 req = ress;
216 root 1.64 #if !(__x86 || __x86_64) /* safe without sempahore on this archs */
217 root 1.60 pthread_mutex_unlock (&reslock);
218 root 1.64 #endif
219 root 1.60
220     if (req)
221     return;
222    
223 root 1.45 FD_ZERO(&rfd);
224     FD_SET(respipe [0], &rfd);
225    
226     select (respipe [0] + 1, &rfd, 0, 0, 0);
227     }
228     }
229    
230     static void req_invoke (aio_req req)
231     {
232     dSP;
233     int errorno = errno;
234    
235 root 1.58 if (req->flags & FLAG_CANCELLED || !SvOK (req->callback))
236 root 1.45 return;
237    
238     errno = req->errorno;
239    
240     ENTER;
241 root 1.49 SAVETMPS;
242 root 1.45 PUSHMARK (SP);
243 root 1.48 EXTEND (SP, 1);
244 root 1.45
245     switch (req->type)
246     {
247     case REQ_READDIR:
248     {
249     SV *rv = &PL_sv_undef;
250    
251     if (req->result >= 0)
252     {
253     char *buf = req->data2ptr;
254     AV *av = newAV ();
255    
256     while (req->result)
257     {
258     SV *sv = newSVpv (buf, 0);
259    
260     av_push (av, sv);
261     buf += SvCUR (sv) + 1;
262     req->result--;
263     }
264    
265     rv = sv_2mortal (newRV_noinc ((SV *)av));
266     }
267    
268 root 1.48 PUSHs (rv);
269 root 1.45 }
270     break;
271    
272     case REQ_OPEN:
273     {
274     /* convert fd to fh */
275     SV *fh;
276    
277 root 1.48 PUSHs (sv_2mortal (newSViv (req->result)));
278 root 1.45 PUTBACK;
279     call_pv ("IO::AIO::_fd2fh", G_SCALAR | G_EVAL);
280     SPAGAIN;
281    
282     fh = SvREFCNT_inc (POPs);
283    
284     PUSHMARK (SP);
285     XPUSHs (sv_2mortal (fh));
286     }
287     break;
288    
289 root 1.48 case REQ_GROUP:
290 root 1.49 req->fd = 2; /* mark group as finished */
291    
292 root 1.48 if (req->data)
293     {
294     int i;
295     AV *av = (AV *)req->data;
296    
297     EXTEND (SP, AvFILL (av) + 1);
298     for (i = 0; i <= AvFILL (av); ++i)
299     PUSHs (*av_fetch (av, i, 0));
300     }
301     break;
302    
303 root 1.54 case REQ_NOP:
304 root 1.45 case REQ_SLEEP:
305     break;
306    
307     default:
308 root 1.48 PUSHs (sv_2mortal (newSViv (req->result)));
309 root 1.45 break;
310     }
311    
312    
313     PUTBACK;
314     call_sv (req->callback, G_VOID | G_EVAL);
315     SPAGAIN;
316    
317 root 1.51 FREETMPS;
318     LEAVE;
319    
320     errno = errorno;
321    
322 root 1.45 if (SvTRUE (ERRSV))
323     {
324     req_free (req);
325     croak (0);
326     }
327     }
328    
329 root 1.43 static void req_free (aio_req req)
330     {
331 root 1.45 if (req->grp)
332     {
333     aio_req grp = req->grp;
334    
335     /* unlink request */
336 root 1.49 if (req->grp_next) req->grp_next->grp_prev = req->grp_prev;
337     if (req->grp_prev) req->grp_prev->grp_next = req->grp_next;
338    
339     if (grp->grp_first == req)
340     grp->grp_first = req->grp_next;
341    
342 root 1.50 aio_grp_dec (grp);
343 root 1.45 }
344    
345 root 1.43 if (req->self)
346     {
347     sv_unmagic (req->self, PERL_MAGIC_ext);
348     SvREFCNT_dec (req->self);
349     }
350    
351 root 1.49 SvREFCNT_dec (req->data);
352     SvREFCNT_dec (req->fh);
353     SvREFCNT_dec (req->fh2);
354     SvREFCNT_dec (req->callback);
355     Safefree (req->statdata);
356 root 1.26
357 root 1.37 if (req->type == REQ_READDIR && req->result >= 0)
358     free (req->data2ptr);
359    
360 root 1.26 Safefree (req);
361     }
362    
363 root 1.45 static void req_cancel (aio_req req)
364 root 1.1 {
365 root 1.58 req->flags |= FLAG_CANCELLED;
366 root 1.45
367     if (req->type == REQ_GROUP)
368 root 1.11 {
369 root 1.45 aio_req sub;
370 root 1.3
371 root 1.49 for (sub = req->grp_first; sub; sub = sub->grp_next)
372 root 1.45 req_cancel (sub);
373 root 1.11 }
374 root 1.1 }
375    
376 root 1.45 static int poll_cb ()
377 root 1.1 {
378     dSP;
379     int count = 0;
380 root 1.24 int do_croak = 0;
381 root 1.27 aio_req req;
382    
383     for (;;)
384     {
385     pthread_mutex_lock (&reslock);
386     req = ress;
387    
388     if (req)
389     {
390     ress = req->next;
391 root 1.11
392 root 1.27 if (!ress)
393     {
394     /* read any signals sent by the worker threads */
395     char buf [32];
396     while (read (respipe [0], buf, 32) == 32)
397     ;
398 root 1.30
399     rese = 0;
400 root 1.27 }
401     }
402 root 1.1
403 root 1.27 pthread_mutex_unlock (&reslock);
404 root 1.3
405 root 1.27 if (!req)
406     break;
407 root 1.3
408 root 1.50 --nreqs;
409 root 1.1
410     if (req->type == REQ_QUIT)
411     started--;
412 root 1.49 else if (req->type == REQ_GROUP && req->length)
413 root 1.46 {
414     req->fd = 1; /* mark request as delayed */
415     continue;
416     }
417 root 1.1 else
418     {
419     if (req->type == REQ_READ)
420 root 1.27 SvCUR_set (req->data, req->dataoffset + (req->result > 0 ? req->result : 0));
421 root 1.1
422 root 1.28 if (req->data2ptr && (req->type == REQ_READ || req->type == REQ_WRITE))
423     SvREADONLY_off (req->data);
424    
425 root 1.27 if (req->statdata)
426 root 1.1 {
427     PL_laststype = req->type == REQ_LSTAT ? OP_LSTAT : OP_STAT;
428     PL_laststatval = req->result;
429     PL_statcache = *(req->statdata);
430     }
431    
432 root 1.45 req_invoke (req);
433 root 1.26
434 root 1.1 count++;
435     }
436    
437 root 1.43 req_free (req);
438 root 1.1 }
439    
440     return count;
441     }
442    
443 root 1.4 static void *aio_proc(void *arg);
444    
445 root 1.45 static void start_thread (void)
446 root 1.4 {
447     sigset_t fullsigset, oldsigset;
448     pthread_t tid;
449     pthread_attr_t attr;
450    
451     pthread_attr_init (&attr);
452     pthread_attr_setstacksize (&attr, STACKSIZE);
453 root 1.31 pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
454 root 1.4
455     sigfillset (&fullsigset);
456     sigprocmask (SIG_SETMASK, &fullsigset, &oldsigset);
457    
458     if (pthread_create (&tid, &attr, aio_proc, 0) == 0)
459     started++;
460    
461     sigprocmask (SIG_SETMASK, &oldsigset, 0);
462     }
463    
464 root 1.45 static void req_send (aio_req req)
465 root 1.4 {
466 root 1.30 while (started < wanted && nreqs >= started)
467     start_thread ();
468    
469 root 1.50 ++nreqs;
470 root 1.4
471     pthread_mutex_lock (&reqlock);
472    
473     req->next = 0;
474    
475     if (reqe)
476     {
477     reqe->next = req;
478     reqe = req;
479     }
480     else
481     reqe = reqs = req;
482    
483     pthread_cond_signal (&reqwait);
484     pthread_mutex_unlock (&reqlock);
485    
486 root 1.27 if (nreqs > max_outstanding)
487     for (;;)
488     {
489     poll_cb ();
490    
491     if (nreqs <= max_outstanding)
492     break;
493    
494     poll_wait ();
495     }
496 root 1.4 }
497    
498 root 1.45 static void end_thread (void)
499 root 1.4 {
500     aio_req req;
501 root 1.26 Newz (0, req, 1, aio_cb);
502 root 1.4 req->type = REQ_QUIT;
503    
504 root 1.43 req_send (req);
505 root 1.4 }
506    
507 root 1.22 static void min_parallel (int nthreads)
508     {
509 root 1.30 if (wanted < nthreads)
510     wanted = nthreads;
511 root 1.22 }
512    
513     static void max_parallel (int nthreads)
514     {
515     int cur = started;
516 root 1.27
517 root 1.30 if (wanted > nthreads)
518     wanted = nthreads;
519    
520     while (cur > wanted)
521     {
522 root 1.22 end_thread ();
523     cur--;
524     }
525    
526 root 1.30 while (started > wanted)
527 root 1.22 {
528     poll_wait ();
529     poll_cb ();
530     }
531     }
532    
533 root 1.26 static void create_pipe ()
534 root 1.22 {
535 root 1.26 if (pipe (respipe))
536     croak ("unable to initialize result pipe");
537 root 1.22
538 root 1.26 if (fcntl (respipe [0], F_SETFL, O_NONBLOCK))
539     croak ("cannot set result pipe to nonblocking mode");
540 root 1.22
541 root 1.26 if (fcntl (respipe [1], F_SETFL, O_NONBLOCK))
542     croak ("cannot set result pipe to nonblocking mode");
543     }
544 root 1.22
545     /*****************************************************************************/
546 root 1.17 /* work around various missing functions */
547    
548     #if !HAVE_PREADWRITE
549     # define pread aio_pread
550     # define pwrite aio_pwrite
551    
552     /*
553     * make our pread/pwrite safe against themselves, but not against
554     * normal read/write by using a mutex. slows down execution a lot,
555     * but that's your problem, not mine.
556     */
557 root 1.37 static pthread_mutex_t preadwritelock = PTHREAD_MUTEX_INITIALIZER;
558 root 1.17
559 root 1.45 static ssize_t pread (int fd, void *buf, size_t count, off_t offset)
560 root 1.17 {
561     ssize_t res;
562     off_t ooffset;
563    
564 root 1.37 pthread_mutex_lock (&preadwritelock);
565 root 1.17 ooffset = lseek (fd, 0, SEEK_CUR);
566     lseek (fd, offset, SEEK_SET);
567     res = read (fd, buf, count);
568     lseek (fd, ooffset, SEEK_SET);
569 root 1.37 pthread_mutex_unlock (&preadwritelock);
570 root 1.17
571     return res;
572     }
573    
574 root 1.45 static ssize_t pwrite (int fd, void *buf, size_t count, off_t offset)
575 root 1.17 {
576     ssize_t res;
577     off_t ooffset;
578    
579 root 1.37 pthread_mutex_lock (&preadwritelock);
580 root 1.17 ooffset = lseek (fd, 0, SEEK_CUR);
581     lseek (fd, offset, SEEK_SET);
582     res = write (fd, buf, count);
583     lseek (fd, offset, SEEK_SET);
584 root 1.37 pthread_mutex_unlock (&preadwritelock);
585 root 1.17
586     return res;
587     }
588     #endif
589    
590     #if !HAVE_FDATASYNC
591     # define fdatasync fsync
592     #endif
593    
594     #if !HAVE_READAHEAD
595     # define readahead aio_readahead
596    
597 root 1.45 static ssize_t readahead (int fd, off_t offset, size_t count)
598 root 1.17 {
599 root 1.37 char readahead_buf[4096];
600    
601 root 1.17 while (count > 0)
602     {
603     size_t len = count < sizeof (readahead_buf) ? count : sizeof (readahead_buf);
604    
605     pread (fd, readahead_buf, len, offset);
606     offset += len;
607     count -= len;
608     }
609    
610     errno = 0;
611     }
612     #endif
613    
614 root 1.37 #if !HAVE_READDIR_R
615     # define readdir_r aio_readdir_r
616    
617     static pthread_mutex_t readdirlock = PTHREAD_MUTEX_INITIALIZER;
618    
619 root 1.45 static int readdir_r (DIR *dirp, struct dirent *ent, struct dirent **res)
620 root 1.37 {
621     struct dirent *e;
622     int errorno;
623    
624     pthread_mutex_lock (&readdirlock);
625    
626     e = readdir (dirp);
627     errorno = errno;
628    
629     if (e)
630     {
631     *res = ent;
632     strcpy (ent->d_name, e->d_name);
633     }
634     else
635     *res = 0;
636    
637     pthread_mutex_unlock (&readdirlock);
638    
639     errno = errorno;
640     return e ? 0 : -1;
641     }
642     #endif
643    
644 root 1.32 /* sendfile always needs emulation */
645 root 1.45 static ssize_t sendfile_ (int ofd, int ifd, off_t offset, size_t count)
646 root 1.32 {
647 root 1.37 ssize_t res;
648 root 1.32
649 root 1.37 if (!count)
650     return 0;
651 root 1.32
652 root 1.35 #if HAVE_SENDFILE
653     # if __linux
654 root 1.37 res = sendfile (ofd, ifd, &offset, count);
655 root 1.32
656 root 1.35 # elif __freebsd
657 root 1.37 /*
658     * Of course, the freebsd sendfile is a dire hack with no thoughts
659     * wasted on making it similar to other I/O functions.
660     */
661     {
662     off_t sbytes;
663     res = sendfile (ifd, ofd, offset, count, 0, &sbytes, 0);
664    
665     if (res < 0 && sbytes)
666     /* maybe only on EAGAIN only: as usual, the manpage leaves you guessing */
667     res = sbytes;
668     }
669 root 1.32
670 root 1.35 # elif __hpux
671 root 1.37 res = sendfile (ofd, ifd, offset, count, 0, 0);
672 root 1.32
673 root 1.35 # elif __solaris
674 root 1.37 {
675     struct sendfilevec vec;
676     size_t sbytes;
677    
678     vec.sfv_fd = ifd;
679     vec.sfv_flag = 0;
680     vec.sfv_off = offset;
681     vec.sfv_len = count;
682    
683     res = sendfilev (ofd, &vec, 1, &sbytes);
684    
685     if (res < 0 && sbytes)
686     res = sbytes;
687     }
688 root 1.35
689 root 1.38 # endif
690     #else
691 root 1.37 res = -1;
692     errno = ENOSYS;
693 root 1.32 #endif
694    
695 root 1.37 if (res < 0
696     && (errno == ENOSYS || errno == EINVAL || errno == ENOTSOCK
697 root 1.35 #if __solaris
698 root 1.37 || errno == EAFNOSUPPORT || errno == EPROTOTYPE
699 root 1.35 #endif
700 root 1.37 )
701     )
702     {
703     /* emulate sendfile. this is a major pain in the ass */
704     char buf[4096];
705     res = 0;
706    
707 root 1.38 while (count)
708 root 1.37 {
709     ssize_t cnt;
710    
711 root 1.38 cnt = pread (ifd, buf, count > 4096 ? 4096 : count, offset);
712 root 1.32
713 root 1.37 if (cnt <= 0)
714     {
715     if (cnt && !res) res = -1;
716     break;
717     }
718    
719     cnt = write (ofd, buf, cnt);
720    
721     if (cnt <= 0)
722     {
723     if (cnt && !res) res = -1;
724     break;
725     }
726    
727     offset += cnt;
728     res += cnt;
729 root 1.38 count -= cnt;
730 root 1.37 }
731     }
732    
733     return res;
734     }
735    
736     /* read a full directory */
737 root 1.45 static int scandir_ (const char *path, void **namesp)
738 root 1.37 {
739     DIR *dirp = opendir (path);
740     union
741     {
742     struct dirent d;
743     char b [offsetof (struct dirent, d_name) + NAME_MAX + 1];
744     } u;
745     struct dirent *entp;
746     char *name, *names;
747     int memlen = 4096;
748     int memofs = 0;
749     int res = 0;
750     int errorno;
751    
752     if (!dirp)
753     return -1;
754    
755     names = malloc (memlen);
756    
757     for (;;)
758     {
759     errno = 0, readdir_r (dirp, &u.d, &entp);
760    
761     if (!entp)
762     break;
763    
764     name = entp->d_name;
765    
766     if (name [0] != '.' || (name [1] && (name [1] != '.' || name [2])))
767     {
768     int len = strlen (name) + 1;
769    
770     res++;
771    
772     while (memofs + len > memlen)
773     {
774     memlen *= 2;
775     names = realloc (names, memlen);
776     if (!names)
777     break;
778     }
779    
780     memcpy (names + memofs, name, len);
781     memofs += len;
782     }
783     }
784    
785     errorno = errno;
786     closedir (dirp);
787    
788     if (errorno)
789     {
790     free (names);
791     errno = errorno;
792     res = -1;
793     }
794    
795     *namesp = (void *)names;
796     return res;
797 root 1.32 }
798    
799 root 1.22 /*****************************************************************************/
800    
801 root 1.45 static void *aio_proc (void *thr_arg)
802 root 1.1 {
803     aio_req req;
804 root 1.3 int type;
805 root 1.1
806 root 1.3 do
807 root 1.1 {
808 root 1.3 pthread_mutex_lock (&reqlock);
809    
810     for (;;)
811     {
812     req = reqs;
813    
814     if (reqs)
815     {
816     reqs = reqs->next;
817     if (!reqs) reqe = 0;
818     }
819    
820     if (req)
821     break;
822    
823     pthread_cond_wait (&reqwait, &reqlock);
824     }
825    
826     pthread_mutex_unlock (&reqlock);
827    
828 root 1.1 errno = 0; /* strictly unnecessary */
829 root 1.60 type = req->type; /* remember type for QUIT check */
830 root 1.1
831 root 1.58 if (!(req->flags & FLAG_CANCELLED))
832 root 1.60 switch (type)
833 root 1.43 {
834     case REQ_READ: req->result = pread (req->fd, req->dataptr, req->length, req->offset); break;
835     case REQ_WRITE: req->result = pwrite (req->fd, req->dataptr, req->length, req->offset); break;
836 root 1.3
837 root 1.43 case REQ_READAHEAD: req->result = readahead (req->fd, req->offset, req->length); break;
838     case REQ_SENDFILE: req->result = sendfile_ (req->fd, req->fd2, req->offset, req->length); break;
839 root 1.16
840 root 1.43 case REQ_STAT: req->result = stat (req->dataptr, req->statdata); break;
841     case REQ_LSTAT: req->result = lstat (req->dataptr, req->statdata); break;
842     case REQ_FSTAT: req->result = fstat (req->fd , req->statdata); break;
843    
844     case REQ_OPEN: req->result = open (req->dataptr, req->fd, req->mode); break;
845     case REQ_CLOSE: req->result = close (req->fd); break;
846     case REQ_UNLINK: req->result = unlink (req->dataptr); break;
847     case REQ_RMDIR: req->result = rmdir (req->dataptr); break;
848     case REQ_RENAME: req->result = rename (req->data2ptr, req->dataptr); break;
849     case REQ_LINK: req->result = link (req->data2ptr, req->dataptr); break;
850     case REQ_SYMLINK: req->result = symlink (req->data2ptr, req->dataptr); break;
851    
852     case REQ_FDATASYNC: req->result = fdatasync (req->fd); break;
853     case REQ_FSYNC: req->result = fsync (req->fd); break;
854     case REQ_READDIR: req->result = scandir_ (req->dataptr, &req->data2ptr); break;
855 root 1.1
856 root 1.45 case REQ_SLEEP:
857     {
858     struct timeval tv;
859    
860     tv.tv_sec = req->fd;
861     tv.tv_usec = req->fd2;
862    
863     req->result = select (0, 0, 0, 0, &tv);
864     }
865    
866 root 1.55 case REQ_GROUP:
867     case REQ_NOP:
868 root 1.43 case REQ_QUIT:
869     break;
870 root 1.1
871 root 1.43 default:
872     req->result = ENOSYS;
873     break;
874     }
875 root 1.1
876     req->errorno = errno;
877 root 1.3
878     pthread_mutex_lock (&reslock);
879    
880     req->next = 0;
881    
882     if (rese)
883     {
884     rese->next = req;
885     rese = req;
886     }
887     else
888     {
889     rese = ress = req;
890    
891     /* write a dummy byte to the pipe so fh becomes ready */
892     write (respipe [1], &respipe, 1);
893     }
894    
895     pthread_mutex_unlock (&reslock);
896 root 1.1 }
897 root 1.3 while (type != REQ_QUIT);
898 root 1.1
899     return 0;
900     }
901    
902 root 1.37 /*****************************************************************************/
903    
904     static void atfork_prepare (void)
905     {
906     pthread_mutex_lock (&reqlock);
907     pthread_mutex_lock (&reslock);
908     #if !HAVE_PREADWRITE
909     pthread_mutex_lock (&preadwritelock);
910     #endif
911     #if !HAVE_READDIR_R
912     pthread_mutex_lock (&readdirlock);
913     #endif
914     }
915    
916     static void atfork_parent (void)
917     {
918     #if !HAVE_READDIR_R
919     pthread_mutex_unlock (&readdirlock);
920     #endif
921     #if !HAVE_PREADWRITE
922     pthread_mutex_unlock (&preadwritelock);
923     #endif
924     pthread_mutex_unlock (&reslock);
925     pthread_mutex_unlock (&reqlock);
926     }
927    
928     static void atfork_child (void)
929     {
930     aio_req prv;
931    
932     started = 0;
933    
934     while (reqs)
935     {
936     prv = reqs;
937     reqs = prv->next;
938 root 1.43 req_free (prv);
939 root 1.37 }
940    
941     reqs = reqe = 0;
942    
943     while (ress)
944     {
945     prv = ress;
946     ress = prv->next;
947 root 1.43 req_free (prv);
948 root 1.37 }
949    
950     ress = rese = 0;
951    
952     close (respipe [0]);
953     close (respipe [1]);
954     create_pipe ();
955    
956     atfork_parent ();
957     }
958    
959 root 1.22 #define dREQ \
960     aio_req req; \
961 root 1.60 int req_pri = next_pri; \
962     next_pri = DEFAULT_PRI + PRI_BIAS; \
963 root 1.22 \
964     if (SvOK (callback) && !SvROK (callback)) \
965 root 1.43 croak ("callback must be undef or of reference type"); \
966 root 1.22 \
967     Newz (0, req, 1, aio_cb); \
968     if (!req) \
969     croak ("out of memory during aio_req allocation"); \
970     \
971 root 1.60 req->callback = newSVsv (callback); \
972     req->pri = req_pri
973 root 1.43
974     #define REQ_SEND \
975     req_send (req); \
976     \
977     if (GIMME_V != G_VOID) \
978 root 1.44 XPUSHs (req_sv (req, AIO_REQ_KLASS));
979 root 1.22
980 root 1.1 MODULE = IO::AIO PACKAGE = IO::AIO
981    
982 root 1.8 PROTOTYPES: ENABLE
983    
984 root 1.1 BOOT:
985     {
986 root 1.41 HV *stash = gv_stashpv ("IO::AIO", 1);
987     newCONSTSUB (stash, "EXDEV", newSViv (EXDEV));
988     newCONSTSUB (stash, "O_RDONLY", newSViv (O_RDONLY));
989     newCONSTSUB (stash, "O_WRONLY", newSViv (O_WRONLY));
990    
991 root 1.26 create_pipe ();
992 root 1.22 pthread_atfork (atfork_prepare, atfork_parent, atfork_child);
993 root 1.1 }
994    
995     void
996 root 1.40 min_parallel (nthreads)
997 root 1.1 int nthreads
998     PROTOTYPE: $
999    
1000     void
1001 root 1.40 max_parallel (nthreads)
1002 root 1.1 int nthreads
1003     PROTOTYPE: $
1004    
1005 root 1.4 int
1006 root 1.40 max_outstanding (nreqs)
1007 root 1.4 int nreqs
1008     PROTOTYPE: $
1009     CODE:
1010     RETVAL = max_outstanding;
1011     max_outstanding = nreqs;
1012    
1013 root 1.1 void
1014 root 1.40 aio_open (pathname,flags,mode,callback=&PL_sv_undef)
1015 root 1.1 SV * pathname
1016     int flags
1017     int mode
1018     SV * callback
1019 root 1.8 PROTOTYPE: $$$;$
1020 root 1.43 PPCODE:
1021 root 1.1 {
1022 root 1.22 dREQ;
1023 root 1.1
1024     req->type = REQ_OPEN;
1025     req->data = newSVsv (pathname);
1026 root 1.22 req->dataptr = SvPVbyte_nolen (req->data);
1027 root 1.1 req->fd = flags;
1028     req->mode = mode;
1029    
1030 root 1.43 REQ_SEND;
1031 root 1.1 }
1032    
1033     void
1034 root 1.40 aio_close (fh,callback=&PL_sv_undef)
1035 root 1.13 SV * fh
1036     SV * callback
1037 root 1.8 PROTOTYPE: $;$
1038 root 1.1 ALIAS:
1039     aio_close = REQ_CLOSE
1040     aio_fsync = REQ_FSYNC
1041     aio_fdatasync = REQ_FDATASYNC
1042 root 1.43 PPCODE:
1043 root 1.1 {
1044 root 1.22 dREQ;
1045 root 1.1
1046     req->type = ix;
1047 root 1.13 req->fh = newSVsv (fh);
1048     req->fd = PerlIO_fileno (IoIFP (sv_2io (fh)));
1049 root 1.1
1050 root 1.43 REQ_SEND (req);
1051 root 1.1 }
1052    
1053     void
1054 root 1.40 aio_read (fh,offset,length,data,dataoffset,callback=&PL_sv_undef)
1055 root 1.13 SV * fh
1056     UV offset
1057 root 1.32 UV length
1058 root 1.13 SV * data
1059 root 1.32 UV dataoffset
1060 root 1.13 SV * callback
1061     ALIAS:
1062     aio_read = REQ_READ
1063     aio_write = REQ_WRITE
1064 root 1.8 PROTOTYPE: $$$$$;$
1065 root 1.43 PPCODE:
1066 root 1.13 {
1067     aio_req req;
1068     STRLEN svlen;
1069 root 1.21 char *svptr = SvPVbyte (data, svlen);
1070 root 1.13
1071     SvUPGRADE (data, SVt_PV);
1072     SvPOK_on (data);
1073 root 1.1
1074 root 1.13 if (dataoffset < 0)
1075     dataoffset += svlen;
1076    
1077     if (dataoffset < 0 || dataoffset > svlen)
1078     croak ("data offset outside of string");
1079    
1080     if (ix == REQ_WRITE)
1081     {
1082     /* write: check length and adjust. */
1083     if (length < 0 || length + dataoffset > svlen)
1084     length = svlen - dataoffset;
1085     }
1086     else
1087     {
1088     /* read: grow scalar as necessary */
1089     svptr = SvGROW (data, length + dataoffset);
1090     }
1091    
1092     if (length < 0)
1093     croak ("length must not be negative");
1094    
1095 root 1.22 {
1096     dREQ;
1097 root 1.13
1098 root 1.22 req->type = ix;
1099     req->fh = newSVsv (fh);
1100     req->fd = PerlIO_fileno (ix == REQ_READ ? IoIFP (sv_2io (fh))
1101     : IoOFP (sv_2io (fh)));
1102     req->offset = offset;
1103     req->length = length;
1104     req->data = SvREFCNT_inc (data);
1105     req->dataptr = (char *)svptr + dataoffset;
1106 root 1.13
1107 root 1.28 if (!SvREADONLY (data))
1108     {
1109     SvREADONLY_on (data);
1110     req->data2ptr = (void *)data;
1111     }
1112    
1113 root 1.43 REQ_SEND;
1114 root 1.22 }
1115 root 1.13 }
1116 root 1.1
1117     void
1118 root 1.40 aio_sendfile (out_fh,in_fh,in_offset,length,callback=&PL_sv_undef)
1119 root 1.32 SV * out_fh
1120     SV * in_fh
1121     UV in_offset
1122     UV length
1123     SV * callback
1124     PROTOTYPE: $$$$;$
1125 root 1.43 PPCODE:
1126 root 1.32 {
1127     dREQ;
1128    
1129     req->type = REQ_SENDFILE;
1130     req->fh = newSVsv (out_fh);
1131     req->fd = PerlIO_fileno (IoIFP (sv_2io (out_fh)));
1132     req->fh2 = newSVsv (in_fh);
1133     req->fd2 = PerlIO_fileno (IoIFP (sv_2io (in_fh)));
1134     req->offset = in_offset;
1135     req->length = length;
1136    
1137 root 1.43 REQ_SEND;
1138 root 1.32 }
1139    
1140     void
1141 root 1.40 aio_readahead (fh,offset,length,callback=&PL_sv_undef)
1142 root 1.13 SV * fh
1143     UV offset
1144     IV length
1145     SV * callback
1146 root 1.8 PROTOTYPE: $$$;$
1147 root 1.43 PPCODE:
1148 root 1.1 {
1149 root 1.22 dREQ;
1150 root 1.1
1151     req->type = REQ_READAHEAD;
1152 root 1.13 req->fh = newSVsv (fh);
1153     req->fd = PerlIO_fileno (IoIFP (sv_2io (fh)));
1154 root 1.1 req->offset = offset;
1155     req->length = length;
1156    
1157 root 1.43 REQ_SEND;
1158 root 1.1 }
1159    
1160     void
1161 root 1.40 aio_stat (fh_or_path,callback=&PL_sv_undef)
1162 root 1.1 SV * fh_or_path
1163     SV * callback
1164     ALIAS:
1165 root 1.8 aio_stat = REQ_STAT
1166     aio_lstat = REQ_LSTAT
1167 root 1.43 PPCODE:
1168 root 1.1 {
1169 root 1.22 dREQ;
1170 root 1.1
1171     New (0, req->statdata, 1, Stat_t);
1172     if (!req->statdata)
1173 root 1.27 {
1174 root 1.43 req_free (req);
1175 root 1.27 croak ("out of memory during aio_req->statdata allocation");
1176     }
1177 root 1.1
1178     if (SvPOK (fh_or_path))
1179     {
1180 root 1.8 req->type = ix;
1181 root 1.1 req->data = newSVsv (fh_or_path);
1182 root 1.22 req->dataptr = SvPVbyte_nolen (req->data);
1183 root 1.1 }
1184     else
1185     {
1186     req->type = REQ_FSTAT;
1187 root 1.13 req->fh = newSVsv (fh_or_path);
1188 root 1.1 req->fd = PerlIO_fileno (IoIFP (sv_2io (fh_or_path)));
1189     }
1190    
1191 root 1.43 REQ_SEND;
1192 root 1.1 }
1193    
1194     void
1195 root 1.40 aio_unlink (pathname,callback=&PL_sv_undef)
1196 root 1.1 SV * pathname
1197     SV * callback
1198 root 1.22 ALIAS:
1199 root 1.40 aio_unlink = REQ_UNLINK
1200     aio_rmdir = REQ_RMDIR
1201     aio_readdir = REQ_READDIR
1202 root 1.43 PPCODE:
1203 root 1.1 {
1204 root 1.22 dREQ;
1205 root 1.1
1206 root 1.22 req->type = ix;
1207     req->data = newSVsv (pathname);
1208     req->dataptr = SvPVbyte_nolen (req->data);
1209 root 1.1
1210 root 1.43 REQ_SEND;
1211 root 1.22 }
1212    
1213     void
1214 root 1.40 aio_link (oldpath,newpath,callback=&PL_sv_undef)
1215 root 1.22 SV * oldpath
1216     SV * newpath
1217     SV * callback
1218 root 1.40 ALIAS:
1219     aio_link = REQ_LINK
1220     aio_symlink = REQ_SYMLINK
1221     aio_rename = REQ_RENAME
1222 root 1.43 PPCODE:
1223 root 1.22 {
1224     dREQ;
1225 root 1.1
1226 root 1.40 req->type = ix;
1227 root 1.22 req->fh = newSVsv (oldpath);
1228     req->data2ptr = SvPVbyte_nolen (req->fh);
1229     req->data = newSVsv (newpath);
1230     req->dataptr = SvPVbyte_nolen (req->data);
1231 root 1.1
1232 root 1.43 REQ_SEND;
1233 root 1.1 }
1234    
1235 root 1.42 void
1236 root 1.45 aio_sleep (delay,callback=&PL_sv_undef)
1237     double delay
1238     SV * callback
1239     PPCODE:
1240     {
1241     dREQ;
1242    
1243     req->type = REQ_SLEEP;
1244     req->fd = delay < 0. ? 0 : delay;
1245     req->fd2 = delay < 0. ? 0 : 1000. * (delay - req->fd);
1246    
1247     REQ_SEND;
1248     }
1249    
1250     void
1251 root 1.44 aio_group (callback=&PL_sv_undef)
1252     SV * callback
1253 root 1.46 PROTOTYPE: ;$
1254 root 1.44 PPCODE:
1255 root 1.42 {
1256 root 1.44 dREQ;
1257 root 1.60
1258 root 1.44 req->type = REQ_GROUP;
1259 root 1.45 req_send (req);
1260 root 1.60
1261 root 1.44 XPUSHs (req_sv (req, AIO_GRP_KLASS));
1262 root 1.42 }
1263    
1264 root 1.6 void
1265 root 1.54 aio_nop (callback=&PL_sv_undef)
1266     SV * callback
1267     PPCODE:
1268     {
1269     dREQ;
1270    
1271     req->type = REQ_NOP;
1272    
1273     REQ_SEND;
1274     }
1275    
1276 root 1.60 #if 0
1277    
1278     void
1279     aio_pri (int pri = DEFAULT_PRI)
1280     CODE:
1281 root 1.61 if (pri < PRI_MIN) pri = PRI_MIN;
1282     if (pri > PRI_MAX) pri = PRI_MAX;
1283 root 1.60 next_pri = pri + PRI_BIAS;
1284    
1285     #endif
1286    
1287 root 1.54 void
1288 root 1.40 flush ()
1289 root 1.6 PROTOTYPE:
1290     CODE:
1291     while (nreqs)
1292     {
1293     poll_wait ();
1294     poll_cb ();
1295     }
1296    
1297 root 1.7 void
1298     poll()
1299     PROTOTYPE:
1300     CODE:
1301     if (nreqs)
1302     {
1303     poll_wait ();
1304     poll_cb ();
1305     }
1306    
1307 root 1.1 int
1308     poll_fileno()
1309     PROTOTYPE:
1310     CODE:
1311 root 1.3 RETVAL = respipe [0];
1312 root 1.1 OUTPUT:
1313     RETVAL
1314    
1315     int
1316     poll_cb(...)
1317     PROTOTYPE:
1318     CODE:
1319 root 1.5 RETVAL = poll_cb ();
1320 root 1.1 OUTPUT:
1321     RETVAL
1322    
1323     void
1324     poll_wait()
1325     PROTOTYPE:
1326     CODE:
1327 root 1.3 if (nreqs)
1328     poll_wait ();
1329 root 1.1
1330     int
1331     nreqs()
1332     PROTOTYPE:
1333     CODE:
1334     RETVAL = nreqs;
1335     OUTPUT:
1336     RETVAL
1337    
1338 root 1.48 PROTOTYPES: DISABLE
1339    
1340 root 1.44 MODULE = IO::AIO PACKAGE = IO::AIO::REQ
1341 root 1.43
1342     void
1343     cancel (aio_req_ornot req)
1344     PROTOTYPE:
1345     CODE:
1346 root 1.45 req_cancel (req);
1347    
1348 root 1.56 void
1349 root 1.59 cb (aio_req_ornot req, SV *callback=&PL_sv_undef)
1350 root 1.56 CODE:
1351     SvREFCNT_dec (req->callback);
1352     req->callback = newSVsv (callback);
1353    
1354 root 1.45 MODULE = IO::AIO PACKAGE = IO::AIO::GRP
1355    
1356     void
1357     add (aio_req grp, ...)
1358     PPCODE:
1359     {
1360     int i;
1361 root 1.53 aio_req req;
1362 root 1.45
1363 root 1.49 if (grp->fd == 2)
1364     croak ("cannot add requests to IO::AIO::GRP after the group finished");
1365    
1366 root 1.45 for (i = 1; i < items; ++i )
1367     {
1368 root 1.46 if (GIMME_V != G_VOID)
1369     XPUSHs (sv_2mortal (newSVsv (ST (i))));
1370    
1371 root 1.53 req = SvAIO_REQ (ST (i));
1372 root 1.45
1373 root 1.46 if (req)
1374     {
1375 root 1.49 ++grp->length;
1376     req->grp = grp;
1377    
1378     req->grp_prev = 0;
1379     req->grp_next = grp->grp_first;
1380 root 1.45
1381 root 1.49 if (grp->grp_first)
1382     grp->grp_first->grp_prev = req;
1383    
1384     grp->grp_first = req;
1385 root 1.46 }
1386 root 1.45 }
1387     }
1388 root 1.43
1389 root 1.48 void
1390 root 1.51 result (aio_req grp, ...)
1391     CODE:
1392     {
1393     int i;
1394     AV *av = newAV ();
1395    
1396     for (i = 1; i < items; ++i )
1397     av_push (av, newSVsv (ST (i)));
1398    
1399     SvREFCNT_dec (grp->data);
1400     grp->data = (SV *)av;
1401     }
1402    
1403     void
1404 root 1.56 feed_limit (aio_req grp, int limit)
1405 root 1.49 CODE:
1406     grp->fd2 = limit;
1407     aio_grp_feed (grp);
1408    
1409     void
1410 root 1.56 feed (aio_req grp, SV *callback=&PL_sv_undef)
1411 root 1.49 CODE:
1412     {
1413     SvREFCNT_dec (grp->fh2);
1414     grp->fh2 = newSVsv (callback);
1415    
1416     if (grp->fd2 <= 0)
1417     grp->fd2 = 2;
1418    
1419     aio_grp_feed (grp);
1420     }
1421