ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/IO-AIO/AIO.xs
Revision: 1.27
Committed: Wed Aug 17 04:47:02 2005 UTC (18 years, 9 months ago) by root
Branch: MAIN
Changes since 1.26: +70 -59 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.20 #define _REENTRANT 1
2 root 1.19 #include <errno.h>
3    
4 root 1.15 #include "EXTERN.h"
5 root 1.1 #include "perl.h"
6     #include "XSUB.h"
7    
8 root 1.16 #include "autoconf/config.h"
9    
10 root 1.1 #include <sys/types.h>
11     #include <sys/stat.h>
12 root 1.10
13 root 1.1 #include <unistd.h>
14     #include <fcntl.h>
15     #include <signal.h>
16     #include <sched.h>
17    
18     #include <pthread.h>
19    
20     typedef void *InputStream; /* hack, but 5.6.1 is simply toooo old ;) */
21     typedef void *OutputStream; /* hack, but 5.6.1 is simply toooo old ;) */
22     typedef void *InOutStream; /* hack, but 5.6.1 is simply toooo old ;) */
23    
24 root 1.4 #if __ia64
25     # define STACKSIZE 65536
26 root 1.1 #else
27 root 1.4 # define STACKSIZE 4096
28 root 1.1 #endif
29    
30     enum {
31     REQ_QUIT,
32     REQ_OPEN, REQ_CLOSE,
33     REQ_READ, REQ_WRITE, REQ_READAHEAD,
34 root 1.22 REQ_STAT, REQ_LSTAT, REQ_FSTAT,
35 root 1.1 REQ_FSYNC, REQ_FDATASYNC,
36 root 1.23 REQ_UNLINK, REQ_RMDIR,
37 root 1.22 REQ_SYMLINK,
38 root 1.1 };
39    
40     typedef struct aio_cb {
41 root 1.3 struct aio_cb *volatile next;
42 root 1.1
43     int type;
44    
45     int fd;
46     off_t offset;
47     size_t length;
48     ssize_t result;
49     mode_t mode; /* open */
50     int errorno;
51 root 1.13 SV *data, *callback, *fh;
52 root 1.22 void *dataptr, *data2ptr;
53 root 1.1 STRLEN dataoffset;
54    
55     Stat_t *statdata;
56     } aio_cb;
57    
58     typedef aio_cb *aio_req;
59    
60     static int started;
61 root 1.11 static volatile int nreqs;
62 root 1.4 static int max_outstanding = 1<<30;
63 root 1.3 static int respipe [2];
64 root 1.1
65 root 1.3 static pthread_mutex_t reslock = PTHREAD_MUTEX_INITIALIZER;
66     static pthread_mutex_t reqlock = PTHREAD_MUTEX_INITIALIZER;
67     static pthread_cond_t reqwait = PTHREAD_COND_INITIALIZER;
68    
69     static volatile aio_req reqs, reqe; /* queue start, queue end */
70     static volatile aio_req ress, rese; /* queue start, queue end */
71 root 1.1
72 root 1.26 static void free_req (aio_req req)
73     {
74     if (req->data)
75     SvREFCNT_dec (req->data);
76    
77     if (req->fh)
78     SvREFCNT_dec (req->fh);
79    
80 root 1.27 if (req->statdata)
81 root 1.26 Safefree (req->statdata);
82    
83     if (req->callback)
84     SvREFCNT_dec (req->callback);
85    
86     Safefree (req);
87     }
88    
89 root 1.1 static void
90     poll_wait ()
91     {
92 root 1.11 if (nreqs && !ress)
93     {
94     fd_set rfd;
95     FD_ZERO(&rfd);
96     FD_SET(respipe [0], &rfd);
97 root 1.3
98 root 1.11 select (respipe [0] + 1, &rfd, 0, 0, 0);
99     }
100 root 1.1 }
101    
102     static int
103 root 1.5 poll_cb ()
104 root 1.1 {
105     dSP;
106     int count = 0;
107 root 1.24 int do_croak = 0;
108 root 1.27 aio_req req;
109    
110     for (;;)
111     {
112     pthread_mutex_lock (&reslock);
113     req = ress;
114    
115     if (req)
116     {
117     ress = req->next;
118 root 1.11
119 root 1.27 if (!ress)
120     {
121     rese = 0;
122 root 1.11
123 root 1.27 /* read any signals sent by the worker threads */
124     char buf [32];
125     while (read (respipe [0], buf, 32) == 32)
126     ;
127     }
128     }
129 root 1.1
130 root 1.27 pthread_mutex_unlock (&reslock);
131 root 1.3
132 root 1.27 if (!req)
133     break;
134 root 1.3
135 root 1.1 nreqs--;
136    
137     if (req->type == REQ_QUIT)
138     started--;
139     else
140     {
141     int errorno = errno;
142     errno = req->errorno;
143    
144     if (req->type == REQ_READ)
145 root 1.27 SvCUR_set (req->data, req->dataoffset + (req->result > 0 ? req->result : 0));
146 root 1.1
147 root 1.27 if (req->statdata)
148 root 1.1 {
149     PL_laststype = req->type == REQ_LSTAT ? OP_LSTAT : OP_STAT;
150     PL_laststatval = req->result;
151     PL_statcache = *(req->statdata);
152     }
153    
154 root 1.13 ENTER;
155 root 1.1 PUSHMARK (SP);
156     XPUSHs (sv_2mortal (newSViv (req->result)));
157 root 1.2
158     if (req->type == REQ_OPEN)
159     {
160     /* convert fd to fh */
161     SV *fh;
162    
163     PUTBACK;
164     call_pv ("IO::AIO::_fd2fh", G_SCALAR | G_EVAL);
165     SPAGAIN;
166    
167 root 1.13 fh = SvREFCNT_inc (POPs);
168 root 1.2
169     PUSHMARK (SP);
170 root 1.13 XPUSHs (sv_2mortal (fh));
171 root 1.2 }
172    
173 root 1.8 if (SvOK (req->callback))
174     {
175     PUTBACK;
176     call_sv (req->callback, G_VOID | G_EVAL);
177     SPAGAIN;
178 root 1.27
179     if (SvTRUE (ERRSV))
180     {
181     free_req (req);
182     croak (0);
183     }
184 root 1.8 }
185 root 1.13
186 root 1.26 LEAVE;
187    
188 root 1.1 errno = errorno;
189     count++;
190     }
191    
192 root 1.27 free_req (req);
193 root 1.1 }
194    
195     return count;
196     }
197    
198 root 1.4 static void *aio_proc(void *arg);
199    
200     static void
201     start_thread (void)
202     {
203     sigset_t fullsigset, oldsigset;
204     pthread_t tid;
205     pthread_attr_t attr;
206    
207     pthread_attr_init (&attr);
208     pthread_attr_setstacksize (&attr, STACKSIZE);
209     pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
210    
211     sigfillset (&fullsigset);
212     sigprocmask (SIG_SETMASK, &fullsigset, &oldsigset);
213    
214     if (pthread_create (&tid, &attr, aio_proc, 0) == 0)
215     started++;
216    
217     sigprocmask (SIG_SETMASK, &oldsigset, 0);
218     }
219    
220     static void
221     send_req (aio_req req)
222     {
223     nreqs++;
224    
225     pthread_mutex_lock (&reqlock);
226    
227     req->next = 0;
228    
229     if (reqe)
230     {
231     reqe->next = req;
232     reqe = req;
233     }
234     else
235     reqe = reqs = req;
236    
237     pthread_cond_signal (&reqwait);
238     pthread_mutex_unlock (&reqlock);
239    
240 root 1.27 if (nreqs > max_outstanding)
241     for (;;)
242     {
243     poll_cb ();
244    
245     if (nreqs <= max_outstanding)
246     break;
247    
248     poll_wait ();
249     }
250 root 1.4 }
251    
252     static void
253     end_thread (void)
254     {
255     aio_req req;
256 root 1.26 Newz (0, req, 1, aio_cb);
257 root 1.4 req->type = REQ_QUIT;
258    
259     send_req (req);
260     }
261    
262 root 1.22 static void min_parallel (int nthreads)
263     {
264     while (nthreads > started)
265     start_thread ();
266     }
267    
268     static void max_parallel (int nthreads)
269     {
270     int cur = started;
271 root 1.27
272 root 1.22 while (cur > nthreads)
273     {
274     end_thread ();
275     cur--;
276     }
277    
278     while (started > nthreads)
279     {
280     poll_wait ();
281     poll_cb ();
282     }
283     }
284    
285 root 1.26 static void create_pipe ()
286 root 1.22 {
287 root 1.26 if (pipe (respipe))
288     croak ("unable to initialize result pipe");
289 root 1.22
290 root 1.26 if (fcntl (respipe [0], F_SETFL, O_NONBLOCK))
291     croak ("cannot set result pipe to nonblocking mode");
292 root 1.22
293 root 1.26 if (fcntl (respipe [1], F_SETFL, O_NONBLOCK))
294     croak ("cannot set result pipe to nonblocking mode");
295     }
296 root 1.22
297 root 1.26 static void atfork_prepare (void)
298     {
299 root 1.27 for (;;) {
300    
301     for (;;)
302     {
303     poll_cb ();
304    
305     if (!nreqs)
306     break;
307    
308     poll_wait ();
309     }
310    
311     pthread_mutex_lock (&reqlock);
312    
313     if (!nreqs)
314     break;
315    
316     pthread_mutex_unlock (&reqlock);
317     }
318    
319 root 1.22 pthread_mutex_lock (&reslock);
320 root 1.27
321     assert (!nreqs && !reqs && !ress);
322 root 1.22 }
323    
324     static void atfork_parent (void)
325     {
326     pthread_mutex_unlock (&reslock);
327 root 1.26 pthread_mutex_unlock (&reqlock);
328     }
329    
330     static void atfork_child (void)
331     {
332     int restart = started;
333     started = 0;
334    
335     atfork_parent ();
336 root 1.25
337 root 1.26 min_parallel (restart);
338 root 1.22 }
339    
340     /*****************************************************************************/
341 root 1.17 /* work around various missing functions */
342    
343     #if !HAVE_PREADWRITE
344     # define pread aio_pread
345     # define pwrite aio_pwrite
346    
347     /*
348     * make our pread/pwrite safe against themselves, but not against
349     * normal read/write by using a mutex. slows down execution a lot,
350     * but that's your problem, not mine.
351     */
352     static pthread_mutex_t iolock = PTHREAD_MUTEX_INITIALIZER;
353    
354     static ssize_t
355     pread (int fd, void *buf, size_t count, off_t offset)
356     {
357     ssize_t res;
358     off_t ooffset;
359    
360     pthread_mutex_lock (&iolock);
361     ooffset = lseek (fd, 0, SEEK_CUR);
362     lseek (fd, offset, SEEK_SET);
363     res = read (fd, buf, count);
364     lseek (fd, ooffset, SEEK_SET);
365     pthread_mutex_unlock (&iolock);
366    
367     return res;
368     }
369    
370     static ssize_t
371     pwrite (int fd, void *buf, size_t count, off_t offset)
372     {
373     ssize_t res;
374     off_t ooffset;
375    
376     pthread_mutex_lock (&iolock);
377     ooffset = lseek (fd, 0, SEEK_CUR);
378     lseek (fd, offset, SEEK_SET);
379     res = write (fd, buf, count);
380     lseek (fd, offset, SEEK_SET);
381     pthread_mutex_unlock (&iolock);
382    
383     return res;
384     }
385     #endif
386    
387     #if !HAVE_FDATASYNC
388     # define fdatasync fsync
389     #endif
390    
391     #if !HAVE_READAHEAD
392     # define readahead aio_readahead
393    
394     static char readahead_buf[4096];
395    
396     static ssize_t
397     readahead (int fd, off_t offset, size_t count)
398     {
399     while (count > 0)
400     {
401     size_t len = count < sizeof (readahead_buf) ? count : sizeof (readahead_buf);
402    
403     pread (fd, readahead_buf, len, offset);
404     offset += len;
405     count -= len;
406     }
407    
408     errno = 0;
409     }
410     #endif
411    
412 root 1.22 /*****************************************************************************/
413    
414 root 1.1 static void *
415     aio_proc (void *thr_arg)
416     {
417     aio_req req;
418 root 1.3 int type;
419 root 1.1
420 root 1.3 do
421 root 1.1 {
422 root 1.3 pthread_mutex_lock (&reqlock);
423    
424     for (;;)
425     {
426     req = reqs;
427    
428     if (reqs)
429     {
430     reqs = reqs->next;
431     if (!reqs) reqe = 0;
432     }
433    
434     if (req)
435     break;
436    
437     pthread_cond_wait (&reqwait, &reqlock);
438     }
439    
440     pthread_mutex_unlock (&reqlock);
441    
442 root 1.1 errno = 0; /* strictly unnecessary */
443    
444 root 1.3 type = req->type;
445    
446     switch (type)
447 root 1.1 {
448 root 1.10 case REQ_READ: req->result = pread (req->fd, req->dataptr, req->length, req->offset); break;
449     case REQ_WRITE: req->result = pwrite (req->fd, req->dataptr, req->length, req->offset); break;
450 root 1.16
451 root 1.1 case REQ_READAHEAD: req->result = readahead (req->fd, req->offset, req->length); break;
452    
453     case REQ_STAT: req->result = stat (req->dataptr, req->statdata); break;
454     case REQ_LSTAT: req->result = lstat (req->dataptr, req->statdata); break;
455     case REQ_FSTAT: req->result = fstat (req->fd , req->statdata); break;
456    
457     case REQ_OPEN: req->result = open (req->dataptr, req->fd, req->mode); break;
458     case REQ_CLOSE: req->result = close (req->fd); break;
459     case REQ_UNLINK: req->result = unlink (req->dataptr); break;
460 root 1.22 case REQ_RMDIR: req->result = rmdir (req->dataptr); break;
461     case REQ_SYMLINK: req->result = symlink (req->data2ptr, req->dataptr); break;
462 root 1.1
463 root 1.17 case REQ_FDATASYNC: req->result = fdatasync (req->fd); break;
464 root 1.1 case REQ_FSYNC: req->result = fsync (req->fd); break;
465    
466     case REQ_QUIT:
467 root 1.3 break;
468 root 1.1
469     default:
470     req->result = ENOSYS;
471     break;
472     }
473    
474     req->errorno = errno;
475 root 1.3
476     pthread_mutex_lock (&reslock);
477    
478     req->next = 0;
479    
480     if (rese)
481     {
482     rese->next = req;
483     rese = req;
484     }
485     else
486     {
487     rese = ress = req;
488    
489     /* write a dummy byte to the pipe so fh becomes ready */
490     write (respipe [1], &respipe, 1);
491     }
492    
493     pthread_mutex_unlock (&reslock);
494 root 1.1 }
495 root 1.3 while (type != REQ_QUIT);
496 root 1.1
497     return 0;
498     }
499    
500 root 1.22 #define dREQ \
501     aio_req req; \
502     \
503     if (SvOK (callback) && !SvROK (callback)) \
504     croak ("clalback must be undef or of reference type"); \
505     \
506     Newz (0, req, 1, aio_cb); \
507     if (!req) \
508     croak ("out of memory during aio_req allocation"); \
509     \
510 root 1.27 req->callback = newSVsv (callback);
511 root 1.22
512 root 1.1 MODULE = IO::AIO PACKAGE = IO::AIO
513    
514 root 1.8 PROTOTYPES: ENABLE
515    
516 root 1.1 BOOT:
517     {
518 root 1.26 create_pipe ();
519 root 1.22 pthread_atfork (atfork_prepare, atfork_parent, atfork_child);
520 root 1.1 }
521    
522     void
523     min_parallel(nthreads)
524     int nthreads
525     PROTOTYPE: $
526    
527     void
528     max_parallel(nthreads)
529     int nthreads
530     PROTOTYPE: $
531    
532 root 1.4 int
533     max_outstanding(nreqs)
534     int nreqs
535     PROTOTYPE: $
536     CODE:
537     RETVAL = max_outstanding;
538     max_outstanding = nreqs;
539    
540 root 1.1 void
541 root 1.8 aio_open(pathname,flags,mode,callback=&PL_sv_undef)
542 root 1.1 SV * pathname
543     int flags
544     int mode
545     SV * callback
546 root 1.8 PROTOTYPE: $$$;$
547 root 1.1 CODE:
548     {
549 root 1.22 dREQ;
550 root 1.1
551     req->type = REQ_OPEN;
552     req->data = newSVsv (pathname);
553 root 1.22 req->dataptr = SvPVbyte_nolen (req->data);
554 root 1.1 req->fd = flags;
555     req->mode = mode;
556    
557     send_req (req);
558     }
559    
560     void
561 root 1.8 aio_close(fh,callback=&PL_sv_undef)
562 root 1.13 SV * fh
563     SV * callback
564 root 1.8 PROTOTYPE: $;$
565 root 1.1 ALIAS:
566     aio_close = REQ_CLOSE
567     aio_fsync = REQ_FSYNC
568     aio_fdatasync = REQ_FDATASYNC
569     CODE:
570     {
571 root 1.22 dREQ;
572 root 1.1
573     req->type = ix;
574 root 1.13 req->fh = newSVsv (fh);
575     req->fd = PerlIO_fileno (IoIFP (sv_2io (fh)));
576 root 1.1
577     send_req (req);
578     }
579    
580     void
581 root 1.8 aio_read(fh,offset,length,data,dataoffset,callback=&PL_sv_undef)
582 root 1.13 SV * fh
583     UV offset
584     IV length
585     SV * data
586     IV dataoffset
587     SV * callback
588     ALIAS:
589     aio_read = REQ_READ
590     aio_write = REQ_WRITE
591 root 1.8 PROTOTYPE: $$$$$;$
592 root 1.1 CODE:
593 root 1.13 {
594     aio_req req;
595     STRLEN svlen;
596 root 1.21 char *svptr = SvPVbyte (data, svlen);
597 root 1.13
598     SvUPGRADE (data, SVt_PV);
599     SvPOK_on (data);
600 root 1.1
601 root 1.13 if (dataoffset < 0)
602     dataoffset += svlen;
603    
604     if (dataoffset < 0 || dataoffset > svlen)
605     croak ("data offset outside of string");
606    
607     if (ix == REQ_WRITE)
608     {
609     /* write: check length and adjust. */
610     if (length < 0 || length + dataoffset > svlen)
611     length = svlen - dataoffset;
612     }
613     else
614     {
615     /* read: grow scalar as necessary */
616     svptr = SvGROW (data, length + dataoffset);
617     }
618    
619     if (length < 0)
620     croak ("length must not be negative");
621    
622 root 1.22 {
623     dREQ;
624 root 1.13
625 root 1.22 req->type = ix;
626     req->fh = newSVsv (fh);
627     req->fd = PerlIO_fileno (ix == REQ_READ ? IoIFP (sv_2io (fh))
628     : IoOFP (sv_2io (fh)));
629     req->offset = offset;
630     req->length = length;
631     req->data = SvREFCNT_inc (data);
632     req->dataptr = (char *)svptr + dataoffset;
633 root 1.13
634 root 1.22 send_req (req);
635     }
636 root 1.13 }
637 root 1.1
638     void
639 root 1.8 aio_readahead(fh,offset,length,callback=&PL_sv_undef)
640 root 1.13 SV * fh
641     UV offset
642     IV length
643     SV * callback
644 root 1.8 PROTOTYPE: $$$;$
645 root 1.1 CODE:
646     {
647 root 1.22 dREQ;
648 root 1.1
649     req->type = REQ_READAHEAD;
650 root 1.13 req->fh = newSVsv (fh);
651     req->fd = PerlIO_fileno (IoIFP (sv_2io (fh)));
652 root 1.1 req->offset = offset;
653     req->length = length;
654    
655     send_req (req);
656     }
657    
658     void
659 root 1.8 aio_stat(fh_or_path,callback=&PL_sv_undef)
660 root 1.1 SV * fh_or_path
661     SV * callback
662     ALIAS:
663 root 1.8 aio_stat = REQ_STAT
664     aio_lstat = REQ_LSTAT
665 root 1.1 CODE:
666     {
667 root 1.22 dREQ;
668 root 1.1
669     New (0, req->statdata, 1, Stat_t);
670     if (!req->statdata)
671 root 1.27 {
672     free_req (req);
673     croak ("out of memory during aio_req->statdata allocation");
674     }
675 root 1.1
676     if (SvPOK (fh_or_path))
677     {
678 root 1.8 req->type = ix;
679 root 1.1 req->data = newSVsv (fh_or_path);
680 root 1.22 req->dataptr = SvPVbyte_nolen (req->data);
681 root 1.1 }
682     else
683     {
684     req->type = REQ_FSTAT;
685 root 1.13 req->fh = newSVsv (fh_or_path);
686 root 1.1 req->fd = PerlIO_fileno (IoIFP (sv_2io (fh_or_path)));
687     }
688    
689     send_req (req);
690     }
691    
692     void
693 root 1.8 aio_unlink(pathname,callback=&PL_sv_undef)
694 root 1.1 SV * pathname
695     SV * callback
696 root 1.22 ALIAS:
697     aio_unlink = REQ_UNLINK
698     aio_rmdir = REQ_RMDIR
699 root 1.1 CODE:
700     {
701 root 1.22 dREQ;
702 root 1.1
703 root 1.22 req->type = ix;
704     req->data = newSVsv (pathname);
705     req->dataptr = SvPVbyte_nolen (req->data);
706 root 1.1
707 root 1.22 send_req (req);
708     }
709    
710     void
711     aio_symlink(oldpath,newpath,callback=&PL_sv_undef)
712     SV * oldpath
713     SV * newpath
714     SV * callback
715     CODE:
716     {
717     dREQ;
718 root 1.1
719 root 1.22 req->type = REQ_SYMLINK;
720     req->fh = newSVsv (oldpath);
721     req->data2ptr = SvPVbyte_nolen (req->fh);
722     req->data = newSVsv (newpath);
723     req->dataptr = SvPVbyte_nolen (req->data);
724 root 1.1
725     send_req (req);
726     }
727    
728 root 1.6 void
729     flush()
730     PROTOTYPE:
731     CODE:
732     while (nreqs)
733     {
734     poll_wait ();
735     poll_cb ();
736     }
737    
738 root 1.7 void
739     poll()
740     PROTOTYPE:
741     CODE:
742     if (nreqs)
743     {
744     poll_wait ();
745     poll_cb ();
746     }
747    
748 root 1.1 int
749     poll_fileno()
750     PROTOTYPE:
751     CODE:
752 root 1.3 RETVAL = respipe [0];
753 root 1.1 OUTPUT:
754     RETVAL
755    
756     int
757     poll_cb(...)
758     PROTOTYPE:
759     CODE:
760 root 1.5 RETVAL = poll_cb ();
761 root 1.1 OUTPUT:
762     RETVAL
763    
764     void
765     poll_wait()
766     PROTOTYPE:
767     CODE:
768 root 1.3 if (nreqs)
769     poll_wait ();
770 root 1.1
771     int
772     nreqs()
773     PROTOTYPE:
774     CODE:
775     RETVAL = nreqs;
776     OUTPUT:
777     RETVAL
778