ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/IO-AIO/AIO.xs
Revision: 1.30
Committed: Thu Aug 18 16:32:10 2005 UTC (18 years, 9 months ago) by root
Branch: MAIN
Changes since 1.29: +15 -12 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.20 #define _REENTRANT 1
2 root 1.19 #include <errno.h>
3    
4 root 1.15 #include "EXTERN.h"
5 root 1.1 #include "perl.h"
6     #include "XSUB.h"
7    
8 root 1.16 #include "autoconf/config.h"
9    
10 root 1.1 #include <sys/types.h>
11     #include <sys/stat.h>
12 root 1.10
13 root 1.1 #include <unistd.h>
14     #include <fcntl.h>
15     #include <signal.h>
16     #include <sched.h>
17    
18     #include <pthread.h>
19    
/*
 * Stream handle aliases, all plain void pointers.
 * (hack, but perl 5.6.1 is simply too old)
 */
typedef void *InputStream;
typedef void *OutputStream;
typedef void *InOutStream;

/* stack size requested for each worker thread (see start_thread) */
#if !__ia64
# define STACKSIZE 4096
#else
# define STACKSIZE 65536
#endif
29    
/*
 * Request type codes stored in aio_cb.type.  Values are assigned in
 * declaration order, beginning with REQ_QUIT == 0.
 */
enum {
  REQ_QUIT,       /* asks a worker thread to leave its loop */
  REQ_OPEN,
  REQ_CLOSE,
  REQ_READ,
  REQ_WRITE,
  REQ_READAHEAD,
  REQ_STAT,
  REQ_LSTAT,
  REQ_FSTAT,
  REQ_FSYNC,
  REQ_FDATASYNC,
  REQ_UNLINK,
  REQ_RMDIR,
  REQ_SYMLINK
};
39    
40     typedef struct aio_cb {
41 root 1.3 struct aio_cb *volatile next;
42 root 1.1
43     int type;
44    
45     int fd;
46     off_t offset;
47     size_t length;
48     ssize_t result;
49     mode_t mode; /* open */
50     int errorno;
51 root 1.13 SV *data, *callback, *fh;
52 root 1.22 void *dataptr, *data2ptr;
53 root 1.1 STRLEN dataoffset;
54    
55     Stat_t *statdata;
56     } aio_cb;
57    
58     typedef aio_cb *aio_req;
59    
60 root 1.30 static int started, wanted;
61 root 1.11 static volatile int nreqs;
62 root 1.4 static int max_outstanding = 1<<30;
63 root 1.3 static int respipe [2];
64 root 1.1
65 root 1.3 static pthread_mutex_t reslock = PTHREAD_MUTEX_INITIALIZER;
66     static pthread_mutex_t reqlock = PTHREAD_MUTEX_INITIALIZER;
67     static pthread_cond_t reqwait = PTHREAD_COND_INITIALIZER;
68    
69     static volatile aio_req reqs, reqe; /* queue start, queue end */
70     static volatile aio_req ress, rese; /* queue start, queue end */
71 root 1.1
72 root 1.26 static void free_req (aio_req req)
73     {
74     if (req->data)
75     SvREFCNT_dec (req->data);
76    
77     if (req->fh)
78     SvREFCNT_dec (req->fh);
79    
80 root 1.27 if (req->statdata)
81 root 1.26 Safefree (req->statdata);
82    
83     if (req->callback)
84     SvREFCNT_dec (req->callback);
85    
86     Safefree (req);
87     }
88    
89 root 1.1 static void
90     poll_wait ()
91     {
92 root 1.11 if (nreqs && !ress)
93     {
94     fd_set rfd;
95     FD_ZERO(&rfd);
96     FD_SET(respipe [0], &rfd);
97 root 1.3
98 root 1.11 select (respipe [0] + 1, &rfd, 0, 0, 0);
99     }
100 root 1.1 }
101    
102     static int
103 root 1.5 poll_cb ()
104 root 1.1 {
105     dSP;
106     int count = 0;
107 root 1.24 int do_croak = 0;
108 root 1.27 aio_req req;
109    
110     for (;;)
111     {
112     pthread_mutex_lock (&reslock);
113     req = ress;
114    
115     if (req)
116     {
117     ress = req->next;
118 root 1.11
119 root 1.27 if (!ress)
120     {
121     /* read any signals sent by the worker threads */
122     char buf [32];
123     while (read (respipe [0], buf, 32) == 32)
124     ;
125 root 1.30
126     rese = 0;
127 root 1.27 }
128     }
129 root 1.1
130 root 1.27 pthread_mutex_unlock (&reslock);
131 root 1.3
132 root 1.27 if (!req)
133     break;
134 root 1.3
135 root 1.1 nreqs--;
136    
137     if (req->type == REQ_QUIT)
138     started--;
139     else
140     {
141     int errorno = errno;
142     errno = req->errorno;
143    
144     if (req->type == REQ_READ)
145 root 1.27 SvCUR_set (req->data, req->dataoffset + (req->result > 0 ? req->result : 0));
146 root 1.1
147 root 1.28 if (req->data2ptr && (req->type == REQ_READ || req->type == REQ_WRITE))
148     SvREADONLY_off (req->data);
149    
150 root 1.27 if (req->statdata)
151 root 1.1 {
152     PL_laststype = req->type == REQ_LSTAT ? OP_LSTAT : OP_STAT;
153     PL_laststatval = req->result;
154     PL_statcache = *(req->statdata);
155     }
156    
157 root 1.13 ENTER;
158 root 1.1 PUSHMARK (SP);
159     XPUSHs (sv_2mortal (newSViv (req->result)));
160 root 1.2
161     if (req->type == REQ_OPEN)
162     {
163     /* convert fd to fh */
164     SV *fh;
165    
166     PUTBACK;
167     call_pv ("IO::AIO::_fd2fh", G_SCALAR | G_EVAL);
168     SPAGAIN;
169    
170 root 1.13 fh = SvREFCNT_inc (POPs);
171 root 1.2
172     PUSHMARK (SP);
173 root 1.13 XPUSHs (sv_2mortal (fh));
174 root 1.2 }
175    
176 root 1.8 if (SvOK (req->callback))
177     {
178     PUTBACK;
179     call_sv (req->callback, G_VOID | G_EVAL);
180     SPAGAIN;
181 root 1.27
182     if (SvTRUE (ERRSV))
183     {
184     free_req (req);
185     croak (0);
186     }
187 root 1.8 }
188 root 1.13
189 root 1.26 LEAVE;
190    
191 root 1.1 errno = errorno;
192     count++;
193     }
194    
195 root 1.27 free_req (req);
196 root 1.1 }
197    
198     return count;
199     }
200    
201 root 1.4 static void *aio_proc(void *arg);
202    
203     static void
204     start_thread (void)
205     {
206     sigset_t fullsigset, oldsigset;
207     pthread_t tid;
208     pthread_attr_t attr;
209    
210     pthread_attr_init (&attr);
211     pthread_attr_setstacksize (&attr, STACKSIZE);
212 root 1.30 // pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
213 root 1.4
214     sigfillset (&fullsigset);
215     sigprocmask (SIG_SETMASK, &fullsigset, &oldsigset);
216    
217     if (pthread_create (&tid, &attr, aio_proc, 0) == 0)
218     started++;
219    
220     sigprocmask (SIG_SETMASK, &oldsigset, 0);
221     }
222    
223     static void
224     send_req (aio_req req)
225     {
226 root 1.30 while (started < wanted && nreqs >= started)
227     start_thread ();
228    
229 root 1.4 nreqs++;
230    
231     pthread_mutex_lock (&reqlock);
232    
233     req->next = 0;
234    
235     if (reqe)
236     {
237     reqe->next = req;
238     reqe = req;
239     }
240     else
241     reqe = reqs = req;
242    
243     pthread_cond_signal (&reqwait);
244     pthread_mutex_unlock (&reqlock);
245    
246 root 1.27 if (nreqs > max_outstanding)
247     for (;;)
248     {
249     poll_cb ();
250    
251     if (nreqs <= max_outstanding)
252     break;
253    
254     poll_wait ();
255     }
256 root 1.4 }
257    
258     static void
259     end_thread (void)
260     {
261     aio_req req;
262 root 1.26 Newz (0, req, 1, aio_cb);
263 root 1.4 req->type = REQ_QUIT;
264    
265     send_req (req);
266     }
267    
268 root 1.22 static void min_parallel (int nthreads)
269     {
270 root 1.30 if (wanted < nthreads)
271     wanted = nthreads;
272 root 1.22 }
273    
274     static void max_parallel (int nthreads)
275     {
276     int cur = started;
277 root 1.27
278 root 1.30 if (wanted > nthreads)
279     wanted = nthreads;
280    
281     while (cur > wanted)
282     {
283 root 1.22 end_thread ();
284     cur--;
285     }
286    
287 root 1.30 while (started > wanted)
288 root 1.22 {
289     poll_wait ();
290     poll_cb ();
291     }
292     }
293    
294 root 1.26 static void create_pipe ()
295 root 1.22 {
296 root 1.26 if (pipe (respipe))
297     croak ("unable to initialize result pipe");
298 root 1.22
299 root 1.26 if (fcntl (respipe [0], F_SETFL, O_NONBLOCK))
300     croak ("cannot set result pipe to nonblocking mode");
301 root 1.22
302 root 1.26 if (fcntl (respipe [1], F_SETFL, O_NONBLOCK))
303     croak ("cannot set result pipe to nonblocking mode");
304     }
305 root 1.22
306 root 1.26 static void atfork_prepare (void)
307     {
308 root 1.28 pthread_mutex_lock (&reqlock);
309 root 1.22 pthread_mutex_lock (&reslock);
310     }
311    
312     static void atfork_parent (void)
313     {
314     pthread_mutex_unlock (&reslock);
315 root 1.26 pthread_mutex_unlock (&reqlock);
316     }
317    
318     static void atfork_child (void)
319     {
320 root 1.28 aio_req prv;
321    
322 root 1.26 started = 0;
323    
324 root 1.28 while (reqs)
325     {
326     prv = reqs;
327     reqs = prv->next;
328     free_req (prv);
329     }
330    
331     reqs = reqe = 0;
332    
333     while (ress)
334     {
335     prv = ress;
336     ress = prv->next;
337     free_req (prv);
338     }
339    
340     ress = rese = 0;
341    
342 root 1.29 close (respipe [0]);
343     close (respipe [1]);
344     create_pipe ();
345    
346 root 1.26 atfork_parent ();
347 root 1.22 }
348    
349     /*****************************************************************************/
350 root 1.17 /* work around various missing functions */
351    
#if !HAVE_PREADWRITE
# define pread aio_pread
# define pwrite aio_pwrite

/*
 * make our pread/pwrite safe against themselves, but not against
 * normal read/write by using a mutex. slows down execution a lot,
 * but that's your problem, not mine.
 */
static pthread_mutex_t iolock = PTHREAD_MUTEX_INITIALIZER;

/*
 * Emulated pread: read up to count bytes from fd at the given offset and
 * put the file position back where it was.  Returns what read () returned;
 * errno reflects the read, not the repositioning.
 */
static ssize_t
pread (int fd, void *buf, size_t count, off_t offset)
{
  ssize_t res;
  off_t ooffset;
  int saved_errno;

  pthread_mutex_lock (&iolock);
  ooffset = lseek (fd, 0, SEEK_CUR);
  lseek (fd, offset, SEEK_SET);
  res = read (fd, buf, count);
  saved_errno = errno;
  lseek (fd, ooffset, SEEK_SET);
  errno = saved_errno;
  pthread_mutex_unlock (&iolock);

  return res;
}

/*
 * Emulated pwrite: write up to count bytes to fd at the given offset and
 * put the file position back where it was.  Returns what write ()
 * returned; errno reflects the write, not the repositioning.
 */
static ssize_t
pwrite (int fd, void *buf, size_t count, off_t offset)
{
  ssize_t res;
  off_t ooffset;
  int saved_errno;

  pthread_mutex_lock (&iolock);
  ooffset = lseek (fd, 0, SEEK_CUR);
  lseek (fd, offset, SEEK_SET);
  res = write (fd, buf, count);
  saved_errno = errno;
  /* restore the position saved above (the original seeked to `offset') */
  lseek (fd, ooffset, SEEK_SET);
  errno = saved_errno;
  pthread_mutex_unlock (&iolock);

  return res;
}
#endif
395    
396     #if !HAVE_FDATASYNC
397     # define fdatasync fsync
398     #endif
399    
#if !HAVE_READAHEAD
# define readahead aio_readahead

/* scratch buffer; the data read into it is never looked at */
static char readahead_buf[4096];

/*
 * Emulated readahead: read count bytes starting at offset in
 * buffer-sized pieces and throw the data away.  Stops early when a read
 * fails or hits end of file.  Always returns 0 with errno cleared.
 */
static ssize_t
readahead (int fd, off_t offset, size_t count)
{
  while (count > 0)
    {
      size_t len = count < sizeof (readahead_buf) ? count : sizeof (readahead_buf);

      /* nothing more to gain once a read fails or reaches end of file */
      if (pread (fd, readahead_buf, len, offset) <= 0)
        break;

      offset += len;
      count -= len;
    }

  errno = 0;

  /* the original fell off the end of this non-void function */
  return 0;
}
#endif
420    
421 root 1.22 /*****************************************************************************/
422    
423 root 1.1 static void *
424     aio_proc (void *thr_arg)
425     {
426     aio_req req;
427 root 1.3 int type;
428 root 1.1
429 root 1.3 do
430 root 1.1 {
431 root 1.3 pthread_mutex_lock (&reqlock);
432    
433     for (;;)
434     {
435     req = reqs;
436    
437     if (reqs)
438     {
439     reqs = reqs->next;
440     if (!reqs) reqe = 0;
441     }
442    
443     if (req)
444     break;
445    
446     pthread_cond_wait (&reqwait, &reqlock);
447     }
448    
449     pthread_mutex_unlock (&reqlock);
450    
451 root 1.1 errno = 0; /* strictly unnecessary */
452    
453 root 1.3 type = req->type;
454    
455     switch (type)
456 root 1.1 {
457 root 1.10 case REQ_READ: req->result = pread (req->fd, req->dataptr, req->length, req->offset); break;
458     case REQ_WRITE: req->result = pwrite (req->fd, req->dataptr, req->length, req->offset); break;
459 root 1.16
460 root 1.1 case REQ_READAHEAD: req->result = readahead (req->fd, req->offset, req->length); break;
461    
462     case REQ_STAT: req->result = stat (req->dataptr, req->statdata); break;
463     case REQ_LSTAT: req->result = lstat (req->dataptr, req->statdata); break;
464     case REQ_FSTAT: req->result = fstat (req->fd , req->statdata); break;
465    
466     case REQ_OPEN: req->result = open (req->dataptr, req->fd, req->mode); break;
467     case REQ_CLOSE: req->result = close (req->fd); break;
468     case REQ_UNLINK: req->result = unlink (req->dataptr); break;
469 root 1.22 case REQ_RMDIR: req->result = rmdir (req->dataptr); break;
470     case REQ_SYMLINK: req->result = symlink (req->data2ptr, req->dataptr); break;
471 root 1.1
472 root 1.17 case REQ_FDATASYNC: req->result = fdatasync (req->fd); break;
473 root 1.1 case REQ_FSYNC: req->result = fsync (req->fd); break;
474    
475     case REQ_QUIT:
476 root 1.3 break;
477 root 1.1
478     default:
479     req->result = ENOSYS;
480     break;
481     }
482    
483     req->errorno = errno;
484 root 1.3
485     pthread_mutex_lock (&reslock);
486    
487     req->next = 0;
488    
489     if (rese)
490     {
491     rese->next = req;
492     rese = req;
493     }
494     else
495     {
496     rese = ress = req;
497    
498     /* write a dummy byte to the pipe so fh becomes ready */
499     write (respipe [1], &respipe, 1);
500     }
501    
502     pthread_mutex_unlock (&reslock);
503 root 1.1 }
504 root 1.3 while (type != REQ_QUIT);
505 root 1.1
506     return 0;
507     }
508    
/*
 * Declares `req' and sets it up for an xsub that has a `callback'
 * argument: croaks unless callback is undef or a reference, allocates a
 * zeroed request and stores a copy of the callback in it.  It starts
 * with a declaration, so it must appear among a block's declarations.
 */
#define dREQ							\
  aio_req req;							\
								\
  if (SvOK (callback) && !SvROK (callback))			\
    croak ("callback must be undef or of reference type");	\
								\
  Newz (0, req, 1, aio_cb);					\
  if (!req)							\
    croak ("out of memory during aio_req allocation");		\
								\
  req->callback = newSVsv (callback);
520 root 1.22
521 root 1.1 MODULE = IO::AIO PACKAGE = IO::AIO
522    
523 root 1.8 PROTOTYPES: ENABLE
524    
BOOT:
{
        /* set up the result pipe and register the fork handlers */
        create_pipe ();

        pthread_atfork (atfork_prepare, atfork_parent, atfork_child);
}
530    
# No CODE section is given for the next two xsubs, so each one
# forwards to the C function of the same name.

void
min_parallel(nthreads)
        int     nthreads
        PROTOTYPE: $

void
max_parallel(nthreads)
        int     nthreads
        PROTOTYPE: $
540    
# Set the limit on outstanding requests and return the previous limit.

int
max_outstanding(nreqs)
        int     nreqs
        PROTOTYPE: $
        CODE:
        /* the parameter `nreqs' is the new limit, not the global counter */
        RETVAL = max_outstanding;
        max_outstanding = nreqs;
        OUTPUT:
        RETVAL
548    
void
aio_open(pathname,flags,mode,callback=&PL_sv_undef)
        SV *    pathname
        int     flags
        int     mode
        SV *    callback
        PROTOTYPE: $$$;$
        CODE:
{
        /* queue an open request; poll_cb turns the resulting fd into a handle */
        dREQ;

        req->type    = REQ_OPEN;
        req->mode    = mode;
        req->fd      = flags; /* the fd field carries the open flags */
        req->data    = newSVsv (pathname);
        req->dataptr = SvPVbyte_nolen (req->data);

        send_req (req);
}
568    
void
aio_close(fh,callback=&PL_sv_undef)
        SV *    fh
        SV *    callback
        PROTOTYPE: $;$
        ALIAS:
           aio_close     = REQ_CLOSE
           aio_fsync     = REQ_FSYNC
           aio_fdatasync = REQ_FDATASYNC
        CODE:
{
        /* resolve the handle first: sv_2io may croak, and doing so after
           the request has been allocated would leak the request */
        int fd = PerlIO_fileno (IoIFP (sv_2io (fh)));
        dREQ;

        req->type = ix; /* the alias index is the request type */
        req->fh = newSVsv (fh);
        req->fd = fd;

        send_req (req);
}
588    
void
aio_read(fh,offset,length,data,dataoffset,callback=&PL_sv_undef)
        SV *    fh
        UV      offset
        IV      length
        SV *    data
        IV      dataoffset
        SV *    callback
        ALIAS:
           aio_read  = REQ_READ
           aio_write = REQ_WRITE
        PROTOTYPE: $$$$$;$
        CODE:
{
        /* Queue a read into, or a write from, `data' starting at byte
           `dataoffset' (negative values count from the end of the string). */
        STRLEN svlen;
        char *svptr = SvPVbyte (data, svlen);

        SvUPGRADE (data, SVt_PV);
        SvPOK_on (data);

        if (dataoffset < 0)
          dataoffset += svlen;

        if (dataoffset < 0 || dataoffset > svlen)
          croak ("data offset outside of string");

        if (ix == REQ_WRITE)
          {
            /* write: check length and adjust. */
            if (length < 0 || length + dataoffset > svlen)
              length = svlen - dataoffset;
          }
        else
          {
            /* reject a negative length before it is used to size the buffer */
            if (length < 0)
              croak ("length must not be negative");

            /* read: grow scalar as necessary */
            svptr = SvGROW (data, length + dataoffset);
          }

        {
          /* resolve the handle before allocating: sv_2io may croak */
          int fd = PerlIO_fileno (ix == REQ_READ ? IoIFP (sv_2io (fh))
                                                 : IoOFP (sv_2io (fh)));
          dREQ;

          req->type = ix;
          req->fh = newSVsv (fh);
          req->fd = fd;
          req->offset = offset;
          req->length = length;
          req->data = SvREFCNT_inc (data);
          req->dataptr = (char *)svptr + dataoffset;
          /* poll_cb needs this to set the string length after a read */
          req->dataoffset = dataoffset;

          /* make the buffer read-only while the request is in flight;
             data2ptr tells poll_cb to lift the flag again */
          if (!SvREADONLY (data))
            {
              SvREADONLY_on (data);
              req->data2ptr = (void *)data;
            }

          send_req (req);
        }
}
652 root 1.1
void
aio_readahead(fh,offset,length,callback=&PL_sv_undef)
        SV *    fh
        UV      offset
        IV      length
        SV *    callback
        PROTOTYPE: $$$;$
        CODE:
{
        /* resolve the handle first: sv_2io may croak, and doing so after
           the request has been allocated would leak the request */
        int fd = PerlIO_fileno (IoIFP (sv_2io (fh)));
        dREQ;

        req->type = REQ_READAHEAD;
        req->fh = newSVsv (fh);
        req->fd = fd;
        req->offset = offset;
        req->length = length;

        send_req (req);
}
672    
void
aio_stat(fh_or_path,callback=&PL_sv_undef)
        SV *    fh_or_path
        SV *    callback
        ALIAS:
           aio_stat  = REQ_STAT
           aio_lstat = REQ_LSTAT
        CODE:
{
        /* A string argument is a path (stat or lstat, by alias); anything
           else is treated as a file handle and becomes an fstat request.
           The handle is resolved before anything is allocated, because
           sv_2io may croak and would otherwise leak the request. */
        int is_path = SvPOK (fh_or_path);
        int fd = is_path ? 0 : PerlIO_fileno (IoIFP (sv_2io (fh_or_path)));
        dREQ;

        New (0, req->statdata, 1, Stat_t);
        if (!req->statdata)
          {
            free_req (req);
            croak ("out of memory during aio_req->statdata allocation");
          }

        if (is_path)
          {
            req->type = ix;
            req->data = newSVsv (fh_or_path);
            req->dataptr = SvPVbyte_nolen (req->data);
          }
        else
          {
            req->type = REQ_FSTAT;
            req->fh = newSVsv (fh_or_path);
            req->fd = fd;
          }

        send_req (req);
}
706    
void
aio_unlink(pathname,callback=&PL_sv_undef)
        SV *    pathname
        SV *    callback
        ALIAS:
           aio_unlink = REQ_UNLINK
           aio_rmdir  = REQ_RMDIR
        CODE:
{
        /* queue an unlink or rmdir; the alias index is the request type */
        dREQ;

        req->data    = newSVsv (pathname);
        req->dataptr = SvPVbyte_nolen (req->data);
        req->type    = ix;

        send_req (req);
}
724    
void
aio_symlink(oldpath,newpath,callback=&PL_sv_undef)
        SV *    oldpath
        SV *    newpath
        SV *    callback
        CODE:
{
        /* queue symlink (oldpath, newpath); the fh slot holds the copy
           of oldpath, the data slot the copy of newpath */
        dREQ;

        req->type     = REQ_SYMLINK;

        req->fh       = newSVsv (oldpath);
        req->data2ptr = SvPVbyte_nolen (req->fh);

        req->data     = newSVsv (newpath);
        req->dataptr  = SvPVbyte_nolen (req->data);

        send_req (req);
}
742    
void
flush()
        PROTOTYPE:
        CODE:
        /* wait for and process results until no request is outstanding */
        for (; nreqs; )
          {
            poll_wait ();
            poll_cb ();
          }
752    
void
poll()
        PROTOTYPE:
        CODE:
        /* with requests outstanding: wait for results once, then process them */
        if (nreqs != 0)
          {
            poll_wait ();
            poll_cb ();
          }
762    
int
poll_fileno()
        PROTOTYPE:
        CODE:
        /* the read end of the result pipe */
        RETVAL = respipe [0];
        OUTPUT:
        RETVAL
770    
int
poll_cb(...)
        PROTOTYPE:
        CODE:
        /* process all available results; returns how many were handled */
        RETVAL = poll_cb ();
        OUTPUT:
        RETVAL
778    
void
poll_wait()
        PROTOTYPE:
        CODE:
        /* wait for results only while requests are outstanding */
        if (nreqs != 0)
          poll_wait ();
785 root 1.1
int
nreqs()
        PROTOTYPE:
        CODE:
        /* number of requests submitted but not yet consumed by poll_cb */
        RETVAL = nreqs;
        OUTPUT:
        RETVAL
793