ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/IO-AIO/AIO.xs
Revision: 1.18
Committed: Sun Jul 31 19:00:31 2005 UTC (18 years, 9 months ago) by root
Branch: MAIN
Changes since 1.17: +5 -0 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.15 #include "EXTERN.h"
2 root 1.1 #include "perl.h"
3     #include "XSUB.h"
4    
5 root 1.16 #include "autoconf/config.h"
6    
7 root 1.1 #include <sys/types.h>
8     #include <sys/stat.h>
9 root 1.10
10 root 1.1 #include <unistd.h>
11     #include <fcntl.h>
12     #include <signal.h>
13     #include <sched.h>
14    
15 root 1.18 #ifndef _REENTRANT
16     # define _REENTRANT 1
17     #endif
18     #include <errno.h>
19    
20 root 1.1 #include <pthread.h>
21    
22     typedef void *InputStream; /* hack, but 5.6.1 is simply toooo old ;) */
23     typedef void *OutputStream; /* hack, but 5.6.1 is simply toooo old ;) */
24     typedef void *InOutStream; /* hack, but 5.6.1 is simply toooo old ;) */
25    
26 root 1.4 #if __ia64
27     # define STACKSIZE 65536
28 root 1.1 #else
29 root 1.4 # define STACKSIZE 4096
30 root 1.1 #endif
31    
32     enum {
33     REQ_QUIT,
34     REQ_OPEN, REQ_CLOSE,
35     REQ_READ, REQ_WRITE, REQ_READAHEAD,
36     REQ_STAT, REQ_LSTAT, REQ_FSTAT, REQ_UNLINK,
37     REQ_FSYNC, REQ_FDATASYNC,
38     };
39    
40     typedef struct aio_cb {
41 root 1.3 struct aio_cb *volatile next;
42 root 1.1
43     int type;
44    
45     int fd;
46     off_t offset;
47     size_t length;
48     ssize_t result;
49     mode_t mode; /* open */
50     int errorno;
51 root 1.13 SV *data, *callback, *fh;
52 root 1.1 void *dataptr;
53     STRLEN dataoffset;
54    
55     Stat_t *statdata;
56     } aio_cb;
57    
58     typedef aio_cb *aio_req;
59    
60     static int started;
61 root 1.11 static volatile int nreqs;
62 root 1.4 static int max_outstanding = 1<<30;
63 root 1.3 static int respipe [2];
64 root 1.1
65 root 1.3 static pthread_mutex_t reslock = PTHREAD_MUTEX_INITIALIZER;
66     static pthread_mutex_t reqlock = PTHREAD_MUTEX_INITIALIZER;
67     static pthread_cond_t reqwait = PTHREAD_COND_INITIALIZER;
68    
69     static volatile aio_req reqs, reqe; /* queue start, queue end */
70     static volatile aio_req ress, rese; /* queue start, queue end */
71 root 1.1
72     static void
73     poll_wait ()
74     {
75 root 1.11 if (nreqs && !ress)
76     {
77     fd_set rfd;
78     FD_ZERO(&rfd);
79     FD_SET(respipe [0], &rfd);
80 root 1.3
81 root 1.11 select (respipe [0] + 1, &rfd, 0, 0, 0);
82     }
83 root 1.1 }
84    
85     static int
86 root 1.5 poll_cb ()
87 root 1.1 {
88     dSP;
89     int count = 0;
90 root 1.11 aio_req req, prv;
91    
92     pthread_mutex_lock (&reslock);
93    
94 root 1.3 {
95 root 1.11 /* read any signals sent by the worker threads */
96 root 1.3 char buf [32];
97     while (read (respipe [0], buf, 32) > 0)
98     ;
99     }
100 root 1.1
101 root 1.11 req = ress;
102     ress = rese = 0;
103 root 1.3
104 root 1.11 pthread_mutex_unlock (&reslock);
105 root 1.3
106 root 1.11 while (req)
107     {
108 root 1.1 nreqs--;
109    
110     if (req->type == REQ_QUIT)
111     started--;
112     else
113     {
114     int errorno = errno;
115     errno = req->errorno;
116    
117     if (req->type == REQ_READ)
118     SvCUR_set (req->data, req->dataoffset
119     + req->result > 0 ? req->result : 0);
120    
121     if (req->data)
122     SvREFCNT_dec (req->data);
123    
124 root 1.13 if (req->fh)
125     SvREFCNT_dec (req->fh);
126    
127 root 1.1 if (req->type == REQ_STAT || req->type == REQ_LSTAT || req->type == REQ_FSTAT)
128     {
129     PL_laststype = req->type == REQ_LSTAT ? OP_LSTAT : OP_STAT;
130     PL_laststatval = req->result;
131     PL_statcache = *(req->statdata);
132    
133     Safefree (req->statdata);
134     }
135    
136 root 1.13 ENTER;
137 root 1.1 PUSHMARK (SP);
138     XPUSHs (sv_2mortal (newSViv (req->result)));
139 root 1.2
140     if (req->type == REQ_OPEN)
141     {
142     /* convert fd to fh */
143     SV *fh;
144    
145     PUTBACK;
146     call_pv ("IO::AIO::_fd2fh", G_SCALAR | G_EVAL);
147     SPAGAIN;
148    
149 root 1.13 fh = SvREFCNT_inc (POPs);
150 root 1.2
151     PUSHMARK (SP);
152 root 1.13 XPUSHs (sv_2mortal (fh));
153 root 1.2 }
154    
155 root 1.8 if (SvOK (req->callback))
156     {
157     PUTBACK;
158     call_sv (req->callback, G_VOID | G_EVAL);
159     SPAGAIN;
160     }
161 root 1.13
162     LEAVE;
163 root 1.1
164     if (req->callback)
165     SvREFCNT_dec (req->callback);
166    
167     errno = errorno;
168     count++;
169     }
170    
171 root 1.11 prv = req;
172     req = req->next;
173     Safefree (prv);
174    
175     /* TODO: croak on errors? */
176 root 1.1 }
177    
178     return count;
179     }
180    
181 root 1.4 static void *aio_proc(void *arg);
182    
183     static void
184     start_thread (void)
185     {
186     sigset_t fullsigset, oldsigset;
187     pthread_t tid;
188     pthread_attr_t attr;
189    
190     pthread_attr_init (&attr);
191     pthread_attr_setstacksize (&attr, STACKSIZE);
192     pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
193    
194     sigfillset (&fullsigset);
195     sigprocmask (SIG_SETMASK, &fullsigset, &oldsigset);
196    
197     if (pthread_create (&tid, &attr, aio_proc, 0) == 0)
198     started++;
199    
200     sigprocmask (SIG_SETMASK, &oldsigset, 0);
201     }
202    
203     static void
204     send_req (aio_req req)
205     {
206     nreqs++;
207    
208     pthread_mutex_lock (&reqlock);
209    
210     req->next = 0;
211    
212     if (reqe)
213     {
214     reqe->next = req;
215     reqe = req;
216     }
217     else
218     reqe = reqs = req;
219    
220     pthread_cond_signal (&reqwait);
221     pthread_mutex_unlock (&reqlock);
222    
223     while (nreqs > max_outstanding)
224     {
225     poll_wait ();
226     poll_cb ();
227     }
228     }
229    
230     static void
231     end_thread (void)
232     {
233     aio_req req;
234     New (0, req, 1, aio_cb);
235     req->type = REQ_QUIT;
236    
237     send_req (req);
238     }
239    
240 root 1.17 /* work around various missing functions */
241    
242     #if !HAVE_PREADWRITE
243     # define pread aio_pread
244     # define pwrite aio_pwrite
245    
246     /*
247     * make our pread/pwrite safe against themselves, but not against
248     * normal read/write by using a mutex. slows down execution a lot,
249     * but that's your problem, not mine.
250     */
251     static pthread_mutex_t iolock = PTHREAD_MUTEX_INITIALIZER;
252    
253     static ssize_t
254     pread (int fd, void *buf, size_t count, off_t offset)
255     {
256     ssize_t res;
257     off_t ooffset;
258    
259     pthread_mutex_lock (&iolock);
260     ooffset = lseek (fd, 0, SEEK_CUR);
261     lseek (fd, offset, SEEK_SET);
262     res = read (fd, buf, count);
263     lseek (fd, ooffset, SEEK_SET);
264     pthread_mutex_unlock (&iolock);
265    
266     return res;
267     }
268    
269     static ssize_t
270     pwrite (int fd, void *buf, size_t count, off_t offset)
271     {
272     ssize_t res;
273     off_t ooffset;
274    
275     pthread_mutex_lock (&iolock);
276     ooffset = lseek (fd, 0, SEEK_CUR);
277     lseek (fd, offset, SEEK_SET);
278     res = write (fd, buf, count);
279     lseek (fd, offset, SEEK_SET);
280     pthread_mutex_unlock (&iolock);
281    
282     return res;
283     }
284     #endif
285    
286     #if !HAVE_FDATASYNC
287     # define fdatasync fsync
288     #endif
289    
290     #if !HAVE_READAHEAD
291     # define readahead aio_readahead
292    
293     static char readahead_buf[4096];
294    
295     static ssize_t
296     readahead (int fd, off_t offset, size_t count)
297     {
298     while (count > 0)
299     {
300     size_t len = count < sizeof (readahead_buf) ? count : sizeof (readahead_buf);
301    
302     pread (fd, readahead_buf, len, offset);
303     offset += len;
304     count -= len;
305     }
306    
307     errno = 0;
308     }
309     #endif
310    
311 root 1.1 static void *
312     aio_proc (void *thr_arg)
313     {
314     aio_req req;
315 root 1.3 int type;
316 root 1.1
317 root 1.3 do
318 root 1.1 {
319 root 1.3 pthread_mutex_lock (&reqlock);
320    
321     for (;;)
322     {
323     req = reqs;
324    
325     if (reqs)
326     {
327     reqs = reqs->next;
328     if (!reqs) reqe = 0;
329     }
330    
331     if (req)
332     break;
333    
334     pthread_cond_wait (&reqwait, &reqlock);
335     }
336    
337     pthread_mutex_unlock (&reqlock);
338    
339 root 1.1 errno = 0; /* strictly unnecessary */
340    
341 root 1.3 type = req->type;
342    
343     switch (type)
344 root 1.1 {
345 root 1.10 case REQ_READ: req->result = pread (req->fd, req->dataptr, req->length, req->offset); break;
346     case REQ_WRITE: req->result = pwrite (req->fd, req->dataptr, req->length, req->offset); break;
347 root 1.16
348 root 1.1 case REQ_READAHEAD: req->result = readahead (req->fd, req->offset, req->length); break;
349    
350     case REQ_STAT: req->result = stat (req->dataptr, req->statdata); break;
351     case REQ_LSTAT: req->result = lstat (req->dataptr, req->statdata); break;
352     case REQ_FSTAT: req->result = fstat (req->fd , req->statdata); break;
353    
354     case REQ_OPEN: req->result = open (req->dataptr, req->fd, req->mode); break;
355     case REQ_CLOSE: req->result = close (req->fd); break;
356     case REQ_UNLINK: req->result = unlink (req->dataptr); break;
357    
358 root 1.17 case REQ_FDATASYNC: req->result = fdatasync (req->fd); break;
359 root 1.1 case REQ_FSYNC: req->result = fsync (req->fd); break;
360    
361     case REQ_QUIT:
362 root 1.3 break;
363 root 1.1
364     default:
365     req->result = ENOSYS;
366     break;
367     }
368    
369     req->errorno = errno;
370 root 1.3
371     pthread_mutex_lock (&reslock);
372    
373     req->next = 0;
374    
375     if (rese)
376     {
377     rese->next = req;
378     rese = req;
379     }
380     else
381     {
382     rese = ress = req;
383    
384     /* write a dummy byte to the pipe so fh becomes ready */
385     write (respipe [1], &respipe, 1);
386     }
387    
388     pthread_mutex_unlock (&reslock);
389 root 1.1 }
390 root 1.3 while (type != REQ_QUIT);
391 root 1.1
392     return 0;
393     }
394    
395     MODULE = IO::AIO PACKAGE = IO::AIO
396    
397 root 1.8 PROTOTYPES: ENABLE
398    
399 root 1.1 BOOT:
400     {
401 root 1.3 if (pipe (respipe))
402     croak ("unable to initialize result pipe");
403 root 1.1
404 root 1.3 if (fcntl (respipe [0], F_SETFL, O_NONBLOCK))
405 root 1.1 croak ("cannot set result pipe to nonblocking mode");
406    
407 root 1.3 if (fcntl (respipe [1], F_SETFL, O_NONBLOCK))
408 root 1.1 croak ("cannot set result pipe to nonblocking mode");
409     }
410    
411     void
412     min_parallel(nthreads)
413     int nthreads
414     PROTOTYPE: $
415     CODE:
416     while (nthreads > started)
417     start_thread ();
418    
419     void
420     max_parallel(nthreads)
421     int nthreads
422     PROTOTYPE: $
423     CODE:
424     {
425     int cur = started;
426     while (cur > nthreads)
427     {
428     end_thread ();
429     cur--;
430     }
431    
432     while (started > nthreads)
433     {
434     poll_wait ();
435 root 1.5 poll_cb ();
436 root 1.1 }
437     }
438    
439 root 1.4 int
440     max_outstanding(nreqs)
441     int nreqs
442     PROTOTYPE: $
443     CODE:
444     RETVAL = max_outstanding;
445     max_outstanding = nreqs;
446    
447 root 1.1 void
448 root 1.8 aio_open(pathname,flags,mode,callback=&PL_sv_undef)
449 root 1.1 SV * pathname
450     int flags
451     int mode
452     SV * callback
453 root 1.8 PROTOTYPE: $$$;$
454 root 1.1 CODE:
455     {
456     aio_req req;
457    
458     Newz (0, req, 1, aio_cb);
459    
460     if (!req)
461     croak ("out of memory during aio_req allocation");
462    
463     req->type = REQ_OPEN;
464     req->data = newSVsv (pathname);
465     req->dataptr = SvPV_nolen (req->data);
466     req->fd = flags;
467     req->mode = mode;
468     req->callback = SvREFCNT_inc (callback);
469    
470     send_req (req);
471     }
472    
473     void
474 root 1.8 aio_close(fh,callback=&PL_sv_undef)
475 root 1.13 SV * fh
476     SV * callback
477 root 1.8 PROTOTYPE: $;$
478 root 1.1 ALIAS:
479     aio_close = REQ_CLOSE
480     aio_fsync = REQ_FSYNC
481     aio_fdatasync = REQ_FDATASYNC
482     CODE:
483     {
484     aio_req req;
485    
486     Newz (0, req, 1, aio_cb);
487    
488     if (!req)
489     croak ("out of memory during aio_req allocation");
490    
491     req->type = ix;
492 root 1.13 req->fh = newSVsv (fh);
493     req->fd = PerlIO_fileno (IoIFP (sv_2io (fh)));
494 root 1.1 req->callback = SvREFCNT_inc (callback);
495    
496     send_req (req);
497     }
498    
499     void
500 root 1.8 aio_read(fh,offset,length,data,dataoffset,callback=&PL_sv_undef)
501 root 1.13 SV * fh
502     UV offset
503     IV length
504     SV * data
505     IV dataoffset
506     SV * callback
507     ALIAS:
508     aio_read = REQ_READ
509     aio_write = REQ_WRITE
510 root 1.8 PROTOTYPE: $$$$$;$
511 root 1.1 CODE:
512 root 1.13 {
513     aio_req req;
514     STRLEN svlen;
515     char *svptr = SvPV (data, svlen);
516    
517     SvUPGRADE (data, SVt_PV);
518     SvPOK_on (data);
519 root 1.1
520 root 1.13 if (dataoffset < 0)
521     dataoffset += svlen;
522    
523     if (dataoffset < 0 || dataoffset > svlen)
524     croak ("data offset outside of string");
525    
526     if (ix == REQ_WRITE)
527     {
528     /* write: check length and adjust. */
529     if (length < 0 || length + dataoffset > svlen)
530     length = svlen - dataoffset;
531     }
532     else
533     {
534     /* read: grow scalar as necessary */
535     svptr = SvGROW (data, length + dataoffset);
536     }
537    
538     if (length < 0)
539     croak ("length must not be negative");
540    
541     Newz (0, req, 1, aio_cb);
542    
543     if (!req)
544     croak ("out of memory during aio_req allocation");
545    
546     req->type = ix;
547     req->fh = newSVsv (fh);
548     req->fd = PerlIO_fileno (ix == REQ_READ ? IoIFP (sv_2io (fh))
549     : IoOFP (sv_2io (fh)));
550     req->offset = offset;
551     req->length = length;
552     req->data = SvREFCNT_inc (data);
553     req->dataptr = (char *)svptr + dataoffset;
554     req->callback = SvREFCNT_inc (callback);
555    
556     send_req (req);
557     }
558 root 1.1
559     void
560 root 1.8 aio_readahead(fh,offset,length,callback=&PL_sv_undef)
561 root 1.13 SV * fh
562     UV offset
563     IV length
564     SV * callback
565 root 1.8 PROTOTYPE: $$$;$
566 root 1.1 CODE:
567     {
568     aio_req req;
569    
570     if (length < 0)
571     croak ("length must not be negative");
572    
573     Newz (0, req, 1, aio_cb);
574    
575     if (!req)
576     croak ("out of memory during aio_req allocation");
577    
578     req->type = REQ_READAHEAD;
579 root 1.13 req->fh = newSVsv (fh);
580     req->fd = PerlIO_fileno (IoIFP (sv_2io (fh)));
581 root 1.1 req->offset = offset;
582     req->length = length;
583     req->callback = SvREFCNT_inc (callback);
584    
585     send_req (req);
586     }
587    
588     void
589 root 1.8 aio_stat(fh_or_path,callback=&PL_sv_undef)
590 root 1.1 SV * fh_or_path
591     SV * callback
592     ALIAS:
593 root 1.8 aio_stat = REQ_STAT
594     aio_lstat = REQ_LSTAT
595 root 1.1 CODE:
596     {
597     aio_req req;
598    
599     Newz (0, req, 1, aio_cb);
600    
601     if (!req)
602     croak ("out of memory during aio_req allocation");
603    
604     New (0, req->statdata, 1, Stat_t);
605    
606     if (!req->statdata)
607     croak ("out of memory during aio_req->statdata allocation");
608    
609     if (SvPOK (fh_or_path))
610     {
611 root 1.8 req->type = ix;
612 root 1.1 req->data = newSVsv (fh_or_path);
613     req->dataptr = SvPV_nolen (req->data);
614     }
615     else
616     {
617     req->type = REQ_FSTAT;
618 root 1.13 req->fh = newSVsv (fh_or_path);
619 root 1.1 req->fd = PerlIO_fileno (IoIFP (sv_2io (fh_or_path)));
620     }
621    
622     req->callback = SvREFCNT_inc (callback);
623    
624     send_req (req);
625     }
626    
627     void
628 root 1.8 aio_unlink(pathname,callback=&PL_sv_undef)
629 root 1.1 SV * pathname
630     SV * callback
631     CODE:
632     {
633     aio_req req;
634    
635     Newz (0, req, 1, aio_cb);
636    
637     if (!req)
638     croak ("out of memory during aio_req allocation");
639    
640     req->type = REQ_UNLINK;
641     req->data = newSVsv (pathname);
642     req->dataptr = SvPV_nolen (req->data);
643     req->callback = SvREFCNT_inc (callback);
644    
645     send_req (req);
646     }
647    
648 root 1.6 void
649     flush()
650     PROTOTYPE:
651     CODE:
652     while (nreqs)
653     {
654     poll_wait ();
655     poll_cb ();
656     }
657    
658 root 1.7 void
659     poll()
660     PROTOTYPE:
661     CODE:
662     if (nreqs)
663     {
664     poll_wait ();
665     poll_cb ();
666     }
667    
668 root 1.1 int
669     poll_fileno()
670     PROTOTYPE:
671     CODE:
672 root 1.3 RETVAL = respipe [0];
673 root 1.1 OUTPUT:
674     RETVAL
675    
676     int
677     poll_cb(...)
678     PROTOTYPE:
679     CODE:
680 root 1.5 RETVAL = poll_cb ();
681 root 1.1 OUTPUT:
682     RETVAL
683    
684     void
685     poll_wait()
686     PROTOTYPE:
687     CODE:
688 root 1.3 if (nreqs)
689     poll_wait ();
690 root 1.1
691     int
692     nreqs()
693     PROTOTYPE:
694     CODE:
695     RETVAL = nreqs;
696     OUTPUT:
697     RETVAL
698