ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/IO-AIO/AIO.xs
Revision: 1.23
Committed: Tue Aug 16 23:33:34 2005 UTC (18 years, 9 months ago) by root
Branch: MAIN
CVS Tags: rel-1_2
Changes since 1.22: +1 -1 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.20 #define _REENTRANT 1
2 root 1.19 #include <errno.h>
3    
4 root 1.15 #include "EXTERN.h"
5 root 1.1 #include "perl.h"
6     #include "XSUB.h"
7    
8 root 1.16 #include "autoconf/config.h"
9    
10 root 1.1 #include <sys/types.h>
11     #include <sys/stat.h>
12 root 1.10
13 root 1.1 #include <unistd.h>
14     #include <fcntl.h>
15     #include <signal.h>
16     #include <sched.h>
17    
18     #include <pthread.h>
19    
20     typedef void *InputStream; /* hack, but 5.6.1 is simply toooo old ;) */
21     typedef void *OutputStream; /* hack, but 5.6.1 is simply toooo old ;) */
22     typedef void *InOutStream; /* hack, but 5.6.1 is simply toooo old ;) */
23    
24 root 1.4 #if __ia64
25     # define STACKSIZE 65536
26 root 1.1 #else
27 root 1.4 # define STACKSIZE 4096
28 root 1.1 #endif
29    
30     enum {
31     REQ_QUIT,
32     REQ_OPEN, REQ_CLOSE,
33     REQ_READ, REQ_WRITE, REQ_READAHEAD,
34 root 1.22 REQ_STAT, REQ_LSTAT, REQ_FSTAT,
35 root 1.1 REQ_FSYNC, REQ_FDATASYNC,
36 root 1.23 REQ_UNLINK, REQ_RMDIR,
37 root 1.22 REQ_SYMLINK,
38 root 1.1 };
39    
40     typedef struct aio_cb {
41 root 1.3 struct aio_cb *volatile next;
42 root 1.1
43     int type;
44    
45     int fd;
46     off_t offset;
47     size_t length;
48     ssize_t result;
49     mode_t mode; /* open */
50     int errorno;
51 root 1.13 SV *data, *callback, *fh;
52 root 1.22 void *dataptr, *data2ptr;
53 root 1.1 STRLEN dataoffset;
54    
55     Stat_t *statdata;
56     } aio_cb;
57    
58     typedef aio_cb *aio_req;
59    
60     static int started;
61 root 1.11 static volatile int nreqs;
62 root 1.4 static int max_outstanding = 1<<30;
63 root 1.3 static int respipe [2];
64 root 1.1
65 root 1.3 static pthread_mutex_t reslock = PTHREAD_MUTEX_INITIALIZER;
66     static pthread_mutex_t reqlock = PTHREAD_MUTEX_INITIALIZER;
67 root 1.22 static pthread_mutex_t frklock = PTHREAD_MUTEX_INITIALIZER;
68 root 1.3 static pthread_cond_t reqwait = PTHREAD_COND_INITIALIZER;
69    
70     static volatile aio_req reqs, reqe; /* queue start, queue end */
71     static volatile aio_req ress, rese; /* queue start, queue end */
72 root 1.1
73     static void
74     poll_wait ()
75     {
76 root 1.11 if (nreqs && !ress)
77     {
78     fd_set rfd;
79     FD_ZERO(&rfd);
80     FD_SET(respipe [0], &rfd);
81 root 1.3
82 root 1.11 select (respipe [0] + 1, &rfd, 0, 0, 0);
83     }
84 root 1.1 }
85    
86     static int
87 root 1.5 poll_cb ()
88 root 1.1 {
89     dSP;
90     int count = 0;
91 root 1.11 aio_req req, prv;
92    
93     pthread_mutex_lock (&reslock);
94    
95 root 1.3 {
96 root 1.11 /* read any signals sent by the worker threads */
97 root 1.3 char buf [32];
98 root 1.22 while (read (respipe [0], buf, 32) == 32)
99 root 1.3 ;
100     }
101 root 1.1
102 root 1.11 req = ress;
103     ress = rese = 0;
104 root 1.3
105 root 1.11 pthread_mutex_unlock (&reslock);
106 root 1.3
107 root 1.11 while (req)
108     {
109 root 1.1 nreqs--;
110    
111     if (req->type == REQ_QUIT)
112     started--;
113     else
114     {
115     int errorno = errno;
116     errno = req->errorno;
117    
118     if (req->type == REQ_READ)
119     SvCUR_set (req->data, req->dataoffset
120     + req->result > 0 ? req->result : 0);
121    
122     if (req->data)
123     SvREFCNT_dec (req->data);
124    
125 root 1.13 if (req->fh)
126     SvREFCNT_dec (req->fh);
127    
128 root 1.1 if (req->type == REQ_STAT || req->type == REQ_LSTAT || req->type == REQ_FSTAT)
129     {
130     PL_laststype = req->type == REQ_LSTAT ? OP_LSTAT : OP_STAT;
131     PL_laststatval = req->result;
132     PL_statcache = *(req->statdata);
133    
134     Safefree (req->statdata);
135     }
136    
137 root 1.13 ENTER;
138 root 1.1 PUSHMARK (SP);
139     XPUSHs (sv_2mortal (newSViv (req->result)));
140 root 1.2
141     if (req->type == REQ_OPEN)
142     {
143     /* convert fd to fh */
144     SV *fh;
145    
146     PUTBACK;
147     call_pv ("IO::AIO::_fd2fh", G_SCALAR | G_EVAL);
148     SPAGAIN;
149    
150 root 1.13 fh = SvREFCNT_inc (POPs);
151 root 1.2
152     PUSHMARK (SP);
153 root 1.13 XPUSHs (sv_2mortal (fh));
154 root 1.2 }
155    
156 root 1.8 if (SvOK (req->callback))
157     {
158     PUTBACK;
159     call_sv (req->callback, G_VOID | G_EVAL);
160     SPAGAIN;
161     }
162 root 1.13
163     LEAVE;
164 root 1.1
165     if (req->callback)
166     SvREFCNT_dec (req->callback);
167    
168     errno = errorno;
169     count++;
170     }
171    
172 root 1.11 prv = req;
173     req = req->next;
174     Safefree (prv);
175    
176     /* TODO: croak on errors? */
177 root 1.1 }
178    
179     return count;
180     }
181    
182 root 1.4 static void *aio_proc(void *arg);
183    
184     static void
185     start_thread (void)
186     {
187     sigset_t fullsigset, oldsigset;
188     pthread_t tid;
189     pthread_attr_t attr;
190    
191     pthread_attr_init (&attr);
192     pthread_attr_setstacksize (&attr, STACKSIZE);
193     pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
194    
195     sigfillset (&fullsigset);
196     sigprocmask (SIG_SETMASK, &fullsigset, &oldsigset);
197    
198     if (pthread_create (&tid, &attr, aio_proc, 0) == 0)
199     started++;
200    
201     sigprocmask (SIG_SETMASK, &oldsigset, 0);
202     }
203    
204     static void
205     send_req (aio_req req)
206     {
207     nreqs++;
208    
209     pthread_mutex_lock (&reqlock);
210    
211     req->next = 0;
212    
213     if (reqe)
214     {
215     reqe->next = req;
216     reqe = req;
217     }
218     else
219     reqe = reqs = req;
220    
221     pthread_cond_signal (&reqwait);
222     pthread_mutex_unlock (&reqlock);
223    
224     while (nreqs > max_outstanding)
225     {
226     poll_wait ();
227     poll_cb ();
228     }
229     }
230    
231     static void
232     end_thread (void)
233     {
234     aio_req req;
235     New (0, req, 1, aio_cb);
236     req->type = REQ_QUIT;
237    
238     send_req (req);
239     }
240    
241 root 1.22
242     static void min_parallel (int nthreads)
243     {
244     while (nthreads > started)
245     start_thread ();
246     }
247    
248     static void max_parallel (int nthreads)
249     {
250     int cur = started;
251     while (cur > nthreads)
252     {
253     end_thread ();
254     cur--;
255     }
256    
257     while (started > nthreads)
258     {
259     poll_wait ();
260     poll_cb ();
261     }
262     }
263    
264     static int fork_started;
265    
266     static void atfork_prepare (void)
267     {
268     pthread_mutex_lock (&frklock);
269    
270     fork_started = started;
271    
272     for (;;) {
273     while (nreqs)
274     {
275     poll_wait ();
276     poll_cb ();
277     }
278    
279     max_parallel (0);
280    
281     pthread_mutex_lock (&reqlock);
282    
283     if (!nreqs && !started)
284     break;
285    
286     pthread_mutex_unlock (&reqlock);
287    
288     min_parallel (fork_started);
289     }
290    
291     pthread_mutex_lock (&reslock);
292    
293     assert (!started);
294     assert (!nreqs);
295     assert (!reqs && !reqe);
296     assert (!ress && !rese);
297     }
298    
299     static void atfork_parent (void)
300     {
301     pthread_mutex_unlock (&reslock);
302     min_parallel (fork_started);
303     pthread_mutex_unlock (&reqlock);
304     pthread_mutex_unlock (&frklock);
305     }
306    
307     static void atfork_child (void)
308     {
309     reqs = reqe = 0;
310    
311     atfork_parent ();
312     }
313    
314     /*****************************************************************************/
315 root 1.17 /* work around various missing functions */
316    
317     #if !HAVE_PREADWRITE
318     # define pread aio_pread
319     # define pwrite aio_pwrite
320    
321     /*
322     * make our pread/pwrite safe against themselves, but not against
323     * normal read/write by using a mutex. slows down execution a lot,
324     * but that's your problem, not mine.
325     */
326     static pthread_mutex_t iolock = PTHREAD_MUTEX_INITIALIZER;
327    
328     static ssize_t
329     pread (int fd, void *buf, size_t count, off_t offset)
330     {
331     ssize_t res;
332     off_t ooffset;
333    
334     pthread_mutex_lock (&iolock);
335     ooffset = lseek (fd, 0, SEEK_CUR);
336     lseek (fd, offset, SEEK_SET);
337     res = read (fd, buf, count);
338     lseek (fd, ooffset, SEEK_SET);
339     pthread_mutex_unlock (&iolock);
340    
341     return res;
342     }
343    
344     static ssize_t
345     pwrite (int fd, void *buf, size_t count, off_t offset)
346     {
347     ssize_t res;
348     off_t ooffset;
349    
350     pthread_mutex_lock (&iolock);
351     ooffset = lseek (fd, 0, SEEK_CUR);
352     lseek (fd, offset, SEEK_SET);
353     res = write (fd, buf, count);
354     lseek (fd, offset, SEEK_SET);
355     pthread_mutex_unlock (&iolock);
356    
357     return res;
358     }
359     #endif
360    
361     #if !HAVE_FDATASYNC
362     # define fdatasync fsync
363     #endif
364    
365     #if !HAVE_READAHEAD
366     # define readahead aio_readahead
367    
368     static char readahead_buf[4096];
369    
370     static ssize_t
371     readahead (int fd, off_t offset, size_t count)
372     {
373     while (count > 0)
374     {
375     size_t len = count < sizeof (readahead_buf) ? count : sizeof (readahead_buf);
376    
377     pread (fd, readahead_buf, len, offset);
378     offset += len;
379     count -= len;
380     }
381    
382     errno = 0;
383     }
384     #endif
385    
386 root 1.22 /*****************************************************************************/
387    
388 root 1.1 static void *
389     aio_proc (void *thr_arg)
390     {
391     aio_req req;
392 root 1.3 int type;
393 root 1.1
394 root 1.3 do
395 root 1.1 {
396 root 1.3 pthread_mutex_lock (&reqlock);
397    
398     for (;;)
399     {
400     req = reqs;
401    
402     if (reqs)
403     {
404     reqs = reqs->next;
405     if (!reqs) reqe = 0;
406     }
407    
408     if (req)
409     break;
410    
411     pthread_cond_wait (&reqwait, &reqlock);
412     }
413    
414     pthread_mutex_unlock (&reqlock);
415    
416 root 1.1 errno = 0; /* strictly unnecessary */
417    
418 root 1.3 type = req->type;
419    
420     switch (type)
421 root 1.1 {
422 root 1.10 case REQ_READ: req->result = pread (req->fd, req->dataptr, req->length, req->offset); break;
423     case REQ_WRITE: req->result = pwrite (req->fd, req->dataptr, req->length, req->offset); break;
424 root 1.16
425 root 1.1 case REQ_READAHEAD: req->result = readahead (req->fd, req->offset, req->length); break;
426    
427     case REQ_STAT: req->result = stat (req->dataptr, req->statdata); break;
428     case REQ_LSTAT: req->result = lstat (req->dataptr, req->statdata); break;
429     case REQ_FSTAT: req->result = fstat (req->fd , req->statdata); break;
430    
431     case REQ_OPEN: req->result = open (req->dataptr, req->fd, req->mode); break;
432     case REQ_CLOSE: req->result = close (req->fd); break;
433     case REQ_UNLINK: req->result = unlink (req->dataptr); break;
434 root 1.22 case REQ_RMDIR: req->result = rmdir (req->dataptr); break;
435     case REQ_SYMLINK: req->result = symlink (req->data2ptr, req->dataptr); break;
436 root 1.1
437 root 1.17 case REQ_FDATASYNC: req->result = fdatasync (req->fd); break;
438 root 1.1 case REQ_FSYNC: req->result = fsync (req->fd); break;
439    
440     case REQ_QUIT:
441 root 1.3 break;
442 root 1.1
443     default:
444     req->result = ENOSYS;
445     break;
446     }
447    
448     req->errorno = errno;
449 root 1.3
450     pthread_mutex_lock (&reslock);
451    
452     req->next = 0;
453    
454     if (rese)
455     {
456     rese->next = req;
457     rese = req;
458     }
459     else
460     {
461     rese = ress = req;
462    
463     /* write a dummy byte to the pipe so fh becomes ready */
464     write (respipe [1], &respipe, 1);
465     }
466    
467     pthread_mutex_unlock (&reslock);
468 root 1.1 }
469 root 1.3 while (type != REQ_QUIT);
470 root 1.1
471     return 0;
472     }
473    
474 root 1.22 #define dREQ \
475     aio_req req; \
476     \
477     if (SvOK (callback) && !SvROK (callback)) \
478     croak ("clalback must be undef or of reference type"); \
479     \
480     Newz (0, req, 1, aio_cb); \
481     if (!req) \
482     croak ("out of memory during aio_req allocation"); \
483     \
484     req->callback = SvREFCNT_inc (callback);
485    
486 root 1.1 MODULE = IO::AIO PACKAGE = IO::AIO
487    
488 root 1.8 PROTOTYPES: ENABLE
489    
490 root 1.1 BOOT:
491     {
492 root 1.3 if (pipe (respipe))
493     croak ("unable to initialize result pipe");
494 root 1.1
495 root 1.3 if (fcntl (respipe [0], F_SETFL, O_NONBLOCK))
496 root 1.1 croak ("cannot set result pipe to nonblocking mode");
497    
498 root 1.3 if (fcntl (respipe [1], F_SETFL, O_NONBLOCK))
499 root 1.1 croak ("cannot set result pipe to nonblocking mode");
500 root 1.22
501     pthread_atfork (atfork_prepare, atfork_parent, atfork_child);
502 root 1.1 }
503    
504     void
505     min_parallel(nthreads)
506     int nthreads
507     PROTOTYPE: $
508    
509     void
510     max_parallel(nthreads)
511     int nthreads
512     PROTOTYPE: $
513    
514 root 1.4 int
515     max_outstanding(nreqs)
516     int nreqs
517     PROTOTYPE: $
518     CODE:
519     RETVAL = max_outstanding;
520     max_outstanding = nreqs;
521    
522 root 1.1 void
523 root 1.8 aio_open(pathname,flags,mode,callback=&PL_sv_undef)
524 root 1.1 SV * pathname
525     int flags
526     int mode
527     SV * callback
528 root 1.8 PROTOTYPE: $$$;$
529 root 1.1 CODE:
530     {
531 root 1.22 dREQ;
532 root 1.1
533     req->type = REQ_OPEN;
534     req->data = newSVsv (pathname);
535 root 1.22 req->dataptr = SvPVbyte_nolen (req->data);
536 root 1.1 req->fd = flags;
537     req->mode = mode;
538    
539     send_req (req);
540     }
541    
542     void
543 root 1.8 aio_close(fh,callback=&PL_sv_undef)
544 root 1.13 SV * fh
545     SV * callback
546 root 1.8 PROTOTYPE: $;$
547 root 1.1 ALIAS:
548     aio_close = REQ_CLOSE
549     aio_fsync = REQ_FSYNC
550     aio_fdatasync = REQ_FDATASYNC
551     CODE:
552     {
553 root 1.22 dREQ;
554 root 1.1
555     req->type = ix;
556 root 1.13 req->fh = newSVsv (fh);
557     req->fd = PerlIO_fileno (IoIFP (sv_2io (fh)));
558 root 1.1
559     send_req (req);
560     }
561    
562     void
563 root 1.8 aio_read(fh,offset,length,data,dataoffset,callback=&PL_sv_undef)
564 root 1.13 SV * fh
565     UV offset
566     IV length
567     SV * data
568     IV dataoffset
569     SV * callback
570     ALIAS:
571     aio_read = REQ_READ
572     aio_write = REQ_WRITE
573 root 1.8 PROTOTYPE: $$$$$;$
574 root 1.1 CODE:
575 root 1.13 {
576     aio_req req;
577     STRLEN svlen;
578 root 1.21 char *svptr = SvPVbyte (data, svlen);
579 root 1.13
580     SvUPGRADE (data, SVt_PV);
581     SvPOK_on (data);
582 root 1.1
583 root 1.13 if (dataoffset < 0)
584     dataoffset += svlen;
585    
586     if (dataoffset < 0 || dataoffset > svlen)
587     croak ("data offset outside of string");
588    
589     if (ix == REQ_WRITE)
590     {
591     /* write: check length and adjust. */
592     if (length < 0 || length + dataoffset > svlen)
593     length = svlen - dataoffset;
594     }
595     else
596     {
597     /* read: grow scalar as necessary */
598     svptr = SvGROW (data, length + dataoffset);
599     }
600    
601     if (length < 0)
602     croak ("length must not be negative");
603    
604 root 1.22 {
605     dREQ;
606 root 1.13
607 root 1.22 req->type = ix;
608     req->fh = newSVsv (fh);
609     req->fd = PerlIO_fileno (ix == REQ_READ ? IoIFP (sv_2io (fh))
610     : IoOFP (sv_2io (fh)));
611     req->offset = offset;
612     req->length = length;
613     req->data = SvREFCNT_inc (data);
614     req->dataptr = (char *)svptr + dataoffset;
615     req->callback = SvREFCNT_inc (callback);
616 root 1.13
617 root 1.22 send_req (req);
618     }
619 root 1.13 }
620 root 1.1
621     void
622 root 1.8 aio_readahead(fh,offset,length,callback=&PL_sv_undef)
623 root 1.13 SV * fh
624     UV offset
625     IV length
626     SV * callback
627 root 1.8 PROTOTYPE: $$$;$
628 root 1.1 CODE:
629     {
630 root 1.22 dREQ;
631 root 1.1
632     req->type = REQ_READAHEAD;
633 root 1.13 req->fh = newSVsv (fh);
634     req->fd = PerlIO_fileno (IoIFP (sv_2io (fh)));
635 root 1.1 req->offset = offset;
636     req->length = length;
637    
638     send_req (req);
639     }
640    
641     void
642 root 1.8 aio_stat(fh_or_path,callback=&PL_sv_undef)
643 root 1.1 SV * fh_or_path
644     SV * callback
645     ALIAS:
646 root 1.8 aio_stat = REQ_STAT
647     aio_lstat = REQ_LSTAT
648 root 1.1 CODE:
649     {
650 root 1.22 dREQ;
651 root 1.1
652     New (0, req->statdata, 1, Stat_t);
653     if (!req->statdata)
654 root 1.22 croak ("out of memory during aio_req->statdata allocation (sorry, i just leaked memory, too)");
655 root 1.1
656     if (SvPOK (fh_or_path))
657     {
658 root 1.8 req->type = ix;
659 root 1.1 req->data = newSVsv (fh_or_path);
660 root 1.22 req->dataptr = SvPVbyte_nolen (req->data);
661 root 1.1 }
662     else
663     {
664     req->type = REQ_FSTAT;
665 root 1.13 req->fh = newSVsv (fh_or_path);
666 root 1.1 req->fd = PerlIO_fileno (IoIFP (sv_2io (fh_or_path)));
667     }
668    
669     send_req (req);
670     }
671    
672     void
673 root 1.8 aio_unlink(pathname,callback=&PL_sv_undef)
674 root 1.1 SV * pathname
675     SV * callback
676 root 1.22 ALIAS:
677     aio_unlink = REQ_UNLINK
678     aio_rmdir = REQ_RMDIR
679 root 1.1 CODE:
680     {
681 root 1.22 dREQ;
682 root 1.1
683 root 1.22 req->type = ix;
684     req->data = newSVsv (pathname);
685     req->dataptr = SvPVbyte_nolen (req->data);
686 root 1.1
687 root 1.22 send_req (req);
688     }
689    
690     void
691     aio_symlink(oldpath,newpath,callback=&PL_sv_undef)
692     SV * oldpath
693     SV * newpath
694     SV * callback
695     CODE:
696     {
697     dREQ;
698 root 1.1
699 root 1.22 req->type = REQ_SYMLINK;
700     req->fh = newSVsv (oldpath);
701     req->data2ptr = SvPVbyte_nolen (req->fh);
702     req->data = newSVsv (newpath);
703     req->dataptr = SvPVbyte_nolen (req->data);
704 root 1.1
705     send_req (req);
706     }
707    
708 root 1.6 void
709     flush()
710     PROTOTYPE:
711     CODE:
712     while (nreqs)
713     {
714     poll_wait ();
715     poll_cb ();
716     }
717    
718 root 1.7 void
719     poll()
720     PROTOTYPE:
721     CODE:
722     if (nreqs)
723     {
724     poll_wait ();
725     poll_cb ();
726     }
727    
728 root 1.1 int
729     poll_fileno()
730     PROTOTYPE:
731     CODE:
732 root 1.3 RETVAL = respipe [0];
733 root 1.1 OUTPUT:
734     RETVAL
735    
736     int
737     poll_cb(...)
738     PROTOTYPE:
739     CODE:
740 root 1.5 RETVAL = poll_cb ();
741 root 1.1 OUTPUT:
742     RETVAL
743    
744     void
745     poll_wait()
746     PROTOTYPE:
747     CODE:
748 root 1.3 if (nreqs)
749     poll_wait ();
750 root 1.1
751     int
752     nreqs()
753     PROTOTYPE:
754     CODE:
755     RETVAL = nreqs;
756     OUTPUT:
757     RETVAL
758