ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/IO-AIO/AIO.xs
Revision: 1.3
Committed: Sun Jul 10 20:07:11 2005 UTC (18 years, 10 months ago) by root
Branch: MAIN
Changes since 1.2: +98 -37 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.1 #define PERL_NO_GET_CONTEXT
2    
3     #include "EXTERN.h"
4     #include "perl.h"
5     #include "XSUB.h"
6    
7     #include <sys/types.h>
8     #include <sys/stat.h>
9     #include <unistd.h>
10     #include <fcntl.h>
11     #include <signal.h>
12     #include <sched.h>
13     #include <endian.h>
14    
15     #include <pthread.h>
16     #include <sys/syscall.h>
17    
/*
 * hack, but 5.6.1 is simply toooo old ;)
 * The stream type names used in the XS signatures below are declared
 * here as plain void pointers.
 */
typedef void *InputStream;
typedef void *OutputStream;
typedef void *InOutStream;

/* stack size requested for each worker thread, chosen per architecture */
#if __i386 || __amd64
# define STACKSIZE ( 256 * sizeof (long))
#elif __ia64
# define STACKSIZE (8192 * sizeof (long))
#else
# define STACKSIZE ( 512 * sizeof (long))
#endif
29    
/* request types stored in aio_cb.type and dispatched on by aio_proc */
enum {
  REQ_QUIT,       /* tells one worker thread to leave its loop */
  REQ_OPEN,
  REQ_CLOSE,
  REQ_READ,
  REQ_WRITE,
  REQ_READAHEAD,
  REQ_STAT,
  REQ_LSTAT,
  REQ_FSTAT,
  REQ_UNLINK,
  REQ_FSYNC,
  REQ_FDATASYNC,
};
37    
38     typedef struct aio_cb {
39 root 1.3 struct aio_cb *volatile next;
40 root 1.1
41     int type;
42    
43     int fd;
44     off_t offset;
45     size_t length;
46     ssize_t result;
47     mode_t mode; /* open */
48     int errorno;
49     SV *data, *callback;
50     void *dataptr;
51     STRLEN dataoffset;
52    
53     Stat_t *statdata;
54     } aio_cb;
55    
56     typedef aio_cb *aio_req;
57    
58     static int started;
59     static int nreqs;
60 root 1.3 static int respipe [2];
61 root 1.1
62 root 1.3 static pthread_mutex_t reslock = PTHREAD_MUTEX_INITIALIZER;
63     static pthread_mutex_t reqlock = PTHREAD_MUTEX_INITIALIZER;
64     static pthread_cond_t reqwait = PTHREAD_COND_INITIALIZER;
65    
66     static volatile aio_req reqs, reqe; /* queue start, queue end */
67     static volatile aio_req ress, rese; /* queue start, queue end */
68 root 1.1
69     static void *aio_proc(void *arg);
70    
71     static void
72     start_thread (void)
73     {
74     sigset_t fullsigset, oldsigset;
75     pthread_t tid;
76     pthread_attr_t attr;
77    
78     pthread_attr_init (&attr);
79     pthread_attr_setstacksize (&attr, STACKSIZE);
80     pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
81    
82     sigfillset (&fullsigset);
83     sigprocmask (SIG_SETMASK, &fullsigset, &oldsigset);
84    
85     if (pthread_create (&tid, &attr, aio_proc, 0) == 0)
86     started++;
87    
88     sigprocmask (SIG_SETMASK, &oldsigset, 0);
89     }
90    
91     static void
92     send_req (aio_req req)
93     {
94     nreqs++;
95 root 1.3
96     pthread_mutex_lock (&reqlock);
97    
98 root 1.1 req->next = 0;
99    
100 root 1.3 if (reqe)
101 root 1.1 {
102 root 1.3 reqe->next = req;
103     reqe = req;
104 root 1.1 }
105     else
106 root 1.3 reqe = reqs = req;
107 root 1.1
108 root 1.3 pthread_cond_signal (&reqwait);
109     pthread_mutex_unlock (&reqlock);
110 root 1.1 }
111    
112     static void
113     end_thread (void)
114     {
115     aio_req req;
116     New (0, req, 1, aio_cb);
117     req->type = REQ_QUIT;
118    
119     send_req (req);
120     }
121    
122     static void
123     read_write (pTHX_
124     int dowrite, int fd, off_t offset, size_t length,
125     SV *data, STRLEN dataoffset, SV *callback)
126     {
127     aio_req req;
128     STRLEN svlen;
129     char *svptr = SvPV (data, svlen);
130    
131     SvUPGRADE (data, SVt_PV);
132     SvPOK_on (data);
133    
134     if (dataoffset < 0)
135     dataoffset += svlen;
136    
137     if (dataoffset < 0 || dataoffset > svlen)
138     croak ("data offset outside of string");
139    
140     if (dowrite)
141     {
142     /* write: check length and adjust. */
143     if (length < 0 || length + dataoffset > svlen)
144     length = svlen - dataoffset;
145     }
146     else
147     {
148     /* read: grow scalar as necessary */
149     svptr = SvGROW (data, length + dataoffset);
150     }
151    
152     if (length < 0)
153     croak ("length must not be negative");
154    
155     Newz (0, req, 1, aio_cb);
156    
157     if (!req)
158     croak ("out of memory during aio_req allocation");
159    
160     req->type = dowrite ? REQ_WRITE : REQ_READ;
161     req->fd = fd;
162     req->offset = offset;
163     req->length = length;
164     req->data = SvREFCNT_inc (data);
165     req->dataptr = (char *)svptr + dataoffset;
166     req->callback = SvREFCNT_inc (callback);
167    
168     send_req (req);
169     }
170    
171     static void
172     poll_wait ()
173     {
174 root 1.3 if (!nreqs)
175     return;
176    
177 root 1.1 fd_set rfd;
178     FD_ZERO(&rfd);
179 root 1.3 FD_SET(respipe [0], &rfd);
180 root 1.1
181 root 1.3 select (respipe [0] + 1, &rfd, 0, 0, 0);
182 root 1.1 }
183    
184     static int
185     poll_cb (pTHX)
186     {
187     dSP;
188     int count = 0;
189     aio_req req;
190 root 1.3
191     {
192     /* read and signals sent by the worker threads */
193     char buf [32];
194     while (read (respipe [0], buf, 32) > 0)
195     ;
196     }
197 root 1.1
198 root 1.3 for (;;)
199 root 1.1 {
200 root 1.3 pthread_mutex_lock (&reslock);
201    
202     req = ress;
203    
204     if (ress)
205     {
206     ress = ress->next;
207     if (!ress) rese = 0;
208     }
209    
210     pthread_mutex_unlock (&reslock);
211    
212     if (!req)
213     break;
214    
215 root 1.1 nreqs--;
216    
217     if (req->type == REQ_QUIT)
218     started--;
219     else
220     {
221     int errorno = errno;
222     errno = req->errorno;
223    
224     if (req->type == REQ_READ)
225     SvCUR_set (req->data, req->dataoffset
226     + req->result > 0 ? req->result : 0);
227    
228     if (req->data)
229     SvREFCNT_dec (req->data);
230    
231     if (req->type == REQ_STAT || req->type == REQ_LSTAT || req->type == REQ_FSTAT)
232     {
233     PL_laststype = req->type == REQ_LSTAT ? OP_LSTAT : OP_STAT;
234     PL_laststatval = req->result;
235     PL_statcache = *(req->statdata);
236    
237     Safefree (req->statdata);
238     }
239    
240     PUSHMARK (SP);
241     XPUSHs (sv_2mortal (newSViv (req->result)));
242 root 1.2
243     if (req->type == REQ_OPEN)
244     {
245     /* convert fd to fh */
246     SV *fh;
247    
248     PUTBACK;
249     call_pv ("IO::AIO::_fd2fh", G_SCALAR | G_EVAL);
250     SPAGAIN;
251    
252     fh = POPs;
253    
254     PUSHMARK (SP);
255     XPUSHs (fh);
256     }
257    
258 root 1.1 PUTBACK;
259 root 1.2 call_sv (req->callback, G_VOID | G_EVAL);
260 root 1.1 SPAGAIN;
261    
262     if (req->callback)
263     SvREFCNT_dec (req->callback);
264    
265     errno = errorno;
266     count++;
267     }
268    
269     Safefree (req);
270     }
271    
272     return count;
273     }
274    
275     static void *
276     aio_proc (void *thr_arg)
277     {
278     aio_req req;
279 root 1.3 int type;
280 root 1.1
281 root 1.3 do
282 root 1.1 {
283 root 1.3 pthread_mutex_lock (&reqlock);
284    
285     for (;;)
286     {
287     req = reqs;
288    
289     if (reqs)
290     {
291     reqs = reqs->next;
292     if (!reqs) reqe = 0;
293     }
294    
295     if (req)
296     break;
297    
298     pthread_cond_wait (&reqwait, &reqlock);
299     }
300    
301     pthread_mutex_unlock (&reqlock);
302    
303 root 1.1 errno = 0; /* strictly unnecessary */
304    
305 root 1.3 type = req->type;
306    
307     switch (type)
308 root 1.1 {
309     case REQ_READ: req->result = pread64 (req->fd, req->dataptr, req->length, req->offset); break;
310     case REQ_WRITE: req->result = pwrite64 (req->fd, req->dataptr, req->length, req->offset); break;
311     #if SYS_readahead
312     case REQ_READAHEAD: req->result = readahead (req->fd, req->offset, req->length); break;
313     #else
314     case REQ_READAHEAD: req->result = -1; errno = ENOSYS; break;
315     #endif
316    
317     case REQ_STAT: req->result = stat (req->dataptr, req->statdata); break;
318     case REQ_LSTAT: req->result = lstat (req->dataptr, req->statdata); break;
319     case REQ_FSTAT: req->result = fstat (req->fd , req->statdata); break;
320    
321     case REQ_OPEN: req->result = open (req->dataptr, req->fd, req->mode); break;
322     case REQ_CLOSE: req->result = close (req->fd); break;
323     case REQ_UNLINK: req->result = unlink (req->dataptr); break;
324    
325     case REQ_FSYNC: req->result = fsync (req->fd); break;
326     case REQ_FDATASYNC: req->result = fdatasync (req->fd); break;
327    
328     case REQ_QUIT:
329 root 1.3 break;
330 root 1.1
331     default:
332     req->result = ENOSYS;
333     break;
334     }
335    
336     req->errorno = errno;
337 root 1.3
338     pthread_mutex_lock (&reslock);
339    
340     req->next = 0;
341    
342     if (rese)
343     {
344     rese->next = req;
345     rese = req;
346     }
347     else
348     {
349     rese = ress = req;
350    
351     /* write a dummy byte to the pipe so fh becomes ready */
352     write (respipe [1], &respipe, 1);
353     }
354    
355     pthread_mutex_unlock (&reslock);
356 root 1.1 }
357 root 1.3 while (type != REQ_QUIT);
358 root 1.1
359     return 0;
360     }
361    
362     MODULE = IO::AIO PACKAGE = IO::AIO
363    
BOOT:
{
	/* the pipe worker threads use to signal that results are available */
	if (pipe (respipe) != 0)
	  croak ("unable to initialize result pipe");

	/* both ends are switched to non-blocking mode, read end first */
	if (fcntl (respipe [0], F_SETFL, O_NONBLOCK) != 0
	    || fcntl (respipe [1], F_SETFL, O_NONBLOCK) != 0)
	  croak ("cannot set result pipe to nonblocking mode");
}
375    
void
min_parallel(nthreads)
	int	nthreads
	PROTOTYPE: $
	CODE:
{
	/* start workers until at least nthreads are running */
	while (nthreads > started)
	  {
	    int before = started;

	    start_thread ();

	    /* start_thread leaves the counter alone on failure: do not spin forever */
	    if (started == before)
	      croak ("unable to start aio worker thread");
	  }
}
383    
void
max_parallel(nthreads)
	int	nthreads
	PROTOTYPE: $
	CODE:
{
	int remaining;

	/* queue one quit request for every worker above the limit */
	for (remaining = started; remaining > nthreads; --remaining)
	  end_thread ();

	/* process results until the quit requests have come back */
	while (started > nthreads)
	  {
	    poll_wait ();
	    poll_cb (aTHX);
	  }
}
403    
void
aio_open(pathname,flags,mode,callback)
	SV *	pathname
	int	flags
	int	mode
	SV *	callback
	PROTOTYPE: $$$$
	CODE:
{
	/* queue an open request for a private copy of the path name */
	aio_req req;

	Newz (0, req, 1, aio_cb);

	if (!req)
	  croak ("out of memory during aio_req allocation");

	req->type     = REQ_OPEN;
	req->data     = newSVsv (pathname);
	req->dataptr  = SvPV_nolen (req->data);
	req->fd       = flags; /* aio_proc passes req->fd to open as the flags */
	req->mode     = mode;
	req->callback = SvREFCNT_inc (callback);

	send_req (req);
}
429    
void
aio_close(fh,callback)
	InputStream	fh
	SV *	callback
	PROTOTYPE: $$
	ALIAS:
	   aio_close     = REQ_CLOSE
	   aio_fsync     = REQ_FSYNC
	   aio_fdatasync = REQ_FDATASYNC
	CODE:
{
	/* the alias index ix is the request type itself */
	aio_req req;

	Newz (0, req, 1, aio_cb);

	if (!req)
	  croak ("out of memory during aio_req allocation");

	req->type     = ix;
	req->fd       = PerlIO_fileno (fh);
	req->callback = SvREFCNT_inc (callback);

	send_req (req);
}
454    
void
aio_read(fh,offset,length,data,dataoffset,callback)
	InputStream	fh
	UV	offset
	IV	length
	SV *	data
	IV	dataoffset
	SV *	callback
	PROTOTYPE: $$$$$$
	CODE:
{
	/* dowrite = 0: queue a read into data */
	int fd = PerlIO_fileno (fh);

	read_write (aTHX_ 0, fd, offset, length, data, dataoffset, callback);
}
466    
void
aio_write(fh,offset,length,data,dataoffset,callback)
	OutputStream	fh
	UV	offset
	IV	length
	SV *	data
	IV	dataoffset
	SV *	callback
	PROTOTYPE: $$$$$$
	CODE:
{
	/* dowrite = 1: queue a write from data */
	int fd = PerlIO_fileno (fh);

	read_write (aTHX_ 1, fd, offset, length, data, dataoffset, callback);
}
478    
void
aio_readahead(fh,offset,length,callback)
	InputStream	fh
	UV	offset
	IV	length
	SV *	callback
	PROTOTYPE: $$$$
	CODE:
{
	/* validate the length before anything is allocated */
	aio_req req;

	if (length < 0)
	  croak ("length must not be negative");

	Newz (0, req, 1, aio_cb);

	if (!req)
	  croak ("out of memory during aio_req allocation");

	req->type     = REQ_READAHEAD;
	req->fd       = PerlIO_fileno (fh);
	req->offset   = offset;
	req->length   = length;
	req->callback = SvREFCNT_inc (callback);

	send_req (req);
}
506    
void
aio_stat(fh_or_path,callback)
	SV *	fh_or_path
	SV *	callback
	PROTOTYPE: $$
	ALIAS:
	   aio_lstat = 1
	CODE:
{
	/*
	 * A string argument is treated as a path (stat, or lstat for the
	 * aio_lstat alias); anything else is treated as a file handle (fstat).
	 */
	aio_req req;
	int by_path = SvPOK (fh_or_path) != 0;
	int fd = 0;

	/* resolve the handle first: sv_2io can croak, which would leak the request */
	if (!by_path)
	  fd = PerlIO_fileno (IoIFP (sv_2io (fh_or_path)));

	Newz (0, req, 1, aio_cb);

	if (!req)
	  croak ("out of memory during aio_req allocation");

	New (0, req->statdata, 1, Stat_t);

	if (!req->statdata)
	  {
	    Safefree (req); /* do not leak the request on the error path */
	    croak ("out of memory during aio_req->statdata allocation");
	  }

	if (by_path)
	  {
	    req->type = ix ? REQ_LSTAT : REQ_STAT;
	    req->data = newSVsv (fh_or_path);
	    req->dataptr = SvPV_nolen (req->data);
	  }
	else
	  {
	    req->type = REQ_FSTAT;
	    req->fd = fd;
	  }

	req->callback = SvREFCNT_inc (callback);

	send_req (req);
}
544    
void
aio_unlink(pathname,callback)
	SV *	pathname
	SV *	callback
	PROTOTYPE: $$
	CODE:
{
	/* queue an unlink request for a private copy of the path name */
	aio_req req;

	Newz (0, req, 1, aio_cb);

	if (!req)
	  croak ("out of memory during aio_req allocation");

	req->type     = REQ_UNLINK;
	req->data     = newSVsv (pathname);
	req->dataptr  = SvPV_nolen (req->data);
	req->callback = SvREFCNT_inc (callback);

	send_req (req);
}
566    
int
poll_fileno()
	PROTOTYPE:
	CODE:
{
	/* the read end of the result pipe */
	RETVAL = respipe [0];
}
	OUTPUT:
	RETVAL
574    
int
poll_cb(...)
	PROTOTYPE:
	CODE:
{
	/* any arguments are accepted and ignored */
	RETVAL = poll_cb (aTHX);
}
	OUTPUT:
	RETVAL
582    
void
poll_wait()
	PROTOTYPE:
	CODE:
{
	/* only wait while requests are outstanding */
	if (nreqs != 0)
	  poll_wait ();
}
589 root 1.1
int
nreqs()
	PROTOTYPE:
	CODE:
{
	/* current value of the outstanding-request counter */
	RETVAL = nreqs;
}
	OUTPUT:
	RETVAL
597