ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/IO-AIO/AIO.xs
Revision: 1.45
Committed: Sun Oct 22 00:19:05 2006 UTC (17 years, 7 months ago) by root
Branch: MAIN
Changes since 1.44: +194 -98 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.20 #define _REENTRANT 1
2 root 1.19 #include <errno.h>
3    
4 root 1.15 #include "EXTERN.h"
5 root 1.1 #include "perl.h"
6     #include "XSUB.h"
7    
8 root 1.16 #include "autoconf/config.h"
9    
10 root 1.32 #include <pthread.h>
11    
12 root 1.37 #include <stddef.h>
13 root 1.41 #include <errno.h>
14 root 1.45 #include <sys/time.h>
15     #include <sys/select.h>
16 root 1.1 #include <sys/types.h>
17     #include <sys/stat.h>
18 root 1.37 #include <limits.h>
19 root 1.1 #include <unistd.h>
20     #include <fcntl.h>
21     #include <signal.h>
22     #include <sched.h>
23    
24 root 1.32 #if HAVE_SENDFILE
25     # if __linux
26     # include <sys/sendfile.h>
27     # elif __freebsd
28     # include <sys/socket.h>
29     # include <sys/uio.h>
30     # elif __hpux
31     # include <sys/socket.h>
32 root 1.35 # elif __solaris /* not yet */
33     # include <sys/sendfile.h>
34 root 1.34 # else
35     # error sendfile support requested but not available
36 root 1.32 # endif
37     #endif
38 root 1.1
/* NAME_MAX sizes the struct dirent buffer; AIX doesn't provide it */
#ifndef NAME_MAX
# define NAME_MAX 4096
#endif

/* stack size requested for every worker thread */
#if __ia64
# define STACKSIZE 65536
#else
# define STACKSIZE 8192
#endif
49    
/* request type codes stored in aio_cb.type; the order defines the values */
enum {
  REQ_QUIT,      /* asks one worker thread to leave its loop */
  REQ_OPEN,
  REQ_CLOSE,
  REQ_READ,
  REQ_WRITE,
  REQ_READAHEAD,
  REQ_SENDFILE,
  REQ_STAT,
  REQ_LSTAT,
  REQ_FSTAT,
  REQ_FSYNC,
  REQ_FDATASYNC,
  REQ_UNLINK,
  REQ_RMDIR,
  REQ_RENAME,
  REQ_READDIR,
  REQ_LINK,
  REQ_SYMLINK,
  REQ_SLEEP,
  REQ_GROUP,     /* container request, members are linked via grp_next/grp_prev */
};
63    
/* perl classes the request objects are blessed into */
#define AIO_REQ_KLASS "IO::AIO::REQ"
#define AIO_GRP_KLASS "IO::AIO::GRP"
66 root 1.43
67     typedef struct aio_cb
68     {
69 root 1.44 struct aio_cb *grp, *grp_prev, *grp_next;
70 root 1.43
71 root 1.3 struct aio_cb *volatile next;
72 root 1.1
73 root 1.43 SV *self; /* the perl counterpart of this request, if any */
74 root 1.1
75 root 1.43 SV *data, *callback;
76     SV *fh, *fh2;
77     void *dataptr, *data2ptr;
78     Stat_t *statdata;
79 root 1.1 off_t offset;
80     size_t length;
81     ssize_t result;
82 root 1.43
83     int type;
84     int fd, fd2;
85 root 1.1 int errorno;
86     STRLEN dataoffset;
87 root 1.43 mode_t mode; /* open */
88     unsigned char cancelled;
89 root 1.1 } aio_cb;
90    
91     typedef aio_cb *aio_req;
92 root 1.43 typedef aio_cb *aio_req_ornot;
93 root 1.44 typedef aio_cb *aio_group;
94 root 1.1
95 root 1.30 static int started, wanted;
96 root 1.11 static volatile int nreqs;
97 root 1.4 static int max_outstanding = 1<<30;
98 root 1.3 static int respipe [2];
99 root 1.1
100 root 1.3 static pthread_mutex_t reslock = PTHREAD_MUTEX_INITIALIZER;
101     static pthread_mutex_t reqlock = PTHREAD_MUTEX_INITIALIZER;
102     static pthread_cond_t reqwait = PTHREAD_COND_INITIALIZER;
103    
104     static volatile aio_req reqs, reqe; /* queue start, queue end */
105     static volatile aio_req ress, rese; /* queue start, queue end */
106 root 1.1
107 root 1.45 static void req_free (aio_req req);
108    
109 root 1.43 /* must be called at most once */
110 root 1.44 static SV *req_sv (aio_req req, const char *klass)
111 root 1.43 {
112     req->self = (SV *)newHV ();
113     sv_magic (req->self, 0, PERL_MAGIC_ext, (char *)req, 0);
114    
115 root 1.45 return sv_2mortal (sv_bless (newRV_inc (req->self), gv_stashpv (klass, 1)));
116 root 1.43 }
117    
118 root 1.45 static aio_req SvAIO_REQ (SV *sv)
119 root 1.26 {
120 root 1.45 if (!sv_derived_from (sv, AIO_REQ_KLASS) || !SvROK (sv))
121     croak ("object of class " AIO_REQ_KLASS " expected");
122 root 1.43
123     MAGIC *mg = mg_find (SvRV (sv), PERL_MAGIC_ext);
124    
125     return mg ? (aio_req)mg->mg_ptr : 0;
126     }
127    
128 root 1.45 static void poll_wait ()
129     {
130     if (nreqs && !ress)
131     {
132     fd_set rfd;
133     FD_ZERO(&rfd);
134     FD_SET(respipe [0], &rfd);
135    
136     select (respipe [0] + 1, &rfd, 0, 0, 0);
137     }
138     }
139    
140     static void req_invoke (aio_req req)
141     {
142     dSP;
143     int errorno = errno;
144    
145     if (req->cancelled || !SvOK (req->callback))
146     return;
147    
148     errno = req->errorno;
149    
150     ENTER;
151     PUSHMARK (SP);
152    
153     switch (req->type)
154     {
155     case REQ_READDIR:
156     {
157     SV *rv = &PL_sv_undef;
158    
159     if (req->result >= 0)
160     {
161     char *buf = req->data2ptr;
162     AV *av = newAV ();
163    
164     while (req->result)
165     {
166     SV *sv = newSVpv (buf, 0);
167    
168     av_push (av, sv);
169     buf += SvCUR (sv) + 1;
170     req->result--;
171     }
172    
173     rv = sv_2mortal (newRV_noinc ((SV *)av));
174     }
175    
176     XPUSHs (rv);
177     }
178     break;
179    
180     case REQ_OPEN:
181     {
182     /* convert fd to fh */
183     SV *fh;
184    
185     XPUSHs (sv_2mortal (newSViv (req->result)));
186     PUTBACK;
187     call_pv ("IO::AIO::_fd2fh", G_SCALAR | G_EVAL);
188     SPAGAIN;
189    
190     fh = SvREFCNT_inc (POPs);
191    
192     PUSHMARK (SP);
193     XPUSHs (sv_2mortal (fh));
194     }
195     break;
196    
197     case REQ_SLEEP:
198     case REQ_GROUP:
199     break;
200    
201     default:
202     XPUSHs (sv_2mortal (newSViv (req->result)));
203     break;
204     }
205    
206    
207     PUTBACK;
208     call_sv (req->callback, G_VOID | G_EVAL);
209     SPAGAIN;
210    
211     if (SvTRUE (ERRSV))
212     {
213     req_free (req);
214     croak (0);
215     }
216    
217     LEAVE;
218    
219     errno = errorno;
220     }
221    
222 root 1.43 static void req_free (aio_req req)
223     {
224 root 1.45 if (req->grp)
225     {
226     aio_req grp = req->grp;
227    
228     /* unlink request */
229     req->grp_next->grp_prev = req->grp_prev;
230     req->grp_prev->grp_next = req->grp_next;
231    
232     if (grp->grp_next == grp)
233     {
234     req_invoke (grp);
235     req_free (grp);
236     }
237     }
238    
239 root 1.43 if (req->self)
240     {
241     sv_unmagic (req->self, PERL_MAGIC_ext);
242     SvREFCNT_dec (req->self);
243     }
244    
245 root 1.26 if (req->data)
246     SvREFCNT_dec (req->data);
247    
248     if (req->fh)
249     SvREFCNT_dec (req->fh);
250    
251 root 1.32 if (req->fh2)
252     SvREFCNT_dec (req->fh2);
253    
254 root 1.27 if (req->statdata)
255 root 1.26 Safefree (req->statdata);
256    
257     if (req->callback)
258     SvREFCNT_dec (req->callback);
259    
260 root 1.37 if (req->type == REQ_READDIR && req->result >= 0)
261     free (req->data2ptr);
262    
263 root 1.26 Safefree (req);
264     }
265    
266 root 1.45 static void req_cancel (aio_req req)
267 root 1.1 {
268 root 1.45 req->cancelled = 1;
269    
270     if (req->type == REQ_GROUP)
271 root 1.11 {
272 root 1.45 aio_req sub;
273 root 1.3
274 root 1.45 for (sub = req->grp_next; sub != req; sub = sub->grp_next)
275     req_cancel (sub);
276 root 1.11 }
277 root 1.1 }
278    
279 root 1.45 static int poll_cb ()
280 root 1.1 {
281     dSP;
282     int count = 0;
283 root 1.24 int do_croak = 0;
284 root 1.27 aio_req req;
285    
286     for (;;)
287     {
288     pthread_mutex_lock (&reslock);
289     req = ress;
290    
291     if (req)
292     {
293     ress = req->next;
294 root 1.11
295 root 1.27 if (!ress)
296     {
297     /* read any signals sent by the worker threads */
298     char buf [32];
299     while (read (respipe [0], buf, 32) == 32)
300     ;
301 root 1.30
302     rese = 0;
303 root 1.27 }
304     }
305 root 1.1
306 root 1.27 pthread_mutex_unlock (&reslock);
307 root 1.3
308 root 1.27 if (!req)
309     break;
310 root 1.3
311 root 1.1 nreqs--;
312    
313     if (req->type == REQ_QUIT)
314     started--;
315 root 1.45 else if (req->type == REQ_GROUP && req->grp_next != req)
316     continue;
317 root 1.1 else
318     {
319     if (req->type == REQ_READ)
320 root 1.27 SvCUR_set (req->data, req->dataoffset + (req->result > 0 ? req->result : 0));
321 root 1.1
322 root 1.28 if (req->data2ptr && (req->type == REQ_READ || req->type == REQ_WRITE))
323     SvREADONLY_off (req->data);
324    
325 root 1.27 if (req->statdata)
326 root 1.1 {
327     PL_laststype = req->type == REQ_LSTAT ? OP_LSTAT : OP_STAT;
328     PL_laststatval = req->result;
329     PL_statcache = *(req->statdata);
330     }
331    
332 root 1.45 req_invoke (req);
333 root 1.26
334 root 1.1 count++;
335     }
336    
337 root 1.43 req_free (req);
338 root 1.1 }
339    
340     return count;
341     }
342    
343 root 1.4 static void *aio_proc(void *arg);
344    
345 root 1.45 static void start_thread (void)
346 root 1.4 {
347     sigset_t fullsigset, oldsigset;
348     pthread_t tid;
349     pthread_attr_t attr;
350    
351     pthread_attr_init (&attr);
352     pthread_attr_setstacksize (&attr, STACKSIZE);
353 root 1.31 pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
354 root 1.4
355     sigfillset (&fullsigset);
356     sigprocmask (SIG_SETMASK, &fullsigset, &oldsigset);
357    
358     if (pthread_create (&tid, &attr, aio_proc, 0) == 0)
359     started++;
360    
361     sigprocmask (SIG_SETMASK, &oldsigset, 0);
362     }
363    
364 root 1.45 static void req_send (aio_req req)
365 root 1.4 {
366 root 1.30 while (started < wanted && nreqs >= started)
367     start_thread ();
368    
369 root 1.4 nreqs++;
370    
371     pthread_mutex_lock (&reqlock);
372    
373     req->next = 0;
374    
375     if (reqe)
376     {
377     reqe->next = req;
378     reqe = req;
379     }
380     else
381     reqe = reqs = req;
382    
383     pthread_cond_signal (&reqwait);
384     pthread_mutex_unlock (&reqlock);
385    
386 root 1.27 if (nreqs > max_outstanding)
387     for (;;)
388     {
389     poll_cb ();
390    
391     if (nreqs <= max_outstanding)
392     break;
393    
394     poll_wait ();
395     }
396 root 1.4 }
397    
398 root 1.45 static void end_thread (void)
399 root 1.4 {
400     aio_req req;
401 root 1.26 Newz (0, req, 1, aio_cb);
402 root 1.4 req->type = REQ_QUIT;
403    
404 root 1.43 req_send (req);
405 root 1.4 }
406    
407 root 1.22 static void min_parallel (int nthreads)
408     {
409 root 1.30 if (wanted < nthreads)
410     wanted = nthreads;
411 root 1.22 }
412    
413     static void max_parallel (int nthreads)
414     {
415     int cur = started;
416 root 1.27
417 root 1.30 if (wanted > nthreads)
418     wanted = nthreads;
419    
420     while (cur > wanted)
421     {
422 root 1.22 end_thread ();
423     cur--;
424     }
425    
426 root 1.30 while (started > wanted)
427 root 1.22 {
428     poll_wait ();
429     poll_cb ();
430     }
431     }
432    
433 root 1.26 static void create_pipe ()
434 root 1.22 {
435 root 1.26 if (pipe (respipe))
436     croak ("unable to initialize result pipe");
437 root 1.22
438 root 1.26 if (fcntl (respipe [0], F_SETFL, O_NONBLOCK))
439     croak ("cannot set result pipe to nonblocking mode");
440 root 1.22
441 root 1.26 if (fcntl (respipe [1], F_SETFL, O_NONBLOCK))
442     croak ("cannot set result pipe to nonblocking mode");
443     }
444 root 1.22
445     /*****************************************************************************/
446 root 1.17 /* work around various missing functions */
447    
#if !HAVE_PREADWRITE
# define pread  aio_pread
# define pwrite aio_pwrite

/*
 * make our pread/pwrite safe against themselves, but not against
 * normal read/write by using a mutex. slows down execution a lot,
 * but that's your problem, not mine.
 */
static pthread_mutex_t preadwritelock = PTHREAD_MUTEX_INITIALIZER;

/*
 * Emulated pread: read up to count bytes at offset, then put the file
 * position back where it was. Returns whatever read() returned.
 */
static ssize_t pread (int fd, void *buf, size_t count, off_t offset)
{
  ssize_t res;
  off_t ooffset;

  pthread_mutex_lock (&preadwritelock);
  ooffset = lseek (fd, 0, SEEK_CUR);
  lseek (fd, offset, SEEK_SET);
  res = read (fd, buf, count);
  lseek (fd, ooffset, SEEK_SET);
  pthread_mutex_unlock (&preadwritelock);

  return res;
}

/*
 * Emulated pwrite: write up to count bytes at offset, then put the file
 * position back where it was. Returns whatever write() returned.
 */
static ssize_t pwrite (int fd, const void *buf, size_t count, off_t offset)
{
  ssize_t res;
  off_t ooffset;

  pthread_mutex_lock (&preadwritelock);
  ooffset = lseek (fd, 0, SEEK_CUR);
  lseek (fd, offset, SEEK_SET);
  res = write (fd, buf, count);
  /* restore the saved position, not the write offset */
  lseek (fd, ooffset, SEEK_SET);
  pthread_mutex_unlock (&preadwritelock);

  return res;
}
#endif
489    
/* without fdatasync, fall back to a full fsync */
#if !HAVE_FDATASYNC
# define fdatasync fsync
#endif
493    
#if !HAVE_READAHEAD
# define readahead aio_readahead

/*
 * Emulated readahead: read count bytes starting at offset into a scratch
 * buffer and throw them away. Results of the individual reads are ignored;
 * the call always clears errno and returns 0.
 */
static ssize_t readahead (int fd, off_t offset, size_t count)
{
  char readahead_buf[4096];

  while (count > 0)
    {
      size_t len = count < sizeof (readahead_buf) ? count : sizeof (readahead_buf);

      pread (fd, readahead_buf, len, offset);
      offset += len;
      count -= len;
    }

  errno = 0;

  /* the caller stores the return value in req->result, so it must be defined */
  return 0;
}
#endif
513    
#if !HAVE_READDIR_R
# define readdir_r aio_readdir_r

/* serialises the readdir() calls made by the emulation below */
static pthread_mutex_t readdirlock = PTHREAD_MUTEX_INITIALIZER;

/*
 * Emulated readdir_r on top of readdir. Copies only d_name into ent.
 * Sets *res to ent and returns 0 when an entry was read; otherwise sets
 * *res to 0 and returns -1. errno is left as readdir() set it.
 */
static int readdir_r (DIR *dirp, struct dirent *ent, struct dirent **res)
{
  struct dirent *cur;
  int saved_errno;
  int status;

  pthread_mutex_lock (&readdirlock);

  cur = readdir (dirp);
  saved_errno = errno;

  if (cur)
    {
      *res = ent;
      strcpy (ent->d_name, cur->d_name);
      status = 0;
    }
  else
    {
      *res = 0;
      status = -1;
    }

  pthread_mutex_unlock (&readdirlock);

  errno = saved_errno;
  return status;
}
#endif
543    
/*
 * Copy count bytes from ifd (starting at offset) to ofd.
 * Uses the platform's sendfile when HAVE_SENDFILE is set and falls back to a
 * pread/write loop when sendfile is unavailable or rejects the descriptors.
 * Returns the number of bytes transferred, or -1 if nothing was transferred
 * and an error occurred.
 *
 * sendfile always needs emulation
 */
static ssize_t sendfile_ (int ofd, int ifd, off_t offset, size_t count)
{
  ssize_t res;

  if (!count)
    return 0;

#if HAVE_SENDFILE
# if __linux
  res = sendfile (ofd, ifd, &offset, count);

# elif __freebsd
  /*
   * Of course, the freebsd sendfile is a dire hack with no thoughts
   * wasted on making it similar to other I/O functions.
   */
  {
    off_t sbytes = 0;
    res = sendfile (ifd, ofd, offset, count, 0, &sbytes, 0);

    /*
     * freebsd returns 0 (not the byte count) on success, and may have sent
     * data before failing: in both cases report what was actually sent.
     */
    if (res == 0 || (res < 0 && sbytes))
      res = sbytes;
  }

# elif __hpux
  res = sendfile (ofd, ifd, offset, count, 0, 0);

# elif __solaris
  {
    struct sendfilevec vec;
    size_t sbytes = 0;

    vec.sfv_fd   = ifd;
    vec.sfv_flag = 0;
    vec.sfv_off  = offset;
    vec.sfv_len  = count;

    res = sendfilev (ofd, &vec, 1, &sbytes);

    if (res < 0 && sbytes)
      res = sbytes;
  }

# endif
#else
  res = -1;
  errno = ENOSYS;
#endif

  if (res <  0
      && (errno == ENOSYS || errno == EINVAL || errno == ENOTSOCK
#if __solaris
          || errno == EAFNOSUPPORT || errno == EPROTOTYPE
#endif
         )
      )
    {
      /* emulate sendfile. this is a major pain in the ass */
      char buf[4096];
      res = 0;

      while (count)
        {
          ssize_t cnt;

          cnt = pread (ifd, buf, count > sizeof (buf) ? sizeof (buf) : count, offset);

          if (cnt <= 0)
            {
              /* only report the error when nothing was copied so far */
              if (cnt && !res) res = -1;
              break;
            }

          cnt = write (ofd, buf, cnt);

          if (cnt <= 0)
            {
              if (cnt && !res) res = -1;
              break;
            }

          /* advance by what was written, so a short write is re-read */
          offset += cnt;
          res    += cnt;
          count  -= cnt;
        }
    }

  return res;
}
635    
636     /* read a full directory */
637 root 1.45 static int scandir_ (const char *path, void **namesp)
638 root 1.37 {
639     DIR *dirp = opendir (path);
640     union
641     {
642     struct dirent d;
643     char b [offsetof (struct dirent, d_name) + NAME_MAX + 1];
644     } u;
645     struct dirent *entp;
646     char *name, *names;
647     int memlen = 4096;
648     int memofs = 0;
649     int res = 0;
650     int errorno;
651    
652     if (!dirp)
653     return -1;
654    
655     names = malloc (memlen);
656    
657     for (;;)
658     {
659     errno = 0, readdir_r (dirp, &u.d, &entp);
660    
661     if (!entp)
662     break;
663    
664     name = entp->d_name;
665    
666     if (name [0] != '.' || (name [1] && (name [1] != '.' || name [2])))
667     {
668     int len = strlen (name) + 1;
669    
670     res++;
671    
672     while (memofs + len > memlen)
673     {
674     memlen *= 2;
675     names = realloc (names, memlen);
676     if (!names)
677     break;
678     }
679    
680     memcpy (names + memofs, name, len);
681     memofs += len;
682     }
683     }
684    
685     errorno = errno;
686     closedir (dirp);
687    
688     if (errorno)
689     {
690     free (names);
691     errno = errorno;
692     res = -1;
693     }
694    
695     *namesp = (void *)names;
696     return res;
697 root 1.32 }
698    
699 root 1.22 /*****************************************************************************/
700    
701 root 1.45 static void *aio_proc (void *thr_arg)
702 root 1.1 {
703     aio_req req;
704 root 1.3 int type;
705 root 1.1
706 root 1.3 do
707 root 1.1 {
708 root 1.3 pthread_mutex_lock (&reqlock);
709    
710     for (;;)
711     {
712     req = reqs;
713    
714     if (reqs)
715     {
716     reqs = reqs->next;
717     if (!reqs) reqe = 0;
718     }
719    
720     if (req)
721     break;
722    
723     pthread_cond_wait (&reqwait, &reqlock);
724     }
725    
726     pthread_mutex_unlock (&reqlock);
727    
728 root 1.1 errno = 0; /* strictly unnecessary */
729    
730 root 1.43 if (!req->cancelled)
731     switch (req->type)
732     {
733     case REQ_READ: req->result = pread (req->fd, req->dataptr, req->length, req->offset); break;
734     case REQ_WRITE: req->result = pwrite (req->fd, req->dataptr, req->length, req->offset); break;
735 root 1.3
736 root 1.43 case REQ_READAHEAD: req->result = readahead (req->fd, req->offset, req->length); break;
737     case REQ_SENDFILE: req->result = sendfile_ (req->fd, req->fd2, req->offset, req->length); break;
738 root 1.16
739 root 1.43 case REQ_STAT: req->result = stat (req->dataptr, req->statdata); break;
740     case REQ_LSTAT: req->result = lstat (req->dataptr, req->statdata); break;
741     case REQ_FSTAT: req->result = fstat (req->fd , req->statdata); break;
742    
743     case REQ_OPEN: req->result = open (req->dataptr, req->fd, req->mode); break;
744     case REQ_CLOSE: req->result = close (req->fd); break;
745     case REQ_UNLINK: req->result = unlink (req->dataptr); break;
746     case REQ_RMDIR: req->result = rmdir (req->dataptr); break;
747     case REQ_RENAME: req->result = rename (req->data2ptr, req->dataptr); break;
748     case REQ_LINK: req->result = link (req->data2ptr, req->dataptr); break;
749     case REQ_SYMLINK: req->result = symlink (req->data2ptr, req->dataptr); break;
750    
751     case REQ_FDATASYNC: req->result = fdatasync (req->fd); break;
752     case REQ_FSYNC: req->result = fsync (req->fd); break;
753     case REQ_READDIR: req->result = scandir_ (req->dataptr, &req->data2ptr); break;
754 root 1.1
755 root 1.45 case REQ_SLEEP:
756     {
757     struct timeval tv;
758    
759     tv.tv_sec = req->fd;
760     tv.tv_usec = req->fd2;
761    
762     req->result = select (0, 0, 0, 0, &tv);
763     }
764    
765 root 1.43 case REQ_QUIT:
766     break;
767 root 1.1
768 root 1.43 default:
769     req->result = ENOSYS;
770     break;
771     }
772 root 1.1
773     req->errorno = errno;
774 root 1.3
775     pthread_mutex_lock (&reslock);
776    
777     req->next = 0;
778    
779     if (rese)
780     {
781     rese->next = req;
782     rese = req;
783     }
784     else
785     {
786     rese = ress = req;
787    
788     /* write a dummy byte to the pipe so fh becomes ready */
789     write (respipe [1], &respipe, 1);
790     }
791    
792     pthread_mutex_unlock (&reslock);
793 root 1.1 }
794 root 1.3 while (type != REQ_QUIT);
795 root 1.1
796     return 0;
797     }
798    
799 root 1.37 /*****************************************************************************/
800    
801     static void atfork_prepare (void)
802     {
803     pthread_mutex_lock (&reqlock);
804     pthread_mutex_lock (&reslock);
805     #if !HAVE_PREADWRITE
806     pthread_mutex_lock (&preadwritelock);
807     #endif
808     #if !HAVE_READDIR_R
809     pthread_mutex_lock (&readdirlock);
810     #endif
811     }
812    
813     static void atfork_parent (void)
814     {
815     #if !HAVE_READDIR_R
816     pthread_mutex_unlock (&readdirlock);
817     #endif
818     #if !HAVE_PREADWRITE
819     pthread_mutex_unlock (&preadwritelock);
820     #endif
821     pthread_mutex_unlock (&reslock);
822     pthread_mutex_unlock (&reqlock);
823     }
824    
825     static void atfork_child (void)
826     {
827     aio_req prv;
828    
829     started = 0;
830    
831     while (reqs)
832     {
833     prv = reqs;
834     reqs = prv->next;
835 root 1.43 req_free (prv);
836 root 1.37 }
837    
838     reqs = reqe = 0;
839    
840     while (ress)
841     {
842     prv = ress;
843     ress = prv->next;
844 root 1.43 req_free (prv);
845 root 1.37 }
846    
847     ress = rese = 0;
848    
849     close (respipe [0]);
850     close (respipe [1]);
851     create_pipe ();
852    
853     atfork_parent ();
854     }
855    
/*
 * dREQ declares `req` and allocates a zeroed request for the current xsub.
 * Expects an SV *callback in scope; croaks unless it is undef or a reference.
 * The request keeps its own copy of the callback.
 */
#define dREQ							\
	aio_req req;						\
								\
	if (SvOK (callback) && !SvROK (callback))		\
	  croak ("callback must be undef or of reference type"); \
								\
	Newz (0, req, 1, aio_cb);				\
	if (!req)						\
	  croak ("out of memory during aio_req allocation");	\
								\
	req->callback = newSVsv (callback)

/*
 * REQ_SEND queues `req` and, unless the xsub was called in void context,
 * pushes a request object for it. Only usable inside PPCODE sections.
 */
#define REQ_SEND						\
        req_send (req);						\
								\
        if (GIMME_V != G_VOID)					\
          XPUSHs (req_sv (req, AIO_REQ_KLASS));
873 root 1.22
MODULE = IO::AIO                PACKAGE = IO::AIO

PROTOTYPES: ENABLE

BOOT:
{
        /* export a few C constants into the IO::AIO package */
        HV *stash = gv_stashpv ("IO::AIO", 1);

        newCONSTSUB (stash, "EXDEV",    newSViv (EXDEV));
        newCONSTSUB (stash, "O_RDONLY", newSViv (O_RDONLY));
        newCONSTSUB (stash, "O_WRONLY", newSViv (O_WRONLY));

        /* result pipe first, then the fork handlers that recreate it */
        create_pipe ();
        pthread_atfork (atfork_prepare, atfork_parent, atfork_child);
}
888    
# Raises the wanted number of worker threads to at least nthreads.
void
min_parallel (nthreads)
	int	nthreads
	PROTOTYPE: $
893    
# Limits the worker threads to at most nthreads, stopping surplus ones.
void
max_parallel (nthreads)
	int	nthreads
	PROTOTYPE: $
898    
# Sets the limit of outstanding requests and returns the previous limit.
int
max_outstanding (nreqs)
	int	nreqs
	PROTOTYPE: $
	CODE:
        /* the parameter shadows the global counter of the same name here */
        RETVAL = max_outstanding;
        max_outstanding = nreqs;
	OUTPUT:
	RETVAL
906    
void
aio_open (pathname,flags,mode,callback=&PL_sv_undef)
	SV *	pathname
	int	flags
	int	mode
	SV *	callback
	PROTOTYPE: $$$;$
	PPCODE:
{
        dREQ;

        req->type = REQ_OPEN;

        /* keep a private copy of the path alive for the worker thread */
        req->data    = newSVsv (pathname);
        req->dataptr = SvPVbyte_nolen (req->data);

        /* the open flags travel in the fd slot */
        req->fd   = flags;
        req->mode = mode;

        REQ_SEND;
}
926    
void
aio_close (fh,callback=&PL_sv_undef)
	SV *	fh
	SV *	callback
	PROTOTYPE: $;$
	ALIAS:
	   aio_close     = REQ_CLOSE
	   aio_fsync     = REQ_FSYNC
	   aio_fdatasync = REQ_FDATASYNC
	PPCODE:
{
        dREQ;

        /* ix is the REQ_* code of the alias that was called */
        req->type = ix;
        req->fh   = newSVsv (fh);
        req->fd   = PerlIO_fileno (IoIFP (sv_2io (fh)));

        /* REQ_SEND is an object-like macro and takes no argument */
        REQ_SEND;
}
946    
void
aio_read (fh,offset,length,data,dataoffset,callback=&PL_sv_undef)
	SV *	fh
	UV	offset
	UV	length
	SV *	data
	IV	dataoffset
	SV *	callback
	ALIAS:
	   aio_read  = REQ_READ
	   aio_write = REQ_WRITE
	PROTOTYPE: $$$$$;$
	PPCODE:
{
        STRLEN svlen;
        char *svptr = SvPVbyte (data, svlen);

        SvUPGRADE (data, SVt_PV);
        SvPOK_on (data);

        /* a negative dataoffset counts from the end of the string */
        if (dataoffset < 0)
          dataoffset += (IV)svlen;

        if (dataoffset < 0 || (STRLEN)dataoffset > svlen)
          croak ("data offset outside of string");

        if (ix == REQ_WRITE)
          {
            /* write: clamp length to the data available after dataoffset */
            if (length > svlen - (STRLEN)dataoffset)
              length = svlen - (STRLEN)dataoffset;
          }
        else
          {
            /* read: grow scalar as necessary */
            svptr = SvGROW (data, length + dataoffset);
          }

        {
          dREQ;

          req->type = ix;
          req->fh = newSVsv (fh);
          req->fd = PerlIO_fileno (ix == REQ_READ ? IoIFP (sv_2io (fh))
                                                  : IoOFP (sv_2io (fh)));
          req->offset = offset;
          req->length = length;
          req->data = SvREFCNT_inc (data);
          req->dataptr = (char *)svptr + dataoffset;

          /* poll_cb adds this to the byte count when it sets the string length */
          req->dataoffset = dataoffset;

          if (!SvREADONLY (data))
            {
              /* protect the buffer while the request runs; poll_cb undoes this */
              SvREADONLY_on (data);
              req->data2ptr = (void *)data;
            }

          REQ_SEND;
        }
}
1010 root 1.1
void
aio_sendfile (out_fh,in_fh,in_offset,length,callback=&PL_sv_undef)
	SV *	out_fh
	SV *	in_fh
	UV	in_offset
	UV	length
	SV *	callback
	PROTOTYPE: $$$$;$
	PPCODE:
{
        dREQ;

        req->type = REQ_SENDFILE;

        /* destination handle */
        req->fh  = newSVsv (out_fh);
        req->fd  = PerlIO_fileno (IoIFP (sv_2io (out_fh)));

        /* source handle */
        req->fh2 = newSVsv (in_fh);
        req->fd2 = PerlIO_fileno (IoIFP (sv_2io (in_fh)));

        req->offset = in_offset;
        req->length = length;

        REQ_SEND;
}
1033    
void
aio_readahead (fh,offset,length,callback=&PL_sv_undef)
	SV *	fh
	UV	offset
	IV	length
	SV *	callback
	PROTOTYPE: $$$;$
	PPCODE:
{
        dREQ;

        req->type = REQ_READAHEAD;

        /* hold a copy of the handle so the descriptor stays open */
        req->fh = newSVsv (fh);
        req->fd = PerlIO_fileno (IoIFP (sv_2io (fh)));

        req->offset = offset;
        req->length = length;

        REQ_SEND;
}
1053    
void
aio_stat (fh_or_path,callback=&PL_sv_undef)
	SV *	fh_or_path
	SV *	callback
	ALIAS:
	   aio_stat  = REQ_STAT
	   aio_lstat = REQ_LSTAT
	PPCODE:
{
        dREQ;

        New (0, req->statdata, 1, Stat_t);

        if (!req->statdata)
          {
            req_free (req);
            croak ("out of memory during aio_req->statdata allocation");
          }

        if (SvPOK (fh_or_path))
          {
            /* a string is taken as a path: stat or lstat depending on the alias */
            req->type    = ix;
            req->data    = newSVsv (fh_or_path);
            req->dataptr = SvPVbyte_nolen (req->data);
          }
        else
          {
            /* anything else is treated as a file handle */
            req->type = REQ_FSTAT;
            req->fh   = newSVsv (fh_or_path);
            req->fd   = PerlIO_fileno (IoIFP (sv_2io (fh_or_path)));
          }

        REQ_SEND;
}
1087    
void
aio_unlink (pathname,callback=&PL_sv_undef)
	SV *	pathname
	SV *	callback
	ALIAS:
	   aio_unlink  = REQ_UNLINK
	   aio_rmdir   = REQ_RMDIR
	   aio_readdir = REQ_READDIR
	PPCODE:
{
        dREQ;

        /* ix is the REQ_* code of the alias that was called */
        req->type    = ix;
        req->data    = newSVsv (pathname);
        req->dataptr = SvPVbyte_nolen (req->data);

        REQ_SEND;
}
1106    
void
aio_link (oldpath,newpath,callback=&PL_sv_undef)
	SV *	oldpath
	SV *	newpath
	SV *	callback
	ALIAS:
	   aio_link    = REQ_LINK
	   aio_symlink = REQ_SYMLINK
	   aio_rename  = REQ_RENAME
	PPCODE:
{
        dREQ;

        req->type = ix;

        /* the fh slot keeps the copy of oldpath alive; data2ptr is its string */
        req->fh       = newSVsv (oldpath);
        req->data2ptr = SvPVbyte_nolen (req->fh);

        req->data     = newSVsv (newpath);
        req->dataptr  = SvPVbyte_nolen (req->data);

        REQ_SEND;
}
1128    
void
aio_sleep (delay,callback=&PL_sv_undef)
	double	delay
	SV *	callback
	PPCODE:
{
        dREQ;

        req->type = REQ_SLEEP;

        /* whole seconds go into fd; negative delays become zero */
        req->fd  = delay < 0. ? 0 : delay;

        /* the worker copies fd2 into tv_usec, so scale the fraction to microseconds */
        req->fd2 = delay < 0. ? 0 : 1000000. * (delay - req->fd);

        REQ_SEND;
}
1143    
void
aio_group (callback=&PL_sv_undef)
	SV *	callback
	PROTOTYPE: ;&
	PPCODE:
{
        dREQ;

        req->type = REQ_GROUP;

        /* an empty group is a one-element circular list */
        req->grp_prev = req;
        req->grp_next = req;

        /* the group object is always returned, whatever the calling context */
        req_send (req);
        XPUSHs (req_sv (req, AIO_GRP_KLASS));
}
1158    
void
flush ()
	PROTOTYPE:
	CODE:
        /* wait for and process results until no request is outstanding */
        while (nreqs)
          {
            poll_wait ();
            poll_cb ();
          }
1168    
void
poll()
	PROTOTYPE:
	CODE:
        /* one round of waiting and processing, skipped when nothing is outstanding */
        if (nreqs)
          {
            poll_wait ();
            poll_cb ();
          }
1178    
int
poll_fileno()
	PROTOTYPE:
	CODE:
        /* read end of the result pipe */
        RETVAL = respipe [0];
	OUTPUT:
	RETVAL
1186    
int
poll_cb(...)
	PROTOTYPE:
	CODE:
        /* arguments are accepted but ignored; returns the count from the C poll_cb */
        RETVAL = poll_cb ();
	OUTPUT:
	RETVAL
1194    
void
poll_wait()
	PROTOTYPE:
	CODE:
        /* nothing to wait for when no request is outstanding */
        if (nreqs)
          poll_wait ();
1201 root 1.1
int
nreqs()
	PROTOTYPE:
	CODE:
        /* number of requests submitted and not yet reaped */
        RETVAL = nreqs;
	OUTPUT:
	RETVAL
1209    
MODULE = IO::AIO                PACKAGE = IO::AIO::REQ

void
cancel (aio_req_ornot req)
	PROTOTYPE:
	CODE:
        /* SvAIO_REQ yields 0 once req_free removed the magic: nothing to cancel then */
        /* NOTE(review): assumes the aio_req_ornot typemap passes that 0 through - confirm */
        if (req)
          req_cancel (req);
1217    
MODULE = IO::AIO                PACKAGE = IO::AIO::GRP

void
add (aio_req grp, ...)
	PROTOTYPE: $;@
	PPCODE:
{
        int i;

        for (i = 1; i < items; ++i )
          {
            aio_req req = SvAIO_REQ (ST (i));

            /* SvAIO_REQ returns 0 for requests that were already freed: don't link those */
            if (req)
              {
                /* insert req right after the group head in the circular list */
                req->grp_prev = grp;
                req->grp_next = grp->grp_next;
                grp->grp_next->grp_prev = req;
                grp->grp_next = req;

                req->grp = grp;
              }

            /* hand the request objects back unless called in void context */
            if (GIMME_V != G_VOID)
              XPUSHs (sv_2mortal (newSVsv (ST (i))));
          }
}
1242 root 1.43