ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/IO-AIO/AIO.xs
Revision: 1.59
Committed: Mon Oct 23 18:56:27 2006 UTC (17 years, 6 months ago) by root
Branch: MAIN
Changes since 1.58: +1 -1 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.20 #define _REENTRANT 1
2 root 1.19 #include <errno.h>
3    
4 root 1.15 #include "EXTERN.h"
5 root 1.1 #include "perl.h"
6     #include "XSUB.h"
7    
8 root 1.16 #include "autoconf/config.h"
9    
10 root 1.32 #include <pthread.h>
11    
12 root 1.37 #include <stddef.h>
13 root 1.41 #include <errno.h>
14 root 1.45 #include <sys/time.h>
15     #include <sys/select.h>
16 root 1.1 #include <sys/types.h>
17     #include <sys/stat.h>
18 root 1.37 #include <limits.h>
19 root 1.1 #include <unistd.h>
20     #include <fcntl.h>
21     #include <signal.h>
22     #include <sched.h>
23    
24 root 1.32 #if HAVE_SENDFILE
25     # if __linux
26     # include <sys/sendfile.h>
27     # elif __freebsd
28     # include <sys/socket.h>
29     # include <sys/uio.h>
30     # elif __hpux
31     # include <sys/socket.h>
32 root 1.35 # elif __solaris /* not yet */
33     # include <sys/sendfile.h>
34 root 1.34 # else
35     # error sendfile support requested but not available
36 root 1.32 # endif
37     #endif
38 root 1.1
39 root 1.39 /* used for struct dirent, AIX doesn't provide it */
40     #ifndef NAME_MAX
41     # define NAME_MAX 4096
42     #endif
43    
44 root 1.4 #if __ia64
45     # define STACKSIZE 65536
46 root 1.1 #else
47 root 1.37 # define STACKSIZE 8192
48 root 1.1 #endif
49    
50     enum {
51     REQ_QUIT,
52     REQ_OPEN, REQ_CLOSE,
53     REQ_READ, REQ_WRITE, REQ_READAHEAD,
54 root 1.32 REQ_SENDFILE,
55 root 1.22 REQ_STAT, REQ_LSTAT, REQ_FSTAT,
56 root 1.1 REQ_FSYNC, REQ_FDATASYNC,
57 root 1.40 REQ_UNLINK, REQ_RMDIR, REQ_RENAME,
58 root 1.37 REQ_READDIR,
59 root 1.40 REQ_LINK, REQ_SYMLINK,
60 root 1.54 REQ_GROUP, REQ_NOP,
61 root 1.45 REQ_SLEEP,
62 root 1.1 };
63    
64 root 1.44 #define AIO_REQ_KLASS "IO::AIO::REQ"
65     #define AIO_GRP_KLASS "IO::AIO::GRP"
66 root 1.43
67     typedef struct aio_cb
68     {
69 root 1.49 struct aio_cb *volatile next;
70 root 1.43
71 root 1.49 struct aio_cb *grp, *grp_prev, *grp_next, *grp_first;
72 root 1.1
73 root 1.43 SV *self; /* the perl counterpart of this request, if any */
74 root 1.1
75 root 1.43 SV *data, *callback;
76     SV *fh, *fh2;
77     void *dataptr, *data2ptr;
78     Stat_t *statdata;
79 root 1.1 off_t offset;
80     size_t length;
81     ssize_t result;
82 root 1.43
83     int type;
84     int fd, fd2;
85 root 1.1 int errorno;
86     STRLEN dataoffset;
87 root 1.43 mode_t mode; /* open */
88 root 1.58 unsigned char pri;
89     unsigned char flags;
90 root 1.1 } aio_cb;
91    
92 root 1.58 enum {
93     FLAG_CANCELLED = 0x01,
94     };
95    
96 root 1.1 typedef aio_cb *aio_req;
97 root 1.43 typedef aio_cb *aio_req_ornot;
98 root 1.1
99 root 1.30 static int started, wanted;
100 root 1.11 static volatile int nreqs;
101 root 1.4 static int max_outstanding = 1<<30;
102 root 1.3 static int respipe [2];
103 root 1.1
104 root 1.3 static pthread_mutex_t reslock = PTHREAD_MUTEX_INITIALIZER;
105     static pthread_mutex_t reqlock = PTHREAD_MUTEX_INITIALIZER;
106     static pthread_cond_t reqwait = PTHREAD_COND_INITIALIZER;
107    
108     static volatile aio_req reqs, reqe; /* queue start, queue end */
109     static volatile aio_req ress, rese; /* queue start, queue end */
110 root 1.1
111 root 1.50 static void req_invoke (aio_req req);
112 root 1.45 static void req_free (aio_req req);
113    
114 root 1.43 /* must be called at most once */
115 root 1.44 static SV *req_sv (aio_req req, const char *klass)
116 root 1.43 {
117 root 1.49 if (!req->self)
118     {
119     req->self = (SV *)newHV ();
120     sv_magic (req->self, 0, PERL_MAGIC_ext, (char *)req, 0);
121     }
122 root 1.43
123 root 1.45 return sv_2mortal (sv_bless (newRV_inc (req->self), gv_stashpv (klass, 1)));
124 root 1.43 }
125    
126 root 1.45 static aio_req SvAIO_REQ (SV *sv)
127 root 1.26 {
128 root 1.53 MAGIC *mg;
129    
130 root 1.45 if (!sv_derived_from (sv, AIO_REQ_KLASS) || !SvROK (sv))
131     croak ("object of class " AIO_REQ_KLASS " expected");
132 root 1.43
133 root 1.53 mg = mg_find (SvRV (sv), PERL_MAGIC_ext);
134 root 1.43
135     return mg ? (aio_req)mg->mg_ptr : 0;
136     }
137    
138 root 1.49 static void aio_grp_feed (aio_req grp)
139     {
140 root 1.58 while (grp->length < grp->fd2 && !(grp->flags & FLAG_CANCELLED))
141 root 1.49 {
142     int old_len = grp->length;
143    
144     if (grp->fh2 && SvOK (grp->fh2))
145     {
146     dSP;
147    
148     ENTER;
149     SAVETMPS;
150     PUSHMARK (SP);
151     XPUSHs (req_sv (grp, AIO_GRP_KLASS));
152     PUTBACK;
153     call_sv (grp->fh2, G_VOID | G_EVAL);
154     SPAGAIN;
155     FREETMPS;
156     LEAVE;
157     }
158    
159     /* stop if no progress has been made */
160     if (old_len == grp->length)
161     {
162     SvREFCNT_dec (grp->fh2);
163     grp->fh2 = 0;
164     break;
165     }
166     }
167     }
168    
169 root 1.50 static void aio_grp_dec (aio_req grp)
170     {
171     --grp->length;
172    
173     /* call feeder, if applicable */
174     aio_grp_feed (grp);
175    
176     /* finish, if done */
177     if (!grp->length && grp->fd)
178     {
179     req_invoke (grp);
180     req_free (grp);
181     }
182     }
183    
184 root 1.45 static void poll_wait ()
185     {
186     if (nreqs && !ress)
187     {
188     fd_set rfd;
189     FD_ZERO(&rfd);
190     FD_SET(respipe [0], &rfd);
191    
192     select (respipe [0] + 1, &rfd, 0, 0, 0);
193     }
194     }
195    
196     static void req_invoke (aio_req req)
197     {
198     dSP;
199     int errorno = errno;
200    
201 root 1.58 if (req->flags & FLAG_CANCELLED || !SvOK (req->callback))
202 root 1.45 return;
203    
204     errno = req->errorno;
205    
206     ENTER;
207 root 1.49 SAVETMPS;
208 root 1.45 PUSHMARK (SP);
209 root 1.48 EXTEND (SP, 1);
210 root 1.45
211     switch (req->type)
212     {
213     case REQ_READDIR:
214     {
215     SV *rv = &PL_sv_undef;
216    
217     if (req->result >= 0)
218     {
219     char *buf = req->data2ptr;
220     AV *av = newAV ();
221    
222     while (req->result)
223     {
224     SV *sv = newSVpv (buf, 0);
225    
226     av_push (av, sv);
227     buf += SvCUR (sv) + 1;
228     req->result--;
229     }
230    
231     rv = sv_2mortal (newRV_noinc ((SV *)av));
232     }
233    
234 root 1.48 PUSHs (rv);
235 root 1.45 }
236     break;
237    
238     case REQ_OPEN:
239     {
240     /* convert fd to fh */
241     SV *fh;
242    
243 root 1.48 PUSHs (sv_2mortal (newSViv (req->result)));
244 root 1.45 PUTBACK;
245     call_pv ("IO::AIO::_fd2fh", G_SCALAR | G_EVAL);
246     SPAGAIN;
247    
248     fh = SvREFCNT_inc (POPs);
249    
250     PUSHMARK (SP);
251     XPUSHs (sv_2mortal (fh));
252     }
253     break;
254    
255 root 1.48 case REQ_GROUP:
256 root 1.49 req->fd = 2; /* mark group as finished */
257    
258 root 1.48 if (req->data)
259     {
260     int i;
261     AV *av = (AV *)req->data;
262    
263     EXTEND (SP, AvFILL (av) + 1);
264     for (i = 0; i <= AvFILL (av); ++i)
265     PUSHs (*av_fetch (av, i, 0));
266     }
267     break;
268    
269 root 1.54 case REQ_NOP:
270 root 1.45 case REQ_SLEEP:
271     break;
272    
273     default:
274 root 1.48 PUSHs (sv_2mortal (newSViv (req->result)));
275 root 1.45 break;
276     }
277    
278    
279     PUTBACK;
280     call_sv (req->callback, G_VOID | G_EVAL);
281     SPAGAIN;
282    
283 root 1.51 FREETMPS;
284     LEAVE;
285    
286     errno = errorno;
287    
288 root 1.45 if (SvTRUE (ERRSV))
289     {
290     req_free (req);
291     croak (0);
292     }
293     }
294    
295 root 1.43 static void req_free (aio_req req)
296     {
297 root 1.45 if (req->grp)
298     {
299     aio_req grp = req->grp;
300    
301     /* unlink request */
302 root 1.49 if (req->grp_next) req->grp_next->grp_prev = req->grp_prev;
303     if (req->grp_prev) req->grp_prev->grp_next = req->grp_next;
304    
305     if (grp->grp_first == req)
306     grp->grp_first = req->grp_next;
307    
308 root 1.50 aio_grp_dec (grp);
309 root 1.45 }
310    
311 root 1.43 if (req->self)
312     {
313     sv_unmagic (req->self, PERL_MAGIC_ext);
314     SvREFCNT_dec (req->self);
315     }
316    
317 root 1.49 SvREFCNT_dec (req->data);
318     SvREFCNT_dec (req->fh);
319     SvREFCNT_dec (req->fh2);
320     SvREFCNT_dec (req->callback);
321     Safefree (req->statdata);
322 root 1.26
323 root 1.37 if (req->type == REQ_READDIR && req->result >= 0)
324     free (req->data2ptr);
325    
326 root 1.26 Safefree (req);
327     }
328    
329 root 1.45 static void req_cancel (aio_req req)
330 root 1.1 {
331 root 1.58 req->flags |= FLAG_CANCELLED;
332 root 1.45
333     if (req->type == REQ_GROUP)
334 root 1.11 {
335 root 1.45 aio_req sub;
336 root 1.3
337 root 1.49 for (sub = req->grp_first; sub; sub = sub->grp_next)
338 root 1.45 req_cancel (sub);
339 root 1.11 }
340 root 1.1 }
341    
342 root 1.45 static int poll_cb ()
343 root 1.1 {
344     dSP;
345     int count = 0;
346 root 1.24 int do_croak = 0;
347 root 1.27 aio_req req;
348    
349     for (;;)
350     {
351     pthread_mutex_lock (&reslock);
352     req = ress;
353    
354     if (req)
355     {
356     ress = req->next;
357 root 1.11
358 root 1.27 if (!ress)
359     {
360     /* read any signals sent by the worker threads */
361     char buf [32];
362     while (read (respipe [0], buf, 32) == 32)
363     ;
364 root 1.30
365     rese = 0;
366 root 1.27 }
367     }
368 root 1.1
369 root 1.27 pthread_mutex_unlock (&reslock);
370 root 1.3
371 root 1.27 if (!req)
372     break;
373 root 1.3
374 root 1.50 --nreqs;
375 root 1.1
376     if (req->type == REQ_QUIT)
377     started--;
378 root 1.49 else if (req->type == REQ_GROUP && req->length)
379 root 1.46 {
380     req->fd = 1; /* mark request as delayed */
381     continue;
382     }
383 root 1.1 else
384     {
385     if (req->type == REQ_READ)
386 root 1.27 SvCUR_set (req->data, req->dataoffset + (req->result > 0 ? req->result : 0));
387 root 1.1
388 root 1.28 if (req->data2ptr && (req->type == REQ_READ || req->type == REQ_WRITE))
389     SvREADONLY_off (req->data);
390    
391 root 1.27 if (req->statdata)
392 root 1.1 {
393     PL_laststype = req->type == REQ_LSTAT ? OP_LSTAT : OP_STAT;
394     PL_laststatval = req->result;
395     PL_statcache = *(req->statdata);
396     }
397    
398 root 1.45 req_invoke (req);
399 root 1.26
400 root 1.1 count++;
401     }
402    
403 root 1.43 req_free (req);
404 root 1.1 }
405    
406     return count;
407     }
408    
409 root 1.4 static void *aio_proc(void *arg);
410    
411 root 1.45 static void start_thread (void)
412 root 1.4 {
413     sigset_t fullsigset, oldsigset;
414     pthread_t tid;
415     pthread_attr_t attr;
416    
417     pthread_attr_init (&attr);
418     pthread_attr_setstacksize (&attr, STACKSIZE);
419 root 1.31 pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
420 root 1.4
421     sigfillset (&fullsigset);
422     sigprocmask (SIG_SETMASK, &fullsigset, &oldsigset);
423    
424     if (pthread_create (&tid, &attr, aio_proc, 0) == 0)
425     started++;
426    
427     sigprocmask (SIG_SETMASK, &oldsigset, 0);
428     }
429    
430 root 1.45 static void req_send (aio_req req)
431 root 1.4 {
432 root 1.30 while (started < wanted && nreqs >= started)
433     start_thread ();
434    
435 root 1.50 ++nreqs;
436 root 1.4
437     pthread_mutex_lock (&reqlock);
438    
439     req->next = 0;
440    
441     if (reqe)
442     {
443     reqe->next = req;
444     reqe = req;
445     }
446     else
447     reqe = reqs = req;
448    
449     pthread_cond_signal (&reqwait);
450     pthread_mutex_unlock (&reqlock);
451    
452 root 1.27 if (nreqs > max_outstanding)
453     for (;;)
454     {
455     poll_cb ();
456    
457     if (nreqs <= max_outstanding)
458     break;
459    
460     poll_wait ();
461     }
462 root 1.4 }
463    
464 root 1.45 static void end_thread (void)
465 root 1.4 {
466     aio_req req;
467 root 1.26 Newz (0, req, 1, aio_cb);
468 root 1.4 req->type = REQ_QUIT;
469    
470 root 1.43 req_send (req);
471 root 1.4 }
472    
473 root 1.22 static void min_parallel (int nthreads)
474     {
475 root 1.30 if (wanted < nthreads)
476     wanted = nthreads;
477 root 1.22 }
478    
479     static void max_parallel (int nthreads)
480     {
481     int cur = started;
482 root 1.27
483 root 1.30 if (wanted > nthreads)
484     wanted = nthreads;
485    
486     while (cur > wanted)
487     {
488 root 1.22 end_thread ();
489     cur--;
490     }
491    
492 root 1.30 while (started > wanted)
493 root 1.22 {
494     poll_wait ();
495     poll_cb ();
496     }
497     }
498    
499 root 1.26 static void create_pipe ()
500 root 1.22 {
501 root 1.26 if (pipe (respipe))
502     croak ("unable to initialize result pipe");
503 root 1.22
504 root 1.26 if (fcntl (respipe [0], F_SETFL, O_NONBLOCK))
505     croak ("cannot set result pipe to nonblocking mode");
506 root 1.22
507 root 1.26 if (fcntl (respipe [1], F_SETFL, O_NONBLOCK))
508     croak ("cannot set result pipe to nonblocking mode");
509     }
510 root 1.22
511     /*****************************************************************************/
512 root 1.17 /* work around various missing functions */
513    
514     #if !HAVE_PREADWRITE
515     # define pread aio_pread
516     # define pwrite aio_pwrite
517    
518     /*
519     * make our pread/pwrite safe against themselves, but not against
520     * normal read/write by using a mutex. slows down execution a lot,
521     * but that's your problem, not mine.
522     */
523 root 1.37 static pthread_mutex_t preadwritelock = PTHREAD_MUTEX_INITIALIZER;
524 root 1.17
525 root 1.45 static ssize_t pread (int fd, void *buf, size_t count, off_t offset)
526 root 1.17 {
527     ssize_t res;
528     off_t ooffset;
529    
530 root 1.37 pthread_mutex_lock (&preadwritelock);
531 root 1.17 ooffset = lseek (fd, 0, SEEK_CUR);
532     lseek (fd, offset, SEEK_SET);
533     res = read (fd, buf, count);
534     lseek (fd, ooffset, SEEK_SET);
535 root 1.37 pthread_mutex_unlock (&preadwritelock);
536 root 1.17
537     return res;
538     }
539    
540 root 1.45 static ssize_t pwrite (int fd, void *buf, size_t count, off_t offset)
541 root 1.17 {
542     ssize_t res;
543     off_t ooffset;
544    
545 root 1.37 pthread_mutex_lock (&preadwritelock);
546 root 1.17 ooffset = lseek (fd, 0, SEEK_CUR);
547     lseek (fd, offset, SEEK_SET);
548     res = write (fd, buf, count);
549     lseek (fd, offset, SEEK_SET);
550 root 1.37 pthread_mutex_unlock (&preadwritelock);
551 root 1.17
552     return res;
553     }
554     #endif
555    
556     #if !HAVE_FDATASYNC
557     # define fdatasync fsync
558     #endif
559    
560     #if !HAVE_READAHEAD
561     # define readahead aio_readahead
562    
563 root 1.45 static ssize_t readahead (int fd, off_t offset, size_t count)
564 root 1.17 {
565 root 1.37 char readahead_buf[4096];
566    
567 root 1.17 while (count > 0)
568     {
569     size_t len = count < sizeof (readahead_buf) ? count : sizeof (readahead_buf);
570    
571     pread (fd, readahead_buf, len, offset);
572     offset += len;
573     count -= len;
574     }
575    
576     errno = 0;
577     }
578     #endif
579    
580 root 1.37 #if !HAVE_READDIR_R
581     # define readdir_r aio_readdir_r
582    
583     static pthread_mutex_t readdirlock = PTHREAD_MUTEX_INITIALIZER;
584    
585 root 1.45 static int readdir_r (DIR *dirp, struct dirent *ent, struct dirent **res)
586 root 1.37 {
587     struct dirent *e;
588     int errorno;
589    
590     pthread_mutex_lock (&readdirlock);
591    
592     e = readdir (dirp);
593     errorno = errno;
594    
595     if (e)
596     {
597     *res = ent;
598     strcpy (ent->d_name, e->d_name);
599     }
600     else
601     *res = 0;
602    
603     pthread_mutex_unlock (&readdirlock);
604    
605     errno = errorno;
606     return e ? 0 : -1;
607     }
608     #endif
609    
610 root 1.32 /* sendfile always needs emulation */
611 root 1.45 static ssize_t sendfile_ (int ofd, int ifd, off_t offset, size_t count)
612 root 1.32 {
613 root 1.37 ssize_t res;
614 root 1.32
615 root 1.37 if (!count)
616     return 0;
617 root 1.32
618 root 1.35 #if HAVE_SENDFILE
619     # if __linux
620 root 1.37 res = sendfile (ofd, ifd, &offset, count);
621 root 1.32
622 root 1.35 # elif __freebsd
623 root 1.37 /*
624     * Of course, the freebsd sendfile is a dire hack with no thoughts
625     * wasted on making it similar to other I/O functions.
626     */
627     {
628     off_t sbytes;
629     res = sendfile (ifd, ofd, offset, count, 0, &sbytes, 0);
630    
631     if (res < 0 && sbytes)
632     /* maybe only on EAGAIN only: as usual, the manpage leaves you guessing */
633     res = sbytes;
634     }
635 root 1.32
636 root 1.35 # elif __hpux
637 root 1.37 res = sendfile (ofd, ifd, offset, count, 0, 0);
638 root 1.32
639 root 1.35 # elif __solaris
640 root 1.37 {
641     struct sendfilevec vec;
642     size_t sbytes;
643    
644     vec.sfv_fd = ifd;
645     vec.sfv_flag = 0;
646     vec.sfv_off = offset;
647     vec.sfv_len = count;
648    
649     res = sendfilev (ofd, &vec, 1, &sbytes);
650    
651     if (res < 0 && sbytes)
652     res = sbytes;
653     }
654 root 1.35
655 root 1.38 # endif
656     #else
657 root 1.37 res = -1;
658     errno = ENOSYS;
659 root 1.32 #endif
660    
661 root 1.37 if (res < 0
662     && (errno == ENOSYS || errno == EINVAL || errno == ENOTSOCK
663 root 1.35 #if __solaris
664 root 1.37 || errno == EAFNOSUPPORT || errno == EPROTOTYPE
665 root 1.35 #endif
666 root 1.37 )
667     )
668     {
669     /* emulate sendfile. this is a major pain in the ass */
670     char buf[4096];
671     res = 0;
672    
673 root 1.38 while (count)
674 root 1.37 {
675     ssize_t cnt;
676    
677 root 1.38 cnt = pread (ifd, buf, count > 4096 ? 4096 : count, offset);
678 root 1.32
679 root 1.37 if (cnt <= 0)
680     {
681     if (cnt && !res) res = -1;
682     break;
683     }
684    
685     cnt = write (ofd, buf, cnt);
686    
687     if (cnt <= 0)
688     {
689     if (cnt && !res) res = -1;
690     break;
691     }
692    
693     offset += cnt;
694     res += cnt;
695 root 1.38 count -= cnt;
696 root 1.37 }
697     }
698    
699     return res;
700     }
701    
702     /* read a full directory */
703 root 1.45 static int scandir_ (const char *path, void **namesp)
704 root 1.37 {
705     DIR *dirp = opendir (path);
706     union
707     {
708     struct dirent d;
709     char b [offsetof (struct dirent, d_name) + NAME_MAX + 1];
710     } u;
711     struct dirent *entp;
712     char *name, *names;
713     int memlen = 4096;
714     int memofs = 0;
715     int res = 0;
716     int errorno;
717    
718     if (!dirp)
719     return -1;
720    
721     names = malloc (memlen);
722    
723     for (;;)
724     {
725     errno = 0, readdir_r (dirp, &u.d, &entp);
726    
727     if (!entp)
728     break;
729    
730     name = entp->d_name;
731    
732     if (name [0] != '.' || (name [1] && (name [1] != '.' || name [2])))
733     {
734     int len = strlen (name) + 1;
735    
736     res++;
737    
738     while (memofs + len > memlen)
739     {
740     memlen *= 2;
741     names = realloc (names, memlen);
742     if (!names)
743     break;
744     }
745    
746     memcpy (names + memofs, name, len);
747     memofs += len;
748     }
749     }
750    
751     errorno = errno;
752     closedir (dirp);
753    
754     if (errorno)
755     {
756     free (names);
757     errno = errorno;
758     res = -1;
759     }
760    
761     *namesp = (void *)names;
762     return res;
763 root 1.32 }
764    
765 root 1.22 /*****************************************************************************/
766    
767 root 1.45 static void *aio_proc (void *thr_arg)
768 root 1.1 {
769     aio_req req;
770 root 1.3 int type;
771 root 1.1
772 root 1.3 do
773 root 1.1 {
774 root 1.3 pthread_mutex_lock (&reqlock);
775    
776     for (;;)
777     {
778     req = reqs;
779    
780     if (reqs)
781     {
782     reqs = reqs->next;
783     if (!reqs) reqe = 0;
784     }
785    
786     if (req)
787     break;
788    
789     pthread_cond_wait (&reqwait, &reqlock);
790     }
791    
792     pthread_mutex_unlock (&reqlock);
793    
794 root 1.1 errno = 0; /* strictly unnecessary */
795    
796 root 1.58 if (!(req->flags & FLAG_CANCELLED))
797 root 1.50 switch (type = req->type) /* remember type for QUIT check */
798 root 1.43 {
799     case REQ_READ: req->result = pread (req->fd, req->dataptr, req->length, req->offset); break;
800     case REQ_WRITE: req->result = pwrite (req->fd, req->dataptr, req->length, req->offset); break;
801 root 1.3
802 root 1.43 case REQ_READAHEAD: req->result = readahead (req->fd, req->offset, req->length); break;
803     case REQ_SENDFILE: req->result = sendfile_ (req->fd, req->fd2, req->offset, req->length); break;
804 root 1.16
805 root 1.43 case REQ_STAT: req->result = stat (req->dataptr, req->statdata); break;
806     case REQ_LSTAT: req->result = lstat (req->dataptr, req->statdata); break;
807     case REQ_FSTAT: req->result = fstat (req->fd , req->statdata); break;
808    
809     case REQ_OPEN: req->result = open (req->dataptr, req->fd, req->mode); break;
810     case REQ_CLOSE: req->result = close (req->fd); break;
811     case REQ_UNLINK: req->result = unlink (req->dataptr); break;
812     case REQ_RMDIR: req->result = rmdir (req->dataptr); break;
813     case REQ_RENAME: req->result = rename (req->data2ptr, req->dataptr); break;
814     case REQ_LINK: req->result = link (req->data2ptr, req->dataptr); break;
815     case REQ_SYMLINK: req->result = symlink (req->data2ptr, req->dataptr); break;
816    
817     case REQ_FDATASYNC: req->result = fdatasync (req->fd); break;
818     case REQ_FSYNC: req->result = fsync (req->fd); break;
819     case REQ_READDIR: req->result = scandir_ (req->dataptr, &req->data2ptr); break;
820 root 1.1
821 root 1.45 case REQ_SLEEP:
822     {
823     struct timeval tv;
824    
825     tv.tv_sec = req->fd;
826     tv.tv_usec = req->fd2;
827    
828     req->result = select (0, 0, 0, 0, &tv);
829     }
830    
831 root 1.55 case REQ_GROUP:
832     case REQ_NOP:
833 root 1.43 case REQ_QUIT:
834     break;
835 root 1.1
836 root 1.43 default:
837     req->result = ENOSYS;
838     break;
839     }
840 root 1.1
841     req->errorno = errno;
842 root 1.3
843     pthread_mutex_lock (&reslock);
844    
845     req->next = 0;
846    
847     if (rese)
848     {
849     rese->next = req;
850     rese = req;
851     }
852     else
853     {
854     rese = ress = req;
855    
856     /* write a dummy byte to the pipe so fh becomes ready */
857     write (respipe [1], &respipe, 1);
858     }
859    
860     pthread_mutex_unlock (&reslock);
861 root 1.1 }
862 root 1.3 while (type != REQ_QUIT);
863 root 1.1
864     return 0;
865     }
866    
867 root 1.37 /*****************************************************************************/
868    
869     static void atfork_prepare (void)
870     {
871     pthread_mutex_lock (&reqlock);
872     pthread_mutex_lock (&reslock);
873     #if !HAVE_PREADWRITE
874     pthread_mutex_lock (&preadwritelock);
875     #endif
876     #if !HAVE_READDIR_R
877     pthread_mutex_lock (&readdirlock);
878     #endif
879     }
880    
881     static void atfork_parent (void)
882     {
883     #if !HAVE_READDIR_R
884     pthread_mutex_unlock (&readdirlock);
885     #endif
886     #if !HAVE_PREADWRITE
887     pthread_mutex_unlock (&preadwritelock);
888     #endif
889     pthread_mutex_unlock (&reslock);
890     pthread_mutex_unlock (&reqlock);
891     }
892    
893     static void atfork_child (void)
894     {
895     aio_req prv;
896    
897     started = 0;
898    
899     while (reqs)
900     {
901     prv = reqs;
902     reqs = prv->next;
903 root 1.43 req_free (prv);
904 root 1.37 }
905    
906     reqs = reqe = 0;
907    
908     while (ress)
909     {
910     prv = ress;
911     ress = prv->next;
912 root 1.43 req_free (prv);
913 root 1.37 }
914    
915     ress = rese = 0;
916    
917     close (respipe [0]);
918     close (respipe [1]);
919     create_pipe ();
920    
921     atfork_parent ();
922     }
923    
924 root 1.22 #define dREQ \
925     aio_req req; \
926     \
927     if (SvOK (callback) && !SvROK (callback)) \
928 root 1.43 croak ("callback must be undef or of reference type"); \
929 root 1.22 \
930     Newz (0, req, 1, aio_cb); \
931     if (!req) \
932     croak ("out of memory during aio_req allocation"); \
933     \
934 root 1.43 req->callback = newSVsv (callback)
935    
936     #define REQ_SEND \
937     req_send (req); \
938     \
939     if (GIMME_V != G_VOID) \
940 root 1.44 XPUSHs (req_sv (req, AIO_REQ_KLASS));
941 root 1.22
942 root 1.1 MODULE = IO::AIO PACKAGE = IO::AIO
943    
944 root 1.8 PROTOTYPES: ENABLE
945    
946 root 1.1 BOOT:
947     {
948 root 1.41 HV *stash = gv_stashpv ("IO::AIO", 1);
949     newCONSTSUB (stash, "EXDEV", newSViv (EXDEV));
950     newCONSTSUB (stash, "O_RDONLY", newSViv (O_RDONLY));
951     newCONSTSUB (stash, "O_WRONLY", newSViv (O_WRONLY));
952    
953 root 1.26 create_pipe ();
954 root 1.22 pthread_atfork (atfork_prepare, atfork_parent, atfork_child);
955 root 1.1 }
956    
957     void
958 root 1.40 min_parallel (nthreads)
959 root 1.1 int nthreads
960     PROTOTYPE: $
961    
962     void
963 root 1.40 max_parallel (nthreads)
964 root 1.1 int nthreads
965     PROTOTYPE: $
966    
967 root 1.4 int
968 root 1.40 max_outstanding (nreqs)
969 root 1.4 int nreqs
970     PROTOTYPE: $
971     CODE:
972     RETVAL = max_outstanding;
973     max_outstanding = nreqs;
974    
975 root 1.1 void
976 root 1.40 aio_open (pathname,flags,mode,callback=&PL_sv_undef)
977 root 1.1 SV * pathname
978     int flags
979     int mode
980     SV * callback
981 root 1.8 PROTOTYPE: $$$;$
982 root 1.43 PPCODE:
983 root 1.1 {
984 root 1.22 dREQ;
985 root 1.1
986     req->type = REQ_OPEN;
987     req->data = newSVsv (pathname);
988 root 1.22 req->dataptr = SvPVbyte_nolen (req->data);
989 root 1.1 req->fd = flags;
990     req->mode = mode;
991    
992 root 1.43 REQ_SEND;
993 root 1.1 }
994    
995     void
996 root 1.40 aio_close (fh,callback=&PL_sv_undef)
997 root 1.13 SV * fh
998     SV * callback
999 root 1.8 PROTOTYPE: $;$
1000 root 1.1 ALIAS:
1001     aio_close = REQ_CLOSE
1002     aio_fsync = REQ_FSYNC
1003     aio_fdatasync = REQ_FDATASYNC
1004 root 1.43 PPCODE:
1005 root 1.1 {
1006 root 1.22 dREQ;
1007 root 1.1
1008     req->type = ix;
1009 root 1.13 req->fh = newSVsv (fh);
1010     req->fd = PerlIO_fileno (IoIFP (sv_2io (fh)));
1011 root 1.1
1012 root 1.43 REQ_SEND (req);
1013 root 1.1 }
1014    
1015     void
1016 root 1.40 aio_read (fh,offset,length,data,dataoffset,callback=&PL_sv_undef)
1017 root 1.13 SV * fh
1018     UV offset
1019 root 1.32 UV length
1020 root 1.13 SV * data
1021 root 1.32 UV dataoffset
1022 root 1.13 SV * callback
1023     ALIAS:
1024     aio_read = REQ_READ
1025     aio_write = REQ_WRITE
1026 root 1.8 PROTOTYPE: $$$$$;$
1027 root 1.43 PPCODE:
1028 root 1.13 {
1029     aio_req req;
1030     STRLEN svlen;
1031 root 1.21 char *svptr = SvPVbyte (data, svlen);
1032 root 1.13
1033     SvUPGRADE (data, SVt_PV);
1034     SvPOK_on (data);
1035 root 1.1
1036 root 1.13 if (dataoffset < 0)
1037     dataoffset += svlen;
1038    
1039     if (dataoffset < 0 || dataoffset > svlen)
1040     croak ("data offset outside of string");
1041    
1042     if (ix == REQ_WRITE)
1043     {
1044     /* write: check length and adjust. */
1045     if (length < 0 || length + dataoffset > svlen)
1046     length = svlen - dataoffset;
1047     }
1048     else
1049     {
1050     /* read: grow scalar as necessary */
1051     svptr = SvGROW (data, length + dataoffset);
1052     }
1053    
1054     if (length < 0)
1055     croak ("length must not be negative");
1056    
1057 root 1.22 {
1058     dREQ;
1059 root 1.13
1060 root 1.22 req->type = ix;
1061     req->fh = newSVsv (fh);
1062     req->fd = PerlIO_fileno (ix == REQ_READ ? IoIFP (sv_2io (fh))
1063     : IoOFP (sv_2io (fh)));
1064     req->offset = offset;
1065     req->length = length;
1066     req->data = SvREFCNT_inc (data);
1067     req->dataptr = (char *)svptr + dataoffset;
1068 root 1.13
1069 root 1.28 if (!SvREADONLY (data))
1070     {
1071     SvREADONLY_on (data);
1072     req->data2ptr = (void *)data;
1073     }
1074    
1075 root 1.43 REQ_SEND;
1076 root 1.22 }
1077 root 1.13 }
1078 root 1.1
1079     void
1080 root 1.40 aio_sendfile (out_fh,in_fh,in_offset,length,callback=&PL_sv_undef)
1081 root 1.32 SV * out_fh
1082     SV * in_fh
1083     UV in_offset
1084     UV length
1085     SV * callback
1086     PROTOTYPE: $$$$;$
1087 root 1.43 PPCODE:
1088 root 1.32 {
1089     dREQ;
1090    
1091     req->type = REQ_SENDFILE;
1092     req->fh = newSVsv (out_fh);
1093     req->fd = PerlIO_fileno (IoIFP (sv_2io (out_fh)));
1094     req->fh2 = newSVsv (in_fh);
1095     req->fd2 = PerlIO_fileno (IoIFP (sv_2io (in_fh)));
1096     req->offset = in_offset;
1097     req->length = length;
1098    
1099 root 1.43 REQ_SEND;
1100 root 1.32 }
1101    
1102     void
1103 root 1.40 aio_readahead (fh,offset,length,callback=&PL_sv_undef)
1104 root 1.13 SV * fh
1105     UV offset
1106     IV length
1107     SV * callback
1108 root 1.8 PROTOTYPE: $$$;$
1109 root 1.43 PPCODE:
1110 root 1.1 {
1111 root 1.22 dREQ;
1112 root 1.1
1113     req->type = REQ_READAHEAD;
1114 root 1.13 req->fh = newSVsv (fh);
1115     req->fd = PerlIO_fileno (IoIFP (sv_2io (fh)));
1116 root 1.1 req->offset = offset;
1117     req->length = length;
1118    
1119 root 1.43 REQ_SEND;
1120 root 1.1 }
1121    
1122     void
1123 root 1.40 aio_stat (fh_or_path,callback=&PL_sv_undef)
1124 root 1.1 SV * fh_or_path
1125     SV * callback
1126     ALIAS:
1127 root 1.8 aio_stat = REQ_STAT
1128     aio_lstat = REQ_LSTAT
1129 root 1.43 PPCODE:
1130 root 1.1 {
1131 root 1.22 dREQ;
1132 root 1.1
1133     New (0, req->statdata, 1, Stat_t);
1134     if (!req->statdata)
1135 root 1.27 {
1136 root 1.43 req_free (req);
1137 root 1.27 croak ("out of memory during aio_req->statdata allocation");
1138     }
1139 root 1.1
1140     if (SvPOK (fh_or_path))
1141     {
1142 root 1.8 req->type = ix;
1143 root 1.1 req->data = newSVsv (fh_or_path);
1144 root 1.22 req->dataptr = SvPVbyte_nolen (req->data);
1145 root 1.1 }
1146     else
1147     {
1148     req->type = REQ_FSTAT;
1149 root 1.13 req->fh = newSVsv (fh_or_path);
1150 root 1.1 req->fd = PerlIO_fileno (IoIFP (sv_2io (fh_or_path)));
1151     }
1152    
1153 root 1.43 REQ_SEND;
1154 root 1.1 }
1155    
1156     void
1157 root 1.40 aio_unlink (pathname,callback=&PL_sv_undef)
1158 root 1.1 SV * pathname
1159     SV * callback
1160 root 1.22 ALIAS:
1161 root 1.40 aio_unlink = REQ_UNLINK
1162     aio_rmdir = REQ_RMDIR
1163     aio_readdir = REQ_READDIR
1164 root 1.43 PPCODE:
1165 root 1.1 {
1166 root 1.22 dREQ;
1167 root 1.1
1168 root 1.22 req->type = ix;
1169     req->data = newSVsv (pathname);
1170     req->dataptr = SvPVbyte_nolen (req->data);
1171 root 1.1
1172 root 1.43 REQ_SEND;
1173 root 1.22 }
1174    
1175     void
1176 root 1.40 aio_link (oldpath,newpath,callback=&PL_sv_undef)
1177 root 1.22 SV * oldpath
1178     SV * newpath
1179     SV * callback
1180 root 1.40 ALIAS:
1181     aio_link = REQ_LINK
1182     aio_symlink = REQ_SYMLINK
1183     aio_rename = REQ_RENAME
1184 root 1.43 PPCODE:
1185 root 1.22 {
1186     dREQ;
1187 root 1.1
1188 root 1.40 req->type = ix;
1189 root 1.22 req->fh = newSVsv (oldpath);
1190     req->data2ptr = SvPVbyte_nolen (req->fh);
1191     req->data = newSVsv (newpath);
1192     req->dataptr = SvPVbyte_nolen (req->data);
1193 root 1.1
1194 root 1.43 REQ_SEND;
1195 root 1.1 }
1196    
1197 root 1.42 void
1198 root 1.45 aio_sleep (delay,callback=&PL_sv_undef)
1199     double delay
1200     SV * callback
1201     PPCODE:
1202     {
1203     dREQ;
1204    
1205     req->type = REQ_SLEEP;
1206     req->fd = delay < 0. ? 0 : delay;
1207     req->fd2 = delay < 0. ? 0 : 1000. * (delay - req->fd);
1208    
1209     REQ_SEND;
1210     }
1211    
1212     void
1213 root 1.44 aio_group (callback=&PL_sv_undef)
1214     SV * callback
1215 root 1.46 PROTOTYPE: ;$
1216 root 1.44 PPCODE:
1217 root 1.42 {
1218 root 1.44 dREQ;
1219     req->type = REQ_GROUP;
1220 root 1.45 req_send (req);
1221 root 1.44 XPUSHs (req_sv (req, AIO_GRP_KLASS));
1222 root 1.42 }
1223    
1224 root 1.6 void
1225 root 1.54 aio_nop (callback=&PL_sv_undef)
1226     SV * callback
1227     PPCODE:
1228     {
1229     dREQ;
1230    
1231     req->type = REQ_NOP;
1232    
1233     REQ_SEND;
1234     }
1235    
1236     void
1237 root 1.40 flush ()
1238 root 1.6 PROTOTYPE:
1239     CODE:
1240     while (nreqs)
1241     {
1242     poll_wait ();
1243     poll_cb ();
1244     }
1245    
1246 root 1.7 void
1247     poll()
1248     PROTOTYPE:
1249     CODE:
1250     if (nreqs)
1251     {
1252     poll_wait ();
1253     poll_cb ();
1254     }
1255    
1256 root 1.1 int
1257     poll_fileno()
1258     PROTOTYPE:
1259     CODE:
1260 root 1.3 RETVAL = respipe [0];
1261 root 1.1 OUTPUT:
1262     RETVAL
1263    
1264     int
1265     poll_cb(...)
1266     PROTOTYPE:
1267     CODE:
1268 root 1.5 RETVAL = poll_cb ();
1269 root 1.1 OUTPUT:
1270     RETVAL
1271    
1272     void
1273     poll_wait()
1274     PROTOTYPE:
1275     CODE:
1276 root 1.3 if (nreqs)
1277     poll_wait ();
1278 root 1.1
1279     int
1280     nreqs()
1281     PROTOTYPE:
1282     CODE:
1283     RETVAL = nreqs;
1284     OUTPUT:
1285     RETVAL
1286    
1287 root 1.48 PROTOTYPES: DISABLE
1288    
1289 root 1.44 MODULE = IO::AIO PACKAGE = IO::AIO::REQ
1290 root 1.43
1291     void
1292     cancel (aio_req_ornot req)
1293     PROTOTYPE:
1294     CODE:
1295 root 1.45 req_cancel (req);
1296    
1297 root 1.56 void
1298 root 1.59 cb (aio_req_ornot req, SV *callback=&PL_sv_undef)
1299 root 1.56 CODE:
1300     SvREFCNT_dec (req->callback);
1301     req->callback = newSVsv (callback);
1302    
1303 root 1.45 MODULE = IO::AIO PACKAGE = IO::AIO::GRP
1304    
1305     void
1306     add (aio_req grp, ...)
1307     PPCODE:
1308     {
1309     int i;
1310 root 1.53 aio_req req;
1311 root 1.45
1312 root 1.49 if (grp->fd == 2)
1313     croak ("cannot add requests to IO::AIO::GRP after the group finished");
1314    
1315 root 1.45 for (i = 1; i < items; ++i )
1316     {
1317 root 1.46 if (GIMME_V != G_VOID)
1318     XPUSHs (sv_2mortal (newSVsv (ST (i))));
1319    
1320 root 1.53 req = SvAIO_REQ (ST (i));
1321 root 1.45
1322 root 1.46 if (req)
1323     {
1324 root 1.49 ++grp->length;
1325     req->grp = grp;
1326    
1327     req->grp_prev = 0;
1328     req->grp_next = grp->grp_first;
1329 root 1.45
1330 root 1.49 if (grp->grp_first)
1331     grp->grp_first->grp_prev = req;
1332    
1333     grp->grp_first = req;
1334 root 1.46 }
1335 root 1.45 }
1336     }
1337 root 1.43
1338 root 1.48 void
1339 root 1.51 result (aio_req grp, ...)
1340     CODE:
1341     {
1342     int i;
1343     AV *av = newAV ();
1344    
1345     for (i = 1; i < items; ++i )
1346     av_push (av, newSVsv (ST (i)));
1347    
1348     SvREFCNT_dec (grp->data);
1349     grp->data = (SV *)av;
1350     }
1351    
1352     void
1353 root 1.56 feed_limit (aio_req grp, int limit)
1354 root 1.49 CODE:
1355     grp->fd2 = limit;
1356     aio_grp_feed (grp);
1357    
1358     void
1359 root 1.56 feed (aio_req grp, SV *callback=&PL_sv_undef)
1360 root 1.49 CODE:
1361     {
1362     SvREFCNT_dec (grp->fh2);
1363     grp->fh2 = newSVsv (callback);
1364    
1365     if (grp->fd2 <= 0)
1366     grp->fd2 = 2;
1367    
1368     aio_grp_feed (grp);
1369     }
1370