ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/IO-AIO/AIO.xs
Revision: 1.15
Committed: Sat Jul 23 18:19:56 2005 UTC (18 years, 9 months ago) by root
Branch: MAIN
Changes since 1.14: +1 -0 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.15 #include "EXTERN.h"
2 root 1.1 #include "perl.h"
3     #include "XSUB.h"
4    
5     #include <sys/types.h>
6     #include <sys/stat.h>
7 root 1.10
8 root 1.1 #include <unistd.h>
9     #include <fcntl.h>
10     #include <signal.h>
11     #include <sched.h>
12 root 1.10 #if __linux
13     #include <sys/syscall.h>
14     #endif
15 root 1.1
16     #include <pthread.h>
17    
/*
 * Stream type names declared as plain pointers
 * (original note: hack, but 5.6.1 is simply toooo old ;)
 */
typedef void *InputStream;
typedef void *OutputStream;
typedef void *InOutStream;

/* stack size requested for every worker thread in start_thread */
#if __ia64
# define STACKSIZE 65536
#else
# define STACKSIZE 4096
#endif
27    
/*
 * Request type codes stored in aio_cb.type.  The XS ALIAS sections use
 * these numeric values as the alias index, so the order must not change.
 */
enum {
  REQ_QUIT,      /* ends the loop of the worker thread that dequeues it */
  REQ_OPEN,
  REQ_CLOSE,
  REQ_READ,
  REQ_WRITE,
  REQ_READAHEAD,
  REQ_STAT,
  REQ_LSTAT,
  REQ_FSTAT,
  REQ_UNLINK,
  REQ_FSYNC,
  REQ_FDATASYNC,
};
35    
36     typedef struct aio_cb {
37 root 1.3 struct aio_cb *volatile next;
38 root 1.1
39     int type;
40    
41     int fd;
42     off_t offset;
43     size_t length;
44     ssize_t result;
45     mode_t mode; /* open */
46     int errorno;
47 root 1.13 SV *data, *callback, *fh;
48 root 1.1 void *dataptr;
49     STRLEN dataoffset;
50    
51     Stat_t *statdata;
52     } aio_cb;
53    
54     typedef aio_cb *aio_req;
55    
56     static int started;
57 root 1.11 static volatile int nreqs;
58 root 1.4 static int max_outstanding = 1<<30;
59 root 1.3 static int respipe [2];
60 root 1.1
61 root 1.3 static pthread_mutex_t reslock = PTHREAD_MUTEX_INITIALIZER;
62     static pthread_mutex_t reqlock = PTHREAD_MUTEX_INITIALIZER;
63     static pthread_cond_t reqwait = PTHREAD_COND_INITIALIZER;
64    
65     static volatile aio_req reqs, reqe; /* queue start, queue end */
66     static volatile aio_req ress, rese; /* queue start, queue end */
67 root 1.1
68     static void
69     poll_wait ()
70     {
71 root 1.11 if (nreqs && !ress)
72     {
73     fd_set rfd;
74     FD_ZERO(&rfd);
75     FD_SET(respipe [0], &rfd);
76 root 1.3
77 root 1.11 select (respipe [0] + 1, &rfd, 0, 0, 0);
78     }
79 root 1.1 }
80    
81     static int
82 root 1.5 poll_cb ()
83 root 1.1 {
84     dSP;
85     int count = 0;
86 root 1.11 aio_req req, prv;
87    
88     pthread_mutex_lock (&reslock);
89    
90 root 1.3 {
91 root 1.11 /* read any signals sent by the worker threads */
92 root 1.3 char buf [32];
93     while (read (respipe [0], buf, 32) > 0)
94     ;
95     }
96 root 1.1
97 root 1.11 req = ress;
98     ress = rese = 0;
99 root 1.3
100 root 1.11 pthread_mutex_unlock (&reslock);
101 root 1.3
102 root 1.11 while (req)
103     {
104 root 1.1 nreqs--;
105    
106     if (req->type == REQ_QUIT)
107     started--;
108     else
109     {
110     int errorno = errno;
111     errno = req->errorno;
112    
113     if (req->type == REQ_READ)
114     SvCUR_set (req->data, req->dataoffset
115     + req->result > 0 ? req->result : 0);
116    
117     if (req->data)
118     SvREFCNT_dec (req->data);
119    
120 root 1.13 if (req->fh)
121     SvREFCNT_dec (req->fh);
122    
123 root 1.1 if (req->type == REQ_STAT || req->type == REQ_LSTAT || req->type == REQ_FSTAT)
124     {
125     PL_laststype = req->type == REQ_LSTAT ? OP_LSTAT : OP_STAT;
126     PL_laststatval = req->result;
127     PL_statcache = *(req->statdata);
128    
129     Safefree (req->statdata);
130     }
131    
132 root 1.13 ENTER;
133 root 1.1 PUSHMARK (SP);
134     XPUSHs (sv_2mortal (newSViv (req->result)));
135 root 1.2
136     if (req->type == REQ_OPEN)
137     {
138     /* convert fd to fh */
139     SV *fh;
140    
141     PUTBACK;
142     call_pv ("IO::AIO::_fd2fh", G_SCALAR | G_EVAL);
143     SPAGAIN;
144    
145 root 1.13 fh = SvREFCNT_inc (POPs);
146 root 1.2
147     PUSHMARK (SP);
148 root 1.13 XPUSHs (sv_2mortal (fh));
149 root 1.2 }
150    
151 root 1.8 if (SvOK (req->callback))
152     {
153     PUTBACK;
154     call_sv (req->callback, G_VOID | G_EVAL);
155     SPAGAIN;
156     }
157 root 1.13
158     LEAVE;
159 root 1.1
160     if (req->callback)
161     SvREFCNT_dec (req->callback);
162    
163     errno = errorno;
164     count++;
165     }
166    
167 root 1.11 prv = req;
168     req = req->next;
169     Safefree (prv);
170    
171     /* TODO: croak on errors? */
172 root 1.1 }
173    
174     return count;
175     }
176    
177 root 1.4 static void *aio_proc(void *arg);
178    
179     static void
180     start_thread (void)
181     {
182     sigset_t fullsigset, oldsigset;
183     pthread_t tid;
184     pthread_attr_t attr;
185    
186     pthread_attr_init (&attr);
187     pthread_attr_setstacksize (&attr, STACKSIZE);
188     pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
189    
190     sigfillset (&fullsigset);
191     sigprocmask (SIG_SETMASK, &fullsigset, &oldsigset);
192    
193     if (pthread_create (&tid, &attr, aio_proc, 0) == 0)
194     started++;
195    
196     sigprocmask (SIG_SETMASK, &oldsigset, 0);
197     }
198    
199     static void
200     send_req (aio_req req)
201     {
202     nreqs++;
203    
204     pthread_mutex_lock (&reqlock);
205    
206     req->next = 0;
207    
208     if (reqe)
209     {
210     reqe->next = req;
211     reqe = req;
212     }
213     else
214     reqe = reqs = req;
215    
216     pthread_cond_signal (&reqwait);
217     pthread_mutex_unlock (&reqlock);
218    
219     while (nreqs > max_outstanding)
220     {
221     poll_wait ();
222     poll_cb ();
223     }
224     }
225    
226     static void
227     end_thread (void)
228     {
229     aio_req req;
230     New (0, req, 1, aio_cb);
231     req->type = REQ_QUIT;
232    
233     send_req (req);
234     }
235    
236 root 1.1 static void *
237     aio_proc (void *thr_arg)
238     {
239     aio_req req;
240 root 1.3 int type;
241 root 1.1
242 root 1.3 do
243 root 1.1 {
244 root 1.3 pthread_mutex_lock (&reqlock);
245    
246     for (;;)
247     {
248     req = reqs;
249    
250     if (reqs)
251     {
252     reqs = reqs->next;
253     if (!reqs) reqe = 0;
254     }
255    
256     if (req)
257     break;
258    
259     pthread_cond_wait (&reqwait, &reqlock);
260     }
261    
262     pthread_mutex_unlock (&reqlock);
263    
264 root 1.1 errno = 0; /* strictly unnecessary */
265    
266 root 1.3 type = req->type;
267    
268     switch (type)
269 root 1.1 {
270 root 1.10 case REQ_READ: req->result = pread (req->fd, req->dataptr, req->length, req->offset); break;
271     case REQ_WRITE: req->result = pwrite (req->fd, req->dataptr, req->length, req->offset); break;
272 root 1.1 #if SYS_readahead
273     case REQ_READAHEAD: req->result = readahead (req->fd, req->offset, req->length); break;
274     #else
275     case REQ_READAHEAD: req->result = -1; errno = ENOSYS; break;
276     #endif
277    
278     case REQ_STAT: req->result = stat (req->dataptr, req->statdata); break;
279     case REQ_LSTAT: req->result = lstat (req->dataptr, req->statdata); break;
280     case REQ_FSTAT: req->result = fstat (req->fd , req->statdata); break;
281    
282     case REQ_OPEN: req->result = open (req->dataptr, req->fd, req->mode); break;
283     case REQ_CLOSE: req->result = close (req->fd); break;
284     case REQ_UNLINK: req->result = unlink (req->dataptr); break;
285    
286     case REQ_FSYNC: req->result = fsync (req->fd); break;
287     case REQ_FDATASYNC: req->result = fdatasync (req->fd); break;
288    
289     case REQ_QUIT:
290 root 1.3 break;
291 root 1.1
292     default:
293     req->result = ENOSYS;
294     break;
295     }
296    
297     req->errorno = errno;
298 root 1.3
299     pthread_mutex_lock (&reslock);
300    
301     req->next = 0;
302    
303     if (rese)
304     {
305     rese->next = req;
306     rese = req;
307     }
308     else
309     {
310     rese = ress = req;
311    
312     /* write a dummy byte to the pipe so fh becomes ready */
313     write (respipe [1], &respipe, 1);
314     }
315    
316     pthread_mutex_unlock (&reslock);
317 root 1.1 }
318 root 1.3 while (type != REQ_QUIT);
319 root 1.1
320     return 0;
321     }
322    
MODULE = IO::AIO PACKAGE = IO::AIO

PROTOTYPES: ENABLE

BOOT:
{
        /* create the result notification pipe */
        if (pipe (respipe))
          croak ("unable to initialize result pipe");

        /* both ends are made nonblocking, read end first */
        if (fcntl (respipe [0], F_SETFL, O_NONBLOCK)
            || fcntl (respipe [1], F_SETFL, O_NONBLOCK))
          croak ("cannot set result pipe to nonblocking mode");
}
338    
void
min_parallel(nthreads)
        int     nthreads
        PROTOTYPE: $
        CODE:
{
        /* start workers until at least nthreads are counted as started */
        while (nthreads > started)
          {
            int before = started;

            start_thread ();

            /*
             * start_thread only bumps "started" when the thread was created;
             * without this check a persistent failure would loop forever.
             */
            if (started == before)
              croak ("unable to start aio worker thread");
          }
}
346    
void
max_parallel(nthreads)
        int     nthreads
        PROTOTYPE: $
        CODE:
{
        int cur;

        /* one quit request per surplus thread, counted against a snapshot of started */
        for (cur = started; cur > nthreads; cur--)
          end_thread ();

        /* started only drops when poll_cb reaps a quit request, so reap until it has */
        for (;;)
          {
            if (started <= nthreads)
              break;

            poll_wait ();
            poll_cb ();
          }
}
366    
int
max_outstanding(nreqs)
        int     nreqs
        PROTOTYPE: $
        CODE:
        /* install the new limit and hand back the previous one */
        RETVAL = max_outstanding;
        max_outstanding = nreqs;
        OUTPUT:
        RETVAL
374    
void
aio_open(pathname,flags,mode,callback=&PL_sv_undef)
        SV *    pathname
        int     flags
        int     mode
        SV *    callback
        PROTOTYPE: $$$;$
        CODE:
{
        aio_req open_req;

        Newz (0, open_req, 1, aio_cb);

        if (!open_req)
          croak ("out of memory during aio_req allocation");

        /* dataptr points into a private copy of the path, which poll_cb releases */
        open_req->data     = newSVsv (pathname);
        open_req->dataptr  = SvPV_nolen (open_req->data);
        open_req->callback = SvREFCNT_inc (callback);
        open_req->mode     = mode;
        /* the fd field carries the open flags for this request type */
        open_req->fd       = flags;
        open_req->type     = REQ_OPEN;

        send_req (open_req);
}
400    
void
aio_close(fh,callback=&PL_sv_undef)
        SV *    fh
        SV *    callback
        PROTOTYPE: $;$
        ALIAS:
           aio_close     = REQ_CLOSE
           aio_fsync     = REQ_FSYNC
           aio_fdatasync = REQ_FDATASYNC
        CODE:
{
        aio_req req;
        /*
         * Resolve the descriptor before allocating anything: sv_2io dies on
         * a value that is not a handle, which used to leak the request.
         */
        int fd = PerlIO_fileno (IoIFP (sv_2io (fh)));

        Newz (0, req, 1, aio_cb);

        if (!req)
          croak ("out of memory during aio_req allocation");

        /* ix is the alias index, i.e. the REQ_* code of the called name */
        req->type = ix;
        req->fh = newSVsv (fh);
        req->fd = fd;
        req->callback = SvREFCNT_inc (callback);

        send_req (req);
}
426    
void
aio_read(fh,offset,length,data,dataoffset,callback=&PL_sv_undef)
        SV *    fh
        UV      offset
        IV      length
        SV *    data
        IV      dataoffset
        SV *    callback
        ALIAS:
           aio_read  = REQ_READ
           aio_write = REQ_WRITE
        PROTOTYPE: $$$$$;$
        CODE:
{
        aio_req req;
        STRLEN svlen;
        char *svptr = SvPV (data, svlen);
        int fd;

        SvUPGRADE (data, SVt_PV);
        SvPOK_on (data);

        /* a negative data offset counts from the end of the string */
        if (dataoffset < 0)
          dataoffset += svlen;

        if (dataoffset < 0 || dataoffset > svlen)
          croak ("data offset outside of string");

        if (ix == REQ_WRITE)
          {
            /* write: check length and adjust. */
            if (length < 0 || length + dataoffset > svlen)
              length = svlen - dataoffset;
          }
        else
          {
            /* read: reject a negative length before it reaches SvGROW */
            if (length < 0)
              croak ("length must not be negative");

            /* read: grow scalar as necessary */
            svptr = SvGROW (data, length + dataoffset);
          }

        /*
         * Resolve the descriptor before allocating the request: sv_2io dies
         * on a value that is not a handle, which used to leak the request.
         */
        fd = PerlIO_fileno (ix == REQ_READ ? IoIFP (sv_2io (fh))
                                           : IoOFP (sv_2io (fh)));

        Newz (0, req, 1, aio_cb);

        if (!req)
          croak ("out of memory during aio_req allocation");

        req->type = ix;
        req->fh = newSVsv (fh);
        req->fd = fd;
        req->offset = offset;
        req->length = length;
        /* hold a reference on the buffer scalar; poll_cb drops it */
        req->data = SvREFCNT_inc (data);
        req->dataptr = (char *)svptr + dataoffset;
        req->callback = SvREFCNT_inc (callback);

        send_req (req);
}
486 root 1.1
void
aio_readahead(fh,offset,length,callback=&PL_sv_undef)
        SV *    fh
        UV      offset
        IV      length
        SV *    callback
        PROTOTYPE: $$$;$
        CODE:
{
        aio_req req;
        int fd;

        if (length < 0)
          croak ("length must not be negative");

        /*
         * Resolve the descriptor before allocating the request: sv_2io dies
         * on a value that is not a handle, which used to leak the request.
         */
        fd = PerlIO_fileno (IoIFP (sv_2io (fh)));

        Newz (0, req, 1, aio_cb);

        if (!req)
          croak ("out of memory during aio_req allocation");

        req->type = REQ_READAHEAD;
        req->fh = newSVsv (fh);
        req->fd = fd;
        req->offset = offset;
        req->length = length;
        req->callback = SvREFCNT_inc (callback);

        send_req (req);
}
515    
void
aio_stat(fh_or_path,callback=&PL_sv_undef)
        SV *    fh_or_path
        SV *    callback
        ALIAS:
           aio_stat  = REQ_STAT
           aio_lstat = REQ_LSTAT
        CODE:
{
        aio_req req;
        /* a string argument is a path, anything else is treated as a handle */
        int is_path = SvPOK (fh_or_path);
        int fd = 0;

        /*
         * Resolve the descriptor before allocating anything: sv_2io dies on
         * a value that is not a handle, which used to leak both the request
         * and its stat buffer.
         */
        if (!is_path)
          fd = PerlIO_fileno (IoIFP (sv_2io (fh_or_path)));

        Newz (0, req, 1, aio_cb);

        if (!req)
          croak ("out of memory during aio_req allocation");

        New (0, req->statdata, 1, Stat_t);

        if (!req->statdata)
          croak ("out of memory during aio_req->statdata allocation");

        if (is_path)
          {
            /* ix is REQ_STAT or REQ_LSTAT, depending on the called name */
            req->type = ix;
            req->data = newSVsv (fh_or_path);
            req->dataptr = SvPV_nolen (req->data);
          }
        else
          {
            /* handles are always served by fstat, for both aliases */
            req->type = REQ_FSTAT;
            req->fh = newSVsv (fh_or_path);
            req->fd = fd;
          }

        req->callback = SvREFCNT_inc (callback);

        send_req (req);
}
554    
void
aio_unlink(pathname,callback=&PL_sv_undef)
        SV *    pathname
        SV *    callback
        CODE:
{
        aio_req unlink_req;

        Newz (0, unlink_req, 1, aio_cb);

        if (!unlink_req)
          croak ("out of memory during aio_req allocation");

        /* dataptr points into a private copy of the path, which poll_cb releases */
        unlink_req->data     = newSVsv (pathname);
        unlink_req->dataptr  = SvPV_nolen (unlink_req->data);
        unlink_req->callback = SvREFCNT_inc (callback);
        unlink_req->type     = REQ_UNLINK;

        send_req (unlink_req);
}
575    
void
flush()
        PROTOTYPE:
        CODE:
{
        /* wait for and reap results until no request is outstanding */
        for (;;)
          {
            if (!nreqs)
              break;

            poll_wait ();
            poll_cb ();
          }
}
585    
void
poll()
        PROTOTYPE:
        CODE:
{
        /* a single wait-and-reap round, skipped when nothing is outstanding */
        if (nreqs)
          {
            poll_wait ();
            poll_cb ();
          }
}
595    
int
poll_fileno()
        PROTOTYPE:
        CODE:
        /* read end of the result pipe, the descriptor poll_wait selects on */
        RETVAL = respipe [0];
        OUTPUT:
        RETVAL
603    
int
poll_cb(...)
        PROTOTYPE:
        CODE:
        /* calls the C function of the same name; returns its count of handled requests */
        RETVAL = poll_cb ();
        OUTPUT:
        RETVAL
611    
void
poll_wait()
        PROTOTYPE:
        CODE:
{
        /* only wait when at least one request is outstanding */
        if (nreqs)
          {
            poll_wait ();
          }
}
618 root 1.1
int
nreqs()
        PROTOTYPE:
        CODE:
        /* requests submitted through send_req and not yet reaped by poll_cb */
        RETVAL = nreqs;
        OUTPUT:
        RETVAL
626