ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/IO-AIO/AIO.xs
Revision: 1.10
Committed: Wed Jul 13 00:13:09 2005 UTC (18 years, 10 months ago) by root
Branch: MAIN
Changes since 1.9: +8 -3 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.1 #include "EXTERN.h"
2     #include "perl.h"
3     #include "XSUB.h"
4    
5 root 1.10 #define _XOPEN_SOURCE 500
6    
7 root 1.1 #include <sys/types.h>
8     #include <sys/stat.h>
9 root 1.10
10 root 1.1 #include <unistd.h>
11     #include <fcntl.h>
12     #include <signal.h>
13     #include <sched.h>
14 root 1.10 #if __linux
15     #include <sys/syscall.h>
16     #endif
17 root 1.1
18     #include <pthread.h>
19    
20     typedef void *InputStream; /* hack, but 5.6.1 is simply toooo old ;) */
21     typedef void *OutputStream; /* hack, but 5.6.1 is simply toooo old ;) */
22     typedef void *InOutStream; /* hack, but 5.6.1 is simply toooo old ;) */
23    
24 root 1.4 #if __ia64
25     # define STACKSIZE 65536
26 root 1.1 #else
27 root 1.4 # define STACKSIZE 4096
28 root 1.1 #endif
29    
/* Request types; the numeric values follow declaration order starting at 0. */
enum {
  REQ_QUIT,       /* makes the worker thread that picks it up leave its loop */
  REQ_OPEN,
  REQ_CLOSE,
  REQ_READ,
  REQ_WRITE,
  REQ_READAHEAD,
  REQ_STAT,
  REQ_LSTAT,
  REQ_FSTAT,
  REQ_UNLINK,
  REQ_FSYNC,
  REQ_FDATASYNC,
};
37    
38     typedef struct aio_cb {
39 root 1.3 struct aio_cb *volatile next;
40 root 1.1
41     int type;
42    
43     int fd;
44     off_t offset;
45     size_t length;
46     ssize_t result;
47     mode_t mode; /* open */
48     int errorno;
49     SV *data, *callback;
50     void *dataptr;
51     STRLEN dataoffset;
52    
53     Stat_t *statdata;
54     } aio_cb;
55    
56     typedef aio_cb *aio_req;
57    
58     static int started;
59     static int nreqs;
60 root 1.4 static int max_outstanding = 1<<30;
61 root 1.3 static int respipe [2];
62 root 1.1
63 root 1.3 static pthread_mutex_t reslock = PTHREAD_MUTEX_INITIALIZER;
64     static pthread_mutex_t reqlock = PTHREAD_MUTEX_INITIALIZER;
65     static pthread_cond_t reqwait = PTHREAD_COND_INITIALIZER;
66    
67     static volatile aio_req reqs, reqe; /* queue start, queue end */
68     static volatile aio_req ress, rese; /* queue start, queue end */
69 root 1.1
70     static void
71     poll_wait ()
72     {
73 root 1.3 if (!nreqs)
74     return;
75    
76 root 1.1 fd_set rfd;
77     FD_ZERO(&rfd);
78 root 1.3 FD_SET(respipe [0], &rfd);
79 root 1.1
80 root 1.3 select (respipe [0] + 1, &rfd, 0, 0, 0);
81 root 1.1 }
82    
83     static int
84 root 1.5 poll_cb ()
85 root 1.1 {
86     dSP;
87     int count = 0;
88     aio_req req;
89 root 1.3
90     {
91     /* read and signals sent by the worker threads */
92     char buf [32];
93     while (read (respipe [0], buf, 32) > 0)
94     ;
95     }
96 root 1.1
97 root 1.3 for (;;)
98 root 1.1 {
99 root 1.3 pthread_mutex_lock (&reslock);
100    
101     req = ress;
102    
103     if (ress)
104     {
105     ress = ress->next;
106     if (!ress) rese = 0;
107     }
108    
109     pthread_mutex_unlock (&reslock);
110    
111     if (!req)
112     break;
113    
114 root 1.1 nreqs--;
115    
116     if (req->type == REQ_QUIT)
117     started--;
118     else
119     {
120     int errorno = errno;
121     errno = req->errorno;
122    
123     if (req->type == REQ_READ)
124     SvCUR_set (req->data, req->dataoffset
125     + req->result > 0 ? req->result : 0);
126    
127     if (req->data)
128     SvREFCNT_dec (req->data);
129    
130     if (req->type == REQ_STAT || req->type == REQ_LSTAT || req->type == REQ_FSTAT)
131     {
132     PL_laststype = req->type == REQ_LSTAT ? OP_LSTAT : OP_STAT;
133     PL_laststatval = req->result;
134     PL_statcache = *(req->statdata);
135    
136     Safefree (req->statdata);
137     }
138    
139     PUSHMARK (SP);
140     XPUSHs (sv_2mortal (newSViv (req->result)));
141 root 1.2
142     if (req->type == REQ_OPEN)
143     {
144     /* convert fd to fh */
145     SV *fh;
146    
147     PUTBACK;
148     call_pv ("IO::AIO::_fd2fh", G_SCALAR | G_EVAL);
149     SPAGAIN;
150    
151     fh = POPs;
152    
153     PUSHMARK (SP);
154     XPUSHs (fh);
155     }
156    
157 root 1.8 if (SvOK (req->callback))
158     {
159     PUTBACK;
160     call_sv (req->callback, G_VOID | G_EVAL);
161     SPAGAIN;
162     }
163 root 1.1
164     if (req->callback)
165     SvREFCNT_dec (req->callback);
166    
167     errno = errorno;
168     count++;
169     }
170    
171     Safefree (req);
172     }
173    
174     return count;
175     }
176    
177 root 1.4 static void *aio_proc(void *arg);
178    
179     static void
180     start_thread (void)
181     {
182     sigset_t fullsigset, oldsigset;
183     pthread_t tid;
184     pthread_attr_t attr;
185    
186     pthread_attr_init (&attr);
187     pthread_attr_setstacksize (&attr, STACKSIZE);
188     pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
189    
190     sigfillset (&fullsigset);
191     sigprocmask (SIG_SETMASK, &fullsigset, &oldsigset);
192    
193     if (pthread_create (&tid, &attr, aio_proc, 0) == 0)
194     started++;
195    
196     sigprocmask (SIG_SETMASK, &oldsigset, 0);
197     }
198    
199     static void
200     send_req (aio_req req)
201     {
202     nreqs++;
203    
204     pthread_mutex_lock (&reqlock);
205    
206     req->next = 0;
207    
208     if (reqe)
209     {
210     reqe->next = req;
211     reqe = req;
212     }
213     else
214     reqe = reqs = req;
215    
216     pthread_cond_signal (&reqwait);
217     pthread_mutex_unlock (&reqlock);
218    
219     while (nreqs > max_outstanding)
220     {
221     poll_wait ();
222     poll_cb ();
223     }
224     }
225    
226     static void
227     end_thread (void)
228     {
229     aio_req req;
230     New (0, req, 1, aio_cb);
231     req->type = REQ_QUIT;
232    
233     send_req (req);
234     }
235    
236     static void
237 root 1.5 read_write (int dowrite, int fd, off_t offset, size_t length,
238 root 1.4 SV *data, STRLEN dataoffset, SV *callback)
239     {
240     aio_req req;
241     STRLEN svlen;
242     char *svptr = SvPV (data, svlen);
243    
244     SvUPGRADE (data, SVt_PV);
245     SvPOK_on (data);
246    
247     if (dataoffset < 0)
248     dataoffset += svlen;
249    
250     if (dataoffset < 0 || dataoffset > svlen)
251     croak ("data offset outside of string");
252    
253     if (dowrite)
254     {
255     /* write: check length and adjust. */
256     if (length < 0 || length + dataoffset > svlen)
257     length = svlen - dataoffset;
258     }
259     else
260     {
261     /* read: grow scalar as necessary */
262     svptr = SvGROW (data, length + dataoffset);
263     }
264    
265     if (length < 0)
266     croak ("length must not be negative");
267    
268     Newz (0, req, 1, aio_cb);
269    
270     if (!req)
271     croak ("out of memory during aio_req allocation");
272    
273     req->type = dowrite ? REQ_WRITE : REQ_READ;
274     req->fd = fd;
275     req->offset = offset;
276     req->length = length;
277     req->data = SvREFCNT_inc (data);
278     req->dataptr = (char *)svptr + dataoffset;
279     req->callback = SvREFCNT_inc (callback);
280    
281     send_req (req);
282     }
283    
284 root 1.1 static void *
285     aio_proc (void *thr_arg)
286     {
287     aio_req req;
288 root 1.3 int type;
289 root 1.1
290 root 1.3 do
291 root 1.1 {
292 root 1.3 pthread_mutex_lock (&reqlock);
293    
294     for (;;)
295     {
296     req = reqs;
297    
298     if (reqs)
299     {
300     reqs = reqs->next;
301     if (!reqs) reqe = 0;
302     }
303    
304     if (req)
305     break;
306    
307     pthread_cond_wait (&reqwait, &reqlock);
308     }
309    
310     pthread_mutex_unlock (&reqlock);
311    
312 root 1.1 errno = 0; /* strictly unnecessary */
313    
314 root 1.3 type = req->type;
315    
316     switch (type)
317 root 1.1 {
318 root 1.10 case REQ_READ: req->result = pread (req->fd, req->dataptr, req->length, req->offset); break;
319     case REQ_WRITE: req->result = pwrite (req->fd, req->dataptr, req->length, req->offset); break;
320 root 1.1 #if SYS_readahead
321     case REQ_READAHEAD: req->result = readahead (req->fd, req->offset, req->length); break;
322     #else
323     case REQ_READAHEAD: req->result = -1; errno = ENOSYS; break;
324     #endif
325    
326     case REQ_STAT: req->result = stat (req->dataptr, req->statdata); break;
327     case REQ_LSTAT: req->result = lstat (req->dataptr, req->statdata); break;
328     case REQ_FSTAT: req->result = fstat (req->fd , req->statdata); break;
329    
330     case REQ_OPEN: req->result = open (req->dataptr, req->fd, req->mode); break;
331     case REQ_CLOSE: req->result = close (req->fd); break;
332     case REQ_UNLINK: req->result = unlink (req->dataptr); break;
333    
334     case REQ_FSYNC: req->result = fsync (req->fd); break;
335     case REQ_FDATASYNC: req->result = fdatasync (req->fd); break;
336    
337     case REQ_QUIT:
338 root 1.3 break;
339 root 1.1
340     default:
341     req->result = ENOSYS;
342     break;
343     }
344    
345     req->errorno = errno;
346 root 1.3
347     pthread_mutex_lock (&reslock);
348    
349     req->next = 0;
350    
351     if (rese)
352     {
353     rese->next = req;
354     rese = req;
355     }
356     else
357     {
358     rese = ress = req;
359    
360     /* write a dummy byte to the pipe so fh becomes ready */
361     write (respipe [1], &respipe, 1);
362     }
363    
364     pthread_mutex_unlock (&reslock);
365 root 1.1 }
366 root 1.3 while (type != REQ_QUIT);
367 root 1.1
368     return 0;
369     }
370    
371     MODULE = IO::AIO PACKAGE = IO::AIO
372    
373 root 1.8 PROTOTYPES: ENABLE
374    
BOOT:
{
        /* create the result pipe and make both ends non-blocking */
        int end;

        if (pipe (respipe) != 0)
          croak ("unable to initialize result pipe");

        for (end = 0; end < 2; end++)
          if (fcntl (respipe [end], F_SETFL, O_NONBLOCK) != 0)
            croak ("cannot set result pipe to nonblocking mode");
}
386    
void
min_parallel(nthreads)
        int     nthreads
        PROTOTYPE: $
        CODE:
        /* start worker threads until at least nthreads have been started */
        while (started < nthreads)
          start_thread ();
394    
void
max_parallel(nthreads)
        int     nthreads
        PROTOTYPE: $
        CODE:
{
        /* queue one quit request per surplus thread, counted from a snapshot */
        int remaining;

        for (remaining = started; remaining > nthreads; remaining--)
          end_thread ();

        /* then reap results until the surplus threads have reported back */
        while (nthreads < started)
          {
            poll_wait ();
            poll_cb ();
          }
}
414    
int
max_outstanding(nreqs)
        int     nreqs
        PROTOTYPE: $
        CODE:
        /* install the new limit and hand back the previous one; note that
           the parameter shadows the file-scope nreqs counter in this XSUB */
        RETVAL = max_outstanding;
        max_outstanding = nreqs;
        OUTPUT:
        RETVAL
422    
void
aio_open(pathname,flags,mode,callback=&PL_sv_undef)
        SV *    pathname
        int     flags
        int     mode
        SV *    callback
        PROTOTYPE: $$$;$
        CODE:
{
        /* queue an open request; the path is copied into a private scalar */
        aio_req open_req;

        Newz (0, open_req, 1, aio_cb);

        if (open_req == 0)
          croak ("out of memory during aio_req allocation");

        open_req->type     = REQ_OPEN;
        open_req->data     = newSVsv (pathname);
        open_req->dataptr  = SvPV_nolen (open_req->data);
        open_req->fd       = flags;   /* the fd slot carries the open flags */
        open_req->mode     = mode;
        open_req->callback = SvREFCNT_inc (callback);

        send_req (open_req);
}
448    
void
aio_close(fh,callback=&PL_sv_undef)
        InputStream     fh
        SV *            callback
        PROTOTYPE: $;$
        ALIAS:
           aio_close     = REQ_CLOSE
           aio_fsync     = REQ_FSYNC
           aio_fdatasync = REQ_FDATASYNC
        CODE:
{
        /* one XSUB for three requests: ix is the alias' REQ_* value */
        aio_req fd_req;

        Newz (0, fd_req, 1, aio_cb);

        if (fd_req == 0)
          croak ("out of memory during aio_req allocation");

        fd_req->type     = ix;
        fd_req->fd       = PerlIO_fileno (fh);
        fd_req->callback = SvREFCNT_inc (callback);

        send_req (fd_req);
}
473    
void
aio_read(fh,offset,length,data,dataoffset,callback=&PL_sv_undef)
        InputStream     fh
        UV              offset
        IV              length
        SV *            data
        IV              dataoffset
        SV *            callback
        PROTOTYPE: $$$$$;$
        CODE:
        /* first argument 0 selects the read variant of read_write */
        read_write (0, PerlIO_fileno (fh), offset, length,
                    data, dataoffset, callback);
485 root 1.1
void
aio_write(fh,offset,length,data,dataoffset,callback=&PL_sv_undef)
        OutputStream    fh
        UV              offset
        IV              length
        SV *            data
        IV              dataoffset
        SV *            callback
        PROTOTYPE: $$$$$;$
        CODE:
        /* first argument 1 selects the write variant of read_write */
        read_write (1, PerlIO_fileno (fh), offset, length,
                    data, dataoffset, callback);
497 root 1.1
void
aio_readahead(fh,offset,length,callback=&PL_sv_undef)
        InputStream     fh
        UV              offset
        IV              length
        SV *            callback
        PROTOTYPE: $$$;$
        CODE:
{
        /* queue a readahead request after rejecting negative lengths */
        aio_req ra_req;

        if (length < 0)
          croak ("length must not be negative");

        Newz (0, ra_req, 1, aio_cb);

        if (ra_req == 0)
          croak ("out of memory during aio_req allocation");

        ra_req->type     = REQ_READAHEAD;
        ra_req->fd       = PerlIO_fileno (fh);
        ra_req->offset   = offset;
        ra_req->length   = length;
        ra_req->callback = SvREFCNT_inc (callback);

        send_req (ra_req);
}
525    
void
aio_stat(fh_or_path,callback=&PL_sv_undef)
        SV *    fh_or_path
        SV *    callback
        ALIAS:
           aio_stat  = REQ_STAT
           aio_lstat = REQ_LSTAT
        CODE:
{
        /*
         * A string argument is treated as a path and queued as stat/lstat
         * (ix is the alias' REQ_* value); anything else is treated as a
         * file handle and queued as fstat.
         */
        aio_req req;
        int is_path = SvPOK (fh_or_path);
        int fd      = 0;

        /* resolve the handle before allocating anything: sv_2io may
           croak, which would otherwise leak the request and stat buffer */
        if (!is_path)
          fd = PerlIO_fileno (IoIFP (sv_2io (fh_or_path)));

        Newz (0, req, 1, aio_cb);

        if (!req)
          croak ("out of memory during aio_req allocation");

        New (0, req->statdata, 1, Stat_t);

        if (!req->statdata)
          croak ("out of memory during aio_req->statdata allocation");

        if (is_path)
          {
            req->type    = ix;
            req->data    = newSVsv (fh_or_path);
            req->dataptr = SvPV_nolen (req->data);
          }
        else
          {
            req->type = REQ_FSTAT;
            req->fd   = fd;
          }

        req->callback = SvREFCNT_inc (callback);

        send_req (req);
}
563    
void
aio_unlink(pathname,callback=&PL_sv_undef)
        SV *    pathname
        SV *    callback
        CODE:
{
        /* queue an unlink request; the path is copied into a private scalar */
        aio_req unlink_req;

        Newz (0, unlink_req, 1, aio_cb);

        if (unlink_req == 0)
          croak ("out of memory during aio_req allocation");

        unlink_req->type     = REQ_UNLINK;
        unlink_req->data     = newSVsv (pathname);
        unlink_req->dataptr  = SvPV_nolen (unlink_req->data);
        unlink_req->callback = SvREFCNT_inc (callback);

        send_req (unlink_req);
}
584    
void
flush()
        PROTOTYPE:
        CODE:
        /* wait for and reap results until no request is outstanding */
        while (nreqs != 0)
          {
            poll_wait ();
            poll_cb ();
          }
594    
void
poll()
        PROTOTYPE:
        CODE:
        /* one wait-and-reap round, skipped when nothing is outstanding */
        if (nreqs != 0)
          {
            poll_wait ();
            poll_cb ();
          }
604    
int
poll_fileno()
        PROTOTYPE:
        CODE:
        /* the read end of the result pipe */
        RETVAL = respipe [0];
        OUTPUT:
        RETVAL
612    
int
poll_cb(...)
        PROTOTYPE:
        CODE:
        /* any arguments are ignored; returns the C poll_cb () count */
        RETVAL = poll_cb ();
        OUTPUT:
        RETVAL
620    
void
poll_wait()
        PROTOTYPE:
        CODE:
        /* only block when at least one request is outstanding */
        if (nreqs != 0)
          poll_wait ();
627 root 1.1
int
nreqs()
        PROTOTYPE:
        CODE:
        /* number of requests sent but not yet reaped */
        RETVAL = nreqs;
        OUTPUT:
        RETVAL
635