ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/IO-AIO/AIO.xs
Revision: 1.26
Committed: Wed Aug 17 03:52:20 2005 UTC (18 years, 9 months ago) by root
Branch: MAIN
Changes since 1.25: +64 -56 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.20 #define _REENTRANT 1
2 root 1.19 #include <errno.h>
3    
4 root 1.15 #include "EXTERN.h"
5 root 1.1 #include "perl.h"
6     #include "XSUB.h"
7    
8 root 1.16 #include "autoconf/config.h"
9    
10 root 1.1 #include <sys/types.h>
11     #include <sys/stat.h>
12 root 1.10
13 root 1.1 #include <unistd.h>
14     #include <fcntl.h>
15     #include <signal.h>
16     #include <sched.h>
17    
18     #include <pthread.h>
19    
20     typedef void *InputStream; /* hack, but 5.6.1 is simply toooo old ;) */
21     typedef void *OutputStream; /* hack, but 5.6.1 is simply toooo old ;) */
22     typedef void *InOutStream; /* hack, but 5.6.1 is simply toooo old ;) */
23    
24 root 1.4 #if __ia64
25     # define STACKSIZE 65536
26 root 1.1 #else
27 root 1.4 # define STACKSIZE 4096
28 root 1.1 #endif
29    
30     enum {
31     REQ_QUIT,
32     REQ_OPEN, REQ_CLOSE,
33     REQ_READ, REQ_WRITE, REQ_READAHEAD,
34 root 1.22 REQ_STAT, REQ_LSTAT, REQ_FSTAT,
35 root 1.1 REQ_FSYNC, REQ_FDATASYNC,
36 root 1.23 REQ_UNLINK, REQ_RMDIR,
37 root 1.22 REQ_SYMLINK,
38 root 1.1 };
39    
40     typedef struct aio_cb {
41 root 1.3 struct aio_cb *volatile next;
42 root 1.1
43     int type;
44    
45     int fd;
46     off_t offset;
47     size_t length;
48     ssize_t result;
49     mode_t mode; /* open */
50     int errorno;
51 root 1.13 SV *data, *callback, *fh;
52 root 1.22 void *dataptr, *data2ptr;
53 root 1.1 STRLEN dataoffset;
54    
55     Stat_t *statdata;
56     } aio_cb;
57    
58     typedef aio_cb *aio_req;
59    
60     static int started;
61 root 1.11 static volatile int nreqs;
62 root 1.4 static int max_outstanding = 1<<30;
63 root 1.3 static int respipe [2];
64 root 1.1
65 root 1.3 static pthread_mutex_t reslock = PTHREAD_MUTEX_INITIALIZER;
66     static pthread_mutex_t reqlock = PTHREAD_MUTEX_INITIALIZER;
67 root 1.22 static pthread_mutex_t frklock = PTHREAD_MUTEX_INITIALIZER;
68 root 1.3 static pthread_cond_t reqwait = PTHREAD_COND_INITIALIZER;
69    
70     static volatile aio_req reqs, reqe; /* queue start, queue end */
71     static volatile aio_req ress, rese; /* queue start, queue end */
72 root 1.1
73 root 1.26 static void free_req (aio_req req)
74     {
75     if (req->data)
76     SvREFCNT_dec (req->data);
77    
78     if (req->fh)
79     SvREFCNT_dec (req->fh);
80    
81     if (req->type == REQ_STAT || req->type == REQ_LSTAT || req->type == REQ_FSTAT)
82     Safefree (req->statdata);
83    
84     if (req->callback)
85     SvREFCNT_dec (req->callback);
86    
87     Safefree (req);
88     }
89    
90 root 1.1 static void
91     poll_wait ()
92     {
93 root 1.11 if (nreqs && !ress)
94     {
95     fd_set rfd;
96     FD_ZERO(&rfd);
97     FD_SET(respipe [0], &rfd);
98 root 1.3
99 root 1.11 select (respipe [0] + 1, &rfd, 0, 0, 0);
100     }
101 root 1.1 }
102    
103     static int
104 root 1.5 poll_cb ()
105 root 1.1 {
106     dSP;
107     int count = 0;
108 root 1.24 int do_croak = 0;
109 root 1.11 aio_req req, prv;
110    
111     pthread_mutex_lock (&reslock);
112    
113 root 1.3 {
114 root 1.11 /* read any signals sent by the worker threads */
115 root 1.3 char buf [32];
116 root 1.22 while (read (respipe [0], buf, 32) == 32)
117 root 1.3 ;
118     }
119 root 1.1
120 root 1.11 req = ress;
121     ress = rese = 0;
122 root 1.3
123 root 1.11 pthread_mutex_unlock (&reslock);
124 root 1.3
125 root 1.11 while (req)
126     {
127 root 1.1 nreqs--;
128    
129     if (req->type == REQ_QUIT)
130     started--;
131     else
132     {
133     int errorno = errno;
134     errno = req->errorno;
135    
136     if (req->type == REQ_READ)
137     SvCUR_set (req->data, req->dataoffset
138     + req->result > 0 ? req->result : 0);
139    
140     if (req->type == REQ_STAT || req->type == REQ_LSTAT || req->type == REQ_FSTAT)
141     {
142     PL_laststype = req->type == REQ_LSTAT ? OP_LSTAT : OP_STAT;
143     PL_laststatval = req->result;
144     PL_statcache = *(req->statdata);
145     }
146    
147 root 1.13 ENTER;
148 root 1.1 PUSHMARK (SP);
149     XPUSHs (sv_2mortal (newSViv (req->result)));
150 root 1.2
151     if (req->type == REQ_OPEN)
152     {
153     /* convert fd to fh */
154     SV *fh;
155    
156     PUTBACK;
157     call_pv ("IO::AIO::_fd2fh", G_SCALAR | G_EVAL);
158     SPAGAIN;
159    
160 root 1.13 fh = SvREFCNT_inc (POPs);
161 root 1.2
162     PUSHMARK (SP);
163 root 1.13 XPUSHs (sv_2mortal (fh));
164 root 1.2 }
165    
166 root 1.8 if (SvOK (req->callback))
167     {
168     PUTBACK;
169     call_sv (req->callback, G_VOID | G_EVAL);
170     SPAGAIN;
171     }
172 root 1.13
173 root 1.26 LEAVE;
174    
175 root 1.24 do_croak = SvTRUE (ERRSV);
176 root 1.1
177     errno = errorno;
178     count++;
179     }
180    
181 root 1.11 prv = req;
182     req = req->next;
183 root 1.26 free_req (prv);
184 root 1.11
185 root 1.24 if (do_croak)
186     croak (0);
187 root 1.1 }
188    
189     return count;
190     }
191    
192 root 1.4 static void *aio_proc(void *arg);
193    
194     static void
195     start_thread (void)
196     {
197     sigset_t fullsigset, oldsigset;
198     pthread_t tid;
199     pthread_attr_t attr;
200    
201     pthread_attr_init (&attr);
202     pthread_attr_setstacksize (&attr, STACKSIZE);
203     pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
204    
205     sigfillset (&fullsigset);
206     sigprocmask (SIG_SETMASK, &fullsigset, &oldsigset);
207    
208     if (pthread_create (&tid, &attr, aio_proc, 0) == 0)
209     started++;
210    
211     sigprocmask (SIG_SETMASK, &oldsigset, 0);
212     }
213    
214     static void
215     send_req (aio_req req)
216     {
217     nreqs++;
218    
219     pthread_mutex_lock (&reqlock);
220    
221     req->next = 0;
222    
223     if (reqe)
224     {
225     reqe->next = req;
226     reqe = req;
227     }
228     else
229     reqe = reqs = req;
230    
231     pthread_cond_signal (&reqwait);
232     pthread_mutex_unlock (&reqlock);
233    
234     while (nreqs > max_outstanding)
235     {
236     poll_wait ();
237     poll_cb ();
238     }
239     }
240    
241     static void
242     end_thread (void)
243     {
244     aio_req req;
245 root 1.26 Newz (0, req, 1, aio_cb);
246 root 1.4 req->type = REQ_QUIT;
247    
248     send_req (req);
249     }
250    
251 root 1.22
252     static void min_parallel (int nthreads)
253     {
254     while (nthreads > started)
255     start_thread ();
256     }
257    
258     static void max_parallel (int nthreads)
259     {
260     int cur = started;
261     while (cur > nthreads)
262     {
263     end_thread ();
264     cur--;
265     }
266    
267     while (started > nthreads)
268     {
269     poll_wait ();
270     poll_cb ();
271     }
272     }
273    
274 root 1.26 static void create_pipe ()
275 root 1.22 {
276 root 1.26 if (pipe (respipe))
277     croak ("unable to initialize result pipe");
278 root 1.22
279 root 1.26 if (fcntl (respipe [0], F_SETFL, O_NONBLOCK))
280     croak ("cannot set result pipe to nonblocking mode");
281 root 1.22
282 root 1.26 if (fcntl (respipe [1], F_SETFL, O_NONBLOCK))
283     croak ("cannot set result pipe to nonblocking mode");
284     }
285 root 1.22
286 root 1.26 static void atfork_prepare (void)
287     {
288 root 1.25 pthread_mutex_lock (&frklock);
289 root 1.26 pthread_mutex_lock (&reqlock);
290 root 1.22 pthread_mutex_lock (&reslock);
291     }
292    
293     static void atfork_parent (void)
294     {
295     pthread_mutex_unlock (&reslock);
296 root 1.26 pthread_mutex_unlock (&reqlock);
297 root 1.25 pthread_mutex_unlock (&frklock);
298 root 1.26 }
299    
300     static void atfork_child (void)
301     {
302     int restart = started;
303     started = 0;
304    
305     while (reqs)
306     {
307     free_req (reqs);
308     reqs = reqs->next;
309     }
310    
311     reqs = reqe = 0;
312    
313     while (ress)
314     {
315     free_req (ress);
316     ress = ress->next;
317     }
318    
319     ress = rese = 0;
320    
321     close (respipe [0]);
322     close (respipe [1]);
323    
324     create_pipe ();
325    
326     atfork_parent ();
327 root 1.25
328 root 1.26 min_parallel (restart);
329 root 1.22 }
330    
331     /*****************************************************************************/
332 root 1.17 /* work around various missing functions */
333    
334     #if !HAVE_PREADWRITE
335     # define pread aio_pread
336     # define pwrite aio_pwrite
337    
338     /*
339     * make our pread/pwrite safe against themselves, but not against
340     * normal read/write by using a mutex. slows down execution a lot,
341     * but that's your problem, not mine.
342     */
343     static pthread_mutex_t iolock = PTHREAD_MUTEX_INITIALIZER;
344    
345     static ssize_t
346     pread (int fd, void *buf, size_t count, off_t offset)
347     {
348     ssize_t res;
349     off_t ooffset;
350    
351     pthread_mutex_lock (&iolock);
352     ooffset = lseek (fd, 0, SEEK_CUR);
353     lseek (fd, offset, SEEK_SET);
354     res = read (fd, buf, count);
355     lseek (fd, ooffset, SEEK_SET);
356     pthread_mutex_unlock (&iolock);
357    
358     return res;
359     }
360    
361     static ssize_t
362     pwrite (int fd, void *buf, size_t count, off_t offset)
363     {
364     ssize_t res;
365     off_t ooffset;
366    
367     pthread_mutex_lock (&iolock);
368     ooffset = lseek (fd, 0, SEEK_CUR);
369     lseek (fd, offset, SEEK_SET);
370     res = write (fd, buf, count);
371     lseek (fd, offset, SEEK_SET);
372     pthread_mutex_unlock (&iolock);
373    
374     return res;
375     }
376     #endif
377    
378     #if !HAVE_FDATASYNC
379     # define fdatasync fsync
380     #endif
381    
382     #if !HAVE_READAHEAD
383     # define readahead aio_readahead
384    
385     static char readahead_buf[4096];
386    
387     static ssize_t
388     readahead (int fd, off_t offset, size_t count)
389     {
390     while (count > 0)
391     {
392     size_t len = count < sizeof (readahead_buf) ? count : sizeof (readahead_buf);
393    
394     pread (fd, readahead_buf, len, offset);
395     offset += len;
396     count -= len;
397     }
398    
399     errno = 0;
400     }
401     #endif
402    
403 root 1.22 /*****************************************************************************/
404    
405 root 1.1 static void *
406     aio_proc (void *thr_arg)
407     {
408     aio_req req;
409 root 1.3 int type;
410 root 1.1
411 root 1.3 do
412 root 1.1 {
413 root 1.3 pthread_mutex_lock (&reqlock);
414    
415     for (;;)
416     {
417     req = reqs;
418    
419     if (reqs)
420     {
421     reqs = reqs->next;
422     if (!reqs) reqe = 0;
423     }
424    
425     if (req)
426     break;
427    
428     pthread_cond_wait (&reqwait, &reqlock);
429     }
430    
431     pthread_mutex_unlock (&reqlock);
432    
433 root 1.1 errno = 0; /* strictly unnecessary */
434    
435 root 1.3 type = req->type;
436    
437     switch (type)
438 root 1.1 {
439 root 1.10 case REQ_READ: req->result = pread (req->fd, req->dataptr, req->length, req->offset); break;
440     case REQ_WRITE: req->result = pwrite (req->fd, req->dataptr, req->length, req->offset); break;
441 root 1.16
442 root 1.1 case REQ_READAHEAD: req->result = readahead (req->fd, req->offset, req->length); break;
443    
444     case REQ_STAT: req->result = stat (req->dataptr, req->statdata); break;
445     case REQ_LSTAT: req->result = lstat (req->dataptr, req->statdata); break;
446     case REQ_FSTAT: req->result = fstat (req->fd , req->statdata); break;
447    
448     case REQ_OPEN: req->result = open (req->dataptr, req->fd, req->mode); break;
449     case REQ_CLOSE: req->result = close (req->fd); break;
450     case REQ_UNLINK: req->result = unlink (req->dataptr); break;
451 root 1.22 case REQ_RMDIR: req->result = rmdir (req->dataptr); break;
452     case REQ_SYMLINK: req->result = symlink (req->data2ptr, req->dataptr); break;
453 root 1.1
454 root 1.17 case REQ_FDATASYNC: req->result = fdatasync (req->fd); break;
455 root 1.1 case REQ_FSYNC: req->result = fsync (req->fd); break;
456    
457     case REQ_QUIT:
458 root 1.3 break;
459 root 1.1
460     default:
461     req->result = ENOSYS;
462     break;
463     }
464    
465     req->errorno = errno;
466 root 1.3
467     pthread_mutex_lock (&reslock);
468    
469     req->next = 0;
470    
471     if (rese)
472     {
473     rese->next = req;
474     rese = req;
475     }
476     else
477     {
478     rese = ress = req;
479    
480     /* write a dummy byte to the pipe so fh becomes ready */
481     write (respipe [1], &respipe, 1);
482     }
483    
484     pthread_mutex_unlock (&reslock);
485 root 1.1 }
486 root 1.3 while (type != REQ_QUIT);
487 root 1.1
488     return 0;
489     }
490    
491 root 1.22 #define dREQ \
492     aio_req req; \
493     \
494     if (SvOK (callback) && !SvROK (callback)) \
495     croak ("clalback must be undef or of reference type"); \
496     \
497     Newz (0, req, 1, aio_cb); \
498     if (!req) \
499     croak ("out of memory during aio_req allocation"); \
500     \
501     req->callback = SvREFCNT_inc (callback);
502    
503 root 1.1 MODULE = IO::AIO PACKAGE = IO::AIO
504    
505 root 1.8 PROTOTYPES: ENABLE
506    
507 root 1.1 BOOT:
508     {
509 root 1.26 create_pipe ();
510 root 1.22 pthread_atfork (atfork_prepare, atfork_parent, atfork_child);
511 root 1.1 }
512    
513     void
514     min_parallel(nthreads)
515     int nthreads
516     PROTOTYPE: $
517    
518     void
519     max_parallel(nthreads)
520     int nthreads
521     PROTOTYPE: $
522    
523 root 1.4 int
524     max_outstanding(nreqs)
525     int nreqs
526     PROTOTYPE: $
527     CODE:
528     RETVAL = max_outstanding;
529     max_outstanding = nreqs;
530    
531 root 1.1 void
532 root 1.8 aio_open(pathname,flags,mode,callback=&PL_sv_undef)
533 root 1.1 SV * pathname
534     int flags
535     int mode
536     SV * callback
537 root 1.8 PROTOTYPE: $$$;$
538 root 1.1 CODE:
539     {
540 root 1.22 dREQ;
541 root 1.1
542     req->type = REQ_OPEN;
543     req->data = newSVsv (pathname);
544 root 1.22 req->dataptr = SvPVbyte_nolen (req->data);
545 root 1.1 req->fd = flags;
546     req->mode = mode;
547    
548     send_req (req);
549     }
550    
551     void
552 root 1.8 aio_close(fh,callback=&PL_sv_undef)
553 root 1.13 SV * fh
554     SV * callback
555 root 1.8 PROTOTYPE: $;$
556 root 1.1 ALIAS:
557     aio_close = REQ_CLOSE
558     aio_fsync = REQ_FSYNC
559     aio_fdatasync = REQ_FDATASYNC
560     CODE:
561     {
562 root 1.22 dREQ;
563 root 1.1
564     req->type = ix;
565 root 1.13 req->fh = newSVsv (fh);
566     req->fd = PerlIO_fileno (IoIFP (sv_2io (fh)));
567 root 1.1
568     send_req (req);
569     }
570    
571     void
572 root 1.8 aio_read(fh,offset,length,data,dataoffset,callback=&PL_sv_undef)
573 root 1.13 SV * fh
574     UV offset
575     IV length
576     SV * data
577     IV dataoffset
578     SV * callback
579     ALIAS:
580     aio_read = REQ_READ
581     aio_write = REQ_WRITE
582 root 1.8 PROTOTYPE: $$$$$;$
583 root 1.1 CODE:
584 root 1.13 {
585     aio_req req;
586     STRLEN svlen;
587 root 1.21 char *svptr = SvPVbyte (data, svlen);
588 root 1.13
589     SvUPGRADE (data, SVt_PV);
590     SvPOK_on (data);
591 root 1.1
592 root 1.13 if (dataoffset < 0)
593     dataoffset += svlen;
594    
595     if (dataoffset < 0 || dataoffset > svlen)
596     croak ("data offset outside of string");
597    
598     if (ix == REQ_WRITE)
599     {
600     /* write: check length and adjust. */
601     if (length < 0 || length + dataoffset > svlen)
602     length = svlen - dataoffset;
603     }
604     else
605     {
606     /* read: grow scalar as necessary */
607     svptr = SvGROW (data, length + dataoffset);
608     }
609    
610     if (length < 0)
611     croak ("length must not be negative");
612    
613 root 1.22 {
614     dREQ;
615 root 1.13
616 root 1.22 req->type = ix;
617     req->fh = newSVsv (fh);
618     req->fd = PerlIO_fileno (ix == REQ_READ ? IoIFP (sv_2io (fh))
619     : IoOFP (sv_2io (fh)));
620     req->offset = offset;
621     req->length = length;
622     req->data = SvREFCNT_inc (data);
623     req->dataptr = (char *)svptr + dataoffset;
624     req->callback = SvREFCNT_inc (callback);
625 root 1.13
626 root 1.22 send_req (req);
627     }
628 root 1.13 }
629 root 1.1
630     void
631 root 1.8 aio_readahead(fh,offset,length,callback=&PL_sv_undef)
632 root 1.13 SV * fh
633     UV offset
634     IV length
635     SV * callback
636 root 1.8 PROTOTYPE: $$$;$
637 root 1.1 CODE:
638     {
639 root 1.22 dREQ;
640 root 1.1
641     req->type = REQ_READAHEAD;
642 root 1.13 req->fh = newSVsv (fh);
643     req->fd = PerlIO_fileno (IoIFP (sv_2io (fh)));
644 root 1.1 req->offset = offset;
645     req->length = length;
646    
647     send_req (req);
648     }
649    
650     void
651 root 1.8 aio_stat(fh_or_path,callback=&PL_sv_undef)
652 root 1.1 SV * fh_or_path
653     SV * callback
654     ALIAS:
655 root 1.8 aio_stat = REQ_STAT
656     aio_lstat = REQ_LSTAT
657 root 1.1 CODE:
658     {
659 root 1.22 dREQ;
660 root 1.1
661     New (0, req->statdata, 1, Stat_t);
662     if (!req->statdata)
663 root 1.22 croak ("out of memory during aio_req->statdata allocation (sorry, i just leaked memory, too)");
664 root 1.1
665     if (SvPOK (fh_or_path))
666     {
667 root 1.8 req->type = ix;
668 root 1.1 req->data = newSVsv (fh_or_path);
669 root 1.22 req->dataptr = SvPVbyte_nolen (req->data);
670 root 1.1 }
671     else
672     {
673     req->type = REQ_FSTAT;
674 root 1.13 req->fh = newSVsv (fh_or_path);
675 root 1.1 req->fd = PerlIO_fileno (IoIFP (sv_2io (fh_or_path)));
676     }
677    
678     send_req (req);
679     }
680    
681     void
682 root 1.8 aio_unlink(pathname,callback=&PL_sv_undef)
683 root 1.1 SV * pathname
684     SV * callback
685 root 1.22 ALIAS:
686     aio_unlink = REQ_UNLINK
687     aio_rmdir = REQ_RMDIR
688 root 1.1 CODE:
689     {
690 root 1.22 dREQ;
691 root 1.1
692 root 1.22 req->type = ix;
693     req->data = newSVsv (pathname);
694     req->dataptr = SvPVbyte_nolen (req->data);
695 root 1.1
696 root 1.22 send_req (req);
697     }
698    
699     void
700     aio_symlink(oldpath,newpath,callback=&PL_sv_undef)
701     SV * oldpath
702     SV * newpath
703     SV * callback
704     CODE:
705     {
706     dREQ;
707 root 1.1
708 root 1.22 req->type = REQ_SYMLINK;
709     req->fh = newSVsv (oldpath);
710     req->data2ptr = SvPVbyte_nolen (req->fh);
711     req->data = newSVsv (newpath);
712     req->dataptr = SvPVbyte_nolen (req->data);
713 root 1.1
714     send_req (req);
715     }
716    
717 root 1.6 void
718     flush()
719     PROTOTYPE:
720     CODE:
721     while (nreqs)
722     {
723     poll_wait ();
724     poll_cb ();
725     }
726    
727 root 1.7 void
728     poll()
729     PROTOTYPE:
730     CODE:
731     if (nreqs)
732     {
733     poll_wait ();
734     poll_cb ();
735     }
736    
737 root 1.1 int
738     poll_fileno()
739     PROTOTYPE:
740     CODE:
741 root 1.3 RETVAL = respipe [0];
742 root 1.1 OUTPUT:
743     RETVAL
744    
745     int
746     poll_cb(...)
747     PROTOTYPE:
748     CODE:
749 root 1.5 RETVAL = poll_cb ();
750 root 1.1 OUTPUT:
751     RETVAL
752    
753     void
754     poll_wait()
755     PROTOTYPE:
756     CODE:
757 root 1.3 if (nreqs)
758     poll_wait ();
759 root 1.1
760     int
761     nreqs()
762     PROTOTYPE:
763     CODE:
764     RETVAL = nreqs;
765     OUTPUT:
766     RETVAL
767