ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/IO-AIO/AIO.xs
Revision: 1.7
Committed: Mon Jul 11 01:49:14 2005 UTC (18 years, 10 months ago) by root
Branch: MAIN
Changes since 1.6: +10 -0 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.1 #include "EXTERN.h"
2     #include "perl.h"
3     #include "XSUB.h"
4    
5     #include <sys/types.h>
6     #include <sys/stat.h>
7     #include <unistd.h>
8     #include <fcntl.h>
9     #include <signal.h>
10     #include <sched.h>
11     #include <endian.h>
12    
13     #include <pthread.h>
14     #include <sys/syscall.h>
15    
16     typedef void *InputStream; /* hack, but 5.6.1 is simply toooo old ;) */
17     typedef void *OutputStream; /* hack, but 5.6.1 is simply toooo old ;) */
18     typedef void *InOutStream; /* hack, but 5.6.1 is simply toooo old ;) */
19    
20 root 1.4 #if __ia64
21     # define STACKSIZE 65536
22 root 1.1 #else
23 root 1.4 # define STACKSIZE 4096
24 root 1.1 #endif
25    
26     enum {
27     REQ_QUIT,
28     REQ_OPEN, REQ_CLOSE,
29     REQ_READ, REQ_WRITE, REQ_READAHEAD,
30     REQ_STAT, REQ_LSTAT, REQ_FSTAT, REQ_UNLINK,
31     REQ_FSYNC, REQ_FDATASYNC,
32     };
33    
34     typedef struct aio_cb {
35 root 1.3 struct aio_cb *volatile next;
36 root 1.1
37     int type;
38    
39     int fd;
40     off_t offset;
41     size_t length;
42     ssize_t result;
43     mode_t mode; /* open */
44     int errorno;
45     SV *data, *callback;
46     void *dataptr;
47     STRLEN dataoffset;
48    
49     Stat_t *statdata;
50     } aio_cb;
51    
52     typedef aio_cb *aio_req;
53    
54     static int started;
55     static int nreqs;
56 root 1.4 static int max_outstanding = 1<<30;
57 root 1.3 static int respipe [2];
58 root 1.1
59 root 1.3 static pthread_mutex_t reslock = PTHREAD_MUTEX_INITIALIZER;
60     static pthread_mutex_t reqlock = PTHREAD_MUTEX_INITIALIZER;
61     static pthread_cond_t reqwait = PTHREAD_COND_INITIALIZER;
62    
63     static volatile aio_req reqs, reqe; /* queue start, queue end */
64     static volatile aio_req ress, rese; /* queue start, queue end */
65 root 1.1
66     static void
67     poll_wait ()
68     {
69 root 1.3 if (!nreqs)
70     return;
71    
72 root 1.1 fd_set rfd;
73     FD_ZERO(&rfd);
74 root 1.3 FD_SET(respipe [0], &rfd);
75 root 1.1
76 root 1.3 select (respipe [0] + 1, &rfd, 0, 0, 0);
77 root 1.1 }
78    
79     static int
80 root 1.5 poll_cb ()
81 root 1.1 {
82     dSP;
83     int count = 0;
84     aio_req req;
85 root 1.3
86     {
87     /* read and signals sent by the worker threads */
88     char buf [32];
89     while (read (respipe [0], buf, 32) > 0)
90     ;
91     }
92 root 1.1
93 root 1.3 for (;;)
94 root 1.1 {
95 root 1.3 pthread_mutex_lock (&reslock);
96    
97     req = ress;
98    
99     if (ress)
100     {
101     ress = ress->next;
102     if (!ress) rese = 0;
103     }
104    
105     pthread_mutex_unlock (&reslock);
106    
107     if (!req)
108     break;
109    
110 root 1.1 nreqs--;
111    
112     if (req->type == REQ_QUIT)
113     started--;
114     else
115     {
116     int errorno = errno;
117     errno = req->errorno;
118    
119     if (req->type == REQ_READ)
120     SvCUR_set (req->data, req->dataoffset
121     + req->result > 0 ? req->result : 0);
122    
123     if (req->data)
124     SvREFCNT_dec (req->data);
125    
126     if (req->type == REQ_STAT || req->type == REQ_LSTAT || req->type == REQ_FSTAT)
127     {
128     PL_laststype = req->type == REQ_LSTAT ? OP_LSTAT : OP_STAT;
129     PL_laststatval = req->result;
130     PL_statcache = *(req->statdata);
131    
132     Safefree (req->statdata);
133     }
134    
135     PUSHMARK (SP);
136     XPUSHs (sv_2mortal (newSViv (req->result)));
137 root 1.2
138     if (req->type == REQ_OPEN)
139     {
140     /* convert fd to fh */
141     SV *fh;
142    
143     PUTBACK;
144     call_pv ("IO::AIO::_fd2fh", G_SCALAR | G_EVAL);
145     SPAGAIN;
146    
147     fh = POPs;
148    
149     PUSHMARK (SP);
150     XPUSHs (fh);
151     }
152    
153 root 1.1 PUTBACK;
154 root 1.2 call_sv (req->callback, G_VOID | G_EVAL);
155 root 1.1 SPAGAIN;
156    
157     if (req->callback)
158     SvREFCNT_dec (req->callback);
159    
160     errno = errorno;
161     count++;
162     }
163    
164     Safefree (req);
165     }
166    
167     return count;
168     }
169    
170 root 1.4 static void *aio_proc(void *arg);
171    
172     static void
173     start_thread (void)
174     {
175     sigset_t fullsigset, oldsigset;
176     pthread_t tid;
177     pthread_attr_t attr;
178    
179     pthread_attr_init (&attr);
180     pthread_attr_setstacksize (&attr, STACKSIZE);
181     pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
182    
183     sigfillset (&fullsigset);
184     sigprocmask (SIG_SETMASK, &fullsigset, &oldsigset);
185    
186     if (pthread_create (&tid, &attr, aio_proc, 0) == 0)
187     started++;
188    
189     sigprocmask (SIG_SETMASK, &oldsigset, 0);
190     }
191    
192     static void
193     send_req (aio_req req)
194     {
195     nreqs++;
196    
197     pthread_mutex_lock (&reqlock);
198    
199     req->next = 0;
200    
201     if (reqe)
202     {
203     reqe->next = req;
204     reqe = req;
205     }
206     else
207     reqe = reqs = req;
208    
209     pthread_cond_signal (&reqwait);
210     pthread_mutex_unlock (&reqlock);
211    
212     while (nreqs > max_outstanding)
213     {
214     poll_wait ();
215     poll_cb ();
216     }
217     }
218    
219     static void
220     end_thread (void)
221     {
222     aio_req req;
223     New (0, req, 1, aio_cb);
224     req->type = REQ_QUIT;
225    
226     send_req (req);
227     }
228    
229     static void
230 root 1.5 read_write (int dowrite, int fd, off_t offset, size_t length,
231 root 1.4 SV *data, STRLEN dataoffset, SV *callback)
232     {
233     aio_req req;
234     STRLEN svlen;
235     char *svptr = SvPV (data, svlen);
236    
237     SvUPGRADE (data, SVt_PV);
238     SvPOK_on (data);
239    
240     if (dataoffset < 0)
241     dataoffset += svlen;
242    
243     if (dataoffset < 0 || dataoffset > svlen)
244     croak ("data offset outside of string");
245    
246     if (dowrite)
247     {
248     /* write: check length and adjust. */
249     if (length < 0 || length + dataoffset > svlen)
250     length = svlen - dataoffset;
251     }
252     else
253     {
254     /* read: grow scalar as necessary */
255     svptr = SvGROW (data, length + dataoffset);
256     }
257    
258     if (length < 0)
259     croak ("length must not be negative");
260    
261     Newz (0, req, 1, aio_cb);
262    
263     if (!req)
264     croak ("out of memory during aio_req allocation");
265    
266     req->type = dowrite ? REQ_WRITE : REQ_READ;
267     req->fd = fd;
268     req->offset = offset;
269     req->length = length;
270     req->data = SvREFCNT_inc (data);
271     req->dataptr = (char *)svptr + dataoffset;
272     req->callback = SvREFCNT_inc (callback);
273    
274     send_req (req);
275     }
276    
277 root 1.1 static void *
278     aio_proc (void *thr_arg)
279     {
280     aio_req req;
281 root 1.3 int type;
282 root 1.1
283 root 1.3 do
284 root 1.1 {
285 root 1.3 pthread_mutex_lock (&reqlock);
286    
287     for (;;)
288     {
289     req = reqs;
290    
291     if (reqs)
292     {
293     reqs = reqs->next;
294     if (!reqs) reqe = 0;
295     }
296    
297     if (req)
298     break;
299    
300     pthread_cond_wait (&reqwait, &reqlock);
301     }
302    
303     pthread_mutex_unlock (&reqlock);
304    
305 root 1.1 errno = 0; /* strictly unnecessary */
306    
307 root 1.3 type = req->type;
308    
309     switch (type)
310 root 1.1 {
311     case REQ_READ: req->result = pread64 (req->fd, req->dataptr, req->length, req->offset); break;
312     case REQ_WRITE: req->result = pwrite64 (req->fd, req->dataptr, req->length, req->offset); break;
313     #if SYS_readahead
314     case REQ_READAHEAD: req->result = readahead (req->fd, req->offset, req->length); break;
315     #else
316     case REQ_READAHEAD: req->result = -1; errno = ENOSYS; break;
317     #endif
318    
319     case REQ_STAT: req->result = stat (req->dataptr, req->statdata); break;
320     case REQ_LSTAT: req->result = lstat (req->dataptr, req->statdata); break;
321     case REQ_FSTAT: req->result = fstat (req->fd , req->statdata); break;
322    
323     case REQ_OPEN: req->result = open (req->dataptr, req->fd, req->mode); break;
324     case REQ_CLOSE: req->result = close (req->fd); break;
325     case REQ_UNLINK: req->result = unlink (req->dataptr); break;
326    
327     case REQ_FSYNC: req->result = fsync (req->fd); break;
328     case REQ_FDATASYNC: req->result = fdatasync (req->fd); break;
329    
330     case REQ_QUIT:
331 root 1.3 break;
332 root 1.1
333     default:
334     req->result = ENOSYS;
335     break;
336     }
337    
338     req->errorno = errno;
339 root 1.3
340     pthread_mutex_lock (&reslock);
341    
342     req->next = 0;
343    
344     if (rese)
345     {
346     rese->next = req;
347     rese = req;
348     }
349     else
350     {
351     rese = ress = req;
352    
353     /* write a dummy byte to the pipe so fh becomes ready */
354     write (respipe [1], &respipe, 1);
355     }
356    
357     pthread_mutex_unlock (&reslock);
358 root 1.1 }
359 root 1.3 while (type != REQ_QUIT);
360 root 1.1
361     return 0;
362     }
363    
364     MODULE = IO::AIO PACKAGE = IO::AIO
365    
366     BOOT:
367     {
368 root 1.3 if (pipe (respipe))
369     croak ("unable to initialize result pipe");
370 root 1.1
371 root 1.3 if (fcntl (respipe [0], F_SETFL, O_NONBLOCK))
372 root 1.1 croak ("cannot set result pipe to nonblocking mode");
373    
374 root 1.3 if (fcntl (respipe [1], F_SETFL, O_NONBLOCK))
375 root 1.1 croak ("cannot set result pipe to nonblocking mode");
376     }
377    
378     void
379     min_parallel(nthreads)
380     int nthreads
381     PROTOTYPE: $
382     CODE:
383     while (nthreads > started)
384     start_thread ();
385    
386     void
387     max_parallel(nthreads)
388     int nthreads
389     PROTOTYPE: $
390     CODE:
391     {
392     int cur = started;
393     while (cur > nthreads)
394     {
395     end_thread ();
396     cur--;
397     }
398    
399     while (started > nthreads)
400     {
401     poll_wait ();
402 root 1.5 poll_cb ();
403 root 1.1 }
404     }
405    
406 root 1.4 int
407     max_outstanding(nreqs)
408     int nreqs
409     PROTOTYPE: $
410     CODE:
411     RETVAL = max_outstanding;
412     max_outstanding = nreqs;
413    
414 root 1.1 void
415     aio_open(pathname,flags,mode,callback)
416     SV * pathname
417     int flags
418     int mode
419     SV * callback
420     PROTOTYPE: $$$$
421     CODE:
422     {
423     aio_req req;
424    
425     Newz (0, req, 1, aio_cb);
426    
427     if (!req)
428     croak ("out of memory during aio_req allocation");
429    
430     req->type = REQ_OPEN;
431     req->data = newSVsv (pathname);
432     req->dataptr = SvPV_nolen (req->data);
433     req->fd = flags;
434     req->mode = mode;
435     req->callback = SvREFCNT_inc (callback);
436    
437     send_req (req);
438     }
439    
440     void
441     aio_close(fh,callback)
442     InputStream fh
443     SV * callback
444     PROTOTYPE: $$
445     ALIAS:
446     aio_close = REQ_CLOSE
447     aio_fsync = REQ_FSYNC
448     aio_fdatasync = REQ_FDATASYNC
449     CODE:
450     {
451     aio_req req;
452    
453     Newz (0, req, 1, aio_cb);
454    
455     if (!req)
456     croak ("out of memory during aio_req allocation");
457    
458     req->type = ix;
459     req->fd = PerlIO_fileno (fh);
460     req->callback = SvREFCNT_inc (callback);
461    
462     send_req (req);
463     }
464    
465     void
466     aio_read(fh,offset,length,data,dataoffset,callback)
467     InputStream fh
468     UV offset
469     IV length
470     SV * data
471     IV dataoffset
472     SV * callback
473     PROTOTYPE: $$$$$$
474     CODE:
475 root 1.5 read_write (0, PerlIO_fileno (fh), offset, length, data, dataoffset, callback);
476 root 1.1
477     void
478     aio_write(fh,offset,length,data,dataoffset,callback)
479     OutputStream fh
480     UV offset
481     IV length
482     SV * data
483     IV dataoffset
484     SV * callback
485     PROTOTYPE: $$$$$$
486     CODE:
487 root 1.5 read_write (1, PerlIO_fileno (fh), offset, length, data, dataoffset, callback);
488 root 1.1
489     void
490     aio_readahead(fh,offset,length,callback)
491     InputStream fh
492     UV offset
493     IV length
494     SV * callback
495     PROTOTYPE: $$$$
496     CODE:
497     {
498     aio_req req;
499    
500     if (length < 0)
501     croak ("length must not be negative");
502    
503     Newz (0, req, 1, aio_cb);
504    
505     if (!req)
506     croak ("out of memory during aio_req allocation");
507    
508     req->type = REQ_READAHEAD;
509     req->fd = PerlIO_fileno (fh);
510     req->offset = offset;
511     req->length = length;
512     req->callback = SvREFCNT_inc (callback);
513    
514     send_req (req);
515     }
516    
517     void
518     aio_stat(fh_or_path,callback)
519     SV * fh_or_path
520     SV * callback
521     PROTOTYPE: $$
522     ALIAS:
523     aio_lstat = 1
524     CODE:
525     {
526     aio_req req;
527    
528     Newz (0, req, 1, aio_cb);
529    
530     if (!req)
531     croak ("out of memory during aio_req allocation");
532    
533     New (0, req->statdata, 1, Stat_t);
534    
535     if (!req->statdata)
536     croak ("out of memory during aio_req->statdata allocation");
537    
538     if (SvPOK (fh_or_path))
539     {
540     req->type = ix ? REQ_LSTAT : REQ_STAT;
541     req->data = newSVsv (fh_or_path);
542     req->dataptr = SvPV_nolen (req->data);
543     }
544     else
545     {
546     req->type = REQ_FSTAT;
547     req->fd = PerlIO_fileno (IoIFP (sv_2io (fh_or_path)));
548     }
549    
550     req->callback = SvREFCNT_inc (callback);
551    
552     send_req (req);
553     }
554    
555     void
556     aio_unlink(pathname,callback)
557     SV * pathname
558     SV * callback
559     PROTOTYPE: $$
560     CODE:
561     {
562     aio_req req;
563    
564     Newz (0, req, 1, aio_cb);
565    
566     if (!req)
567     croak ("out of memory during aio_req allocation");
568    
569     req->type = REQ_UNLINK;
570     req->data = newSVsv (pathname);
571     req->dataptr = SvPV_nolen (req->data);
572     req->callback = SvREFCNT_inc (callback);
573    
574     send_req (req);
575     }
576    
577 root 1.6 void
578     flush()
579     PROTOTYPE:
580     CODE:
581     while (nreqs)
582     {
583     poll_wait ();
584     poll_cb ();
585     }
586    
587 root 1.7 void
588     poll()
589     PROTOTYPE:
590     CODE:
591     if (nreqs)
592     {
593     poll_wait ();
594     poll_cb ();
595     }
596    
597 root 1.1 int
598     poll_fileno()
599     PROTOTYPE:
600     CODE:
601 root 1.3 RETVAL = respipe [0];
602 root 1.1 OUTPUT:
603     RETVAL
604    
605     int
606     poll_cb(...)
607     PROTOTYPE:
608     CODE:
609 root 1.5 RETVAL = poll_cb ();
610 root 1.1 OUTPUT:
611     RETVAL
612    
613     void
614     poll_wait()
615     PROTOTYPE:
616     CODE:
617 root 1.3 if (nreqs)
618     poll_wait ();
619 root 1.1
620     int
621     nreqs()
622     PROTOTYPE:
623     CODE:
624     RETVAL = nreqs;
625     OUTPUT:
626     RETVAL
627