ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/Linux-AIO/AIO.xs
Revision: 1.14
Committed: Tue Apr 2 14:01:09 2002 UTC (22 years, 1 month ago) by root
Branch: MAIN
Changes since 1.13: +20 -21 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.1 #include "EXTERN.h"
2     #include "perl.h"
3     #include "XSUB.h"
4    
5     #include <sys/types.h>
6 root 1.13 #include <sys/stat.h>
7 root 1.1 #include <unistd.h>
8 root 1.5 #include <fcntl.h>
9 root 1.8 #include <signal.h>
10 root 1.1 #include <sched.h>
11    
12 root 1.13 typedef void *InputStream; /* hack, but 5.6.1 is simply toooo old ;) */
13     typedef void *OutputStream; /* hack, but 5.6.1 is simply toooo old ;) */
14 root 1.12 typedef void *InOutStream; /* hack, but 5.6.1 is simply toooo old ;) */
15    
16 root 1.8 #define STACKSIZE 1024 /* yeah */
17 root 1.1
18 root 1.13 enum { REQ_QUIT, REQ_OPEN, REQ_CLOSE, REQ_READ, REQ_WRITE, REQ_STAT, REQ_LSTAT, REQ_FSTAT};
19 root 1.1
20     typedef struct {
21     char stack[STACKSIZE];
22     } aio_thread;
23    
24     typedef struct {
25     int type;
26     aio_thread *thread;
27    
28     int fd;
29     off_t offset;
30     size_t length;
31 root 1.2 ssize_t result;
32 root 1.8 mode_t mode; /* open */
33 root 1.1 int errorno;
34 root 1.5 SV *data, *callback;
35 root 1.1 void *dataptr;
36 root 1.2 STRLEN dataoffset;
37 root 1.13
38     struct stat64 *statdata;
39 root 1.1 } aio_cb;
40    
41     typedef aio_cb *aio_req;
42    
43     static int started;
44 root 1.5 static int nreqs;
45 root 1.1 static int reqpipe[2], respipe[2];
46    
47 root 1.2 static int aio_proc(void *arg);
48    
49 root 1.1 static void
50     start_thread(void)
51     {
52 root 1.2 aio_thread *thr;
53    
54     New (0, thr, 1, aio_thread);
55    
56     if (clone (aio_proc,
57     &(thr->stack[STACKSIZE]),
58     CLONE_VM|CLONE_FS|CLONE_FILES|CLONE_SIGHAND,
59     thr) >= 0)
60     started++;
61     else
62     Safefree (thr);
63 root 1.1 }
64    
65     static void
66     end_thread(void)
67     {
68 root 1.5 aio_req req;
69     New (0, req, 1, aio_cb);
70     req->type = REQ_QUIT;
71 root 1.1 write (reqpipe[1], &req, sizeof (aio_req));
72 root 1.2 }
73    
74     static void
75 root 1.8 send_req (aio_req req)
76     {
77     nreqs++;
78     write (reqpipe[1], &req, sizeof (aio_req));
79     }
80    
81     static void
82 root 1.5 read_write (pTHX_ int dowrite, int fd, off_t offset, size_t length,
83     SV *data, STRLEN dataoffset, SV*callback)
84 root 1.2 {
85 root 1.5 aio_req req;
86     STRLEN svlen;
87     char *svptr = SvPV (data, svlen);
88    
89 root 1.13 SvUPGRADE (data, SVt_PV);
90     SvPOK_on (data);
91    
92 root 1.5 if (dataoffset < 0)
93     dataoffset += svlen;
94    
95     if (dataoffset < 0 || dataoffset > svlen)
96     croak ("data offset outside of string");
97    
98     if (dowrite)
99     {
100     /* write: check length and adjust. */
101     if (length < 0 || length + dataoffset > svlen)
102     length = svlen - dataoffset;
103     }
104     else
105     {
106     /* read: grow scalar as necessary */
107     svptr = SvGROW (data, length + dataoffset);
108     }
109    
110     if (length < 0)
111     croak ("length must not be negative");
112    
113 root 1.14 Newz (0, req, 1, aio_cb);
114 root 1.5
115     if (!req)
116     croak ("out of memory during aio_req allocation");
117    
118     req->type = dowrite ? REQ_WRITE : REQ_READ;
119     req->fd = fd;
120     req->offset = offset;
121     req->length = length;
122     req->data = SvREFCNT_inc (data);
123 root 1.6 req->dataptr = (char *)svptr + dataoffset;
124 root 1.5 req->callback = SvREFCNT_inc (callback);
125    
126 root 1.8 send_req (req);
127 root 1.5 }
128    
129     static int
130     poll_cb (pTHX)
131     {
132     dSP;
133     int count = 0;
134     aio_req req;
135    
136     while (read (respipe[0], (void *)&req, sizeof (req)) == sizeof (req))
137     {
138     if (req->type == REQ_QUIT)
139     {
140     Safefree (req->thread);
141     started--;
142     }
143     else
144     {
145     int errorno = errno;
146     errno = req->errorno;
147    
148     if (req->type == REQ_READ)
149 root 1.6 SvCUR_set (req->data, req->dataoffset
150     + req->result > 0 ? req->result : 0);
151 root 1.5
152 root 1.14 if (req->data)
153     SvREFCNT_dec (req->data);
154    
155 root 1.13 if (req->type == REQ_STAT || req->type == REQ_LSTAT || req->type == REQ_FSTAT)
156     {
157     PL_laststype = req->type == REQ_LSTAT ? OP_LSTAT : OP_STAT;
158     PL_laststatval = req->result;
159     PL_statcache.st_dev = req->statdata->st_dev;
160     PL_statcache.st_ino = req->statdata->st_ino;
161     PL_statcache.st_mode = req->statdata->st_mode;
162     PL_statcache.st_nlink = req->statdata->st_nlink;
163     PL_statcache.st_uid = req->statdata->st_uid;
164     PL_statcache.st_gid = req->statdata->st_gid;
165     PL_statcache.st_rdev = req->statdata->st_rdev;
166     PL_statcache.st_size = req->statdata->st_size;
167     PL_statcache.st_atime = req->statdata->st_atime;
168     PL_statcache.st_mtime = req->statdata->st_mtime;
169     PL_statcache.st_ctime = req->statdata->st_ctime;
170     PL_statcache.st_blksize = req->statdata->st_blksize;
171     PL_statcache.st_blocks = req->statdata->st_blocks;
172    
173     Safefree (req->statdata);
174     }
175    
176 root 1.5 PUSHMARK (SP);
177     XPUSHs (sv_2mortal (newSViv (req->result)));
178     PUTBACK;
179     call_sv (req->callback, G_VOID);
180     SPAGAIN;
181    
182 root 1.14 if (req->callback)
183     SvREFCNT_dec (req->callback);
184 root 1.5
185     errno = errorno;
186     nreqs--;
187     count++;
188     }
189    
190     Safefree (req);
191     }
192    
193     return count;
194 root 1.2 }
195    
196 root 1.8 static sigset_t fullsigset;
197    
198 root 1.2 #undef errno
199     #include <asm/unistd.h>
200    
201     static int
202     aio_proc(void *thr_arg)
203     {
204     aio_thread *thr = thr_arg;
205 root 1.9 aio_req req;
206 root 1.2 int errno;
207    
208 root 1.11 /* we rely on gcc's ability to create closures. */
209 root 1.13 _syscall3(int,read,int,fd,char *,buf,size_t,count)
210     _syscall3(int,write,int,fd,char *,buf,size_t,count)
211    
212 root 1.11 _syscall3(int,open,char *,pathname,int,flags,mode_t,mode)
213     _syscall1(int,close,int,fd)
214    
215 root 1.13 _syscall4(int,pread,int,fd,char *,buf,size_t,count,off_t,offset)
216     _syscall4(int,pwrite,int,fd,char *,buf,size_t,count,off_t,offset)
217    
218     _syscall2(int,stat64, const char *, filename, struct stat64 *, buf)
219     _syscall2(int,lstat64, const char *, filename, struct stat64 *, buf)
220     _syscall2(int,fstat64, int, fd, struct stat64 *, buf)
221    
222 root 1.8 sigprocmask (SIG_SETMASK, &fullsigset, 0);
223 root 1.2
224     /* then loop */
225     while (read (reqpipe[0], (void *)&req, sizeof (req)) == sizeof (req))
226     {
227     req->thread = thr;
228 root 1.14 errno = 0; /* strictly unnecessary */
229 root 1.2
230 root 1.13 if (req->type == REQ_READ)
231     req->result = pread (req->fd, req->dataptr, req->length, req->offset);
232     else if (req->type == REQ_WRITE)
233     req->result = pwrite (req->fd, req->dataptr, req->length, req->offset);
234 root 1.8 else if (req->type == REQ_OPEN)
235 root 1.13 req->result = open (req->dataptr, req->fd, req->mode);
236 root 1.10 else if (req->type == REQ_CLOSE)
237 root 1.13 req->result = close (req->fd);
238     else if (req->type == REQ_STAT)
239     req->result = stat64 (req->dataptr, req->statdata);
240     else if (req->type == REQ_LSTAT)
241     req->result = lstat64 (req->dataptr, req->statdata);
242     else if (req->type == REQ_FSTAT)
243     req->result = fstat64 (req->fd, req->statdata);
244 root 1.2 else
245     {
246     write (respipe[1], (void *)&req, sizeof (req));
247     break;
248     }
249    
250 root 1.8 req->errorno = errno;
251 root 1.2 write (respipe[1], (void *)&req, sizeof (req));
252     }
253    
254     return 0;
255 root 1.1 }
256    
257     MODULE = Linux::AIO PACKAGE = Linux::AIO
258    
259     BOOT:
260     {
261 root 1.8 sigfillset (&fullsigset);
262 root 1.9 sigdelset (&fullsigset, SIGTERM);
263     sigdelset (&fullsigset, SIGQUIT);
264     sigdelset (&fullsigset, SIGABRT);
265     sigdelset (&fullsigset, SIGINT);
266 root 1.8
267 root 1.1 if (pipe (reqpipe) || pipe (respipe))
268     croak ("unable to initialize request or result pipe");
269 root 1.5
270     if (fcntl (respipe[0], F_SETFL, O_NONBLOCK))
271     croak ("cannot set result pipe to nonblocking mode");
272 root 1.1 }
273    
274     void
275     min_parallel(nthreads)
276     int nthreads
277 root 1.5 PROTOTYPE: $
278 root 1.1 CODE:
279     while (nthreads > started)
280     start_thread ();
281    
282     void
283     max_parallel(nthreads)
284     int nthreads
285 root 1.5 PROTOTYPE: $
286 root 1.1 CODE:
287 root 1.5 int cur = started;
288     while (cur > nthreads)
289     {
290     end_thread ();
291     cur--;
292     }
293    
294 root 1.1 while (started > nthreads)
295 root 1.5 {
296 root 1.14 fd_set rfd;
297     FD_ZERO(&rfd);
298     FD_SET(respipe[0], &rfd);
299    
300     select (respipe[0] + 1, &rfd, 0, 0, 0);
301 root 1.5 poll_cb ();
302     }
303 root 1.1
304     void
305 root 1.8 aio_open(pathname,flags,mode,callback)
306 root 1.13 SV * pathname
307 root 1.8 int flags
308     int mode
309     SV * callback
310     PROTOTYPE: $$$$
311     CODE:
312     aio_req req;
313    
314 root 1.14 Newz (0, req, 1, aio_cb);
315 root 1.8
316     if (!req)
317     croak ("out of memory during aio_req allocation");
318    
319     req->type = REQ_OPEN;
320 root 1.14 req->data = newSVsv (pathname);
321     req->dataptr = SvPV_nolen (req->data);
322 root 1.8 req->fd = flags;
323     req->mode = mode;
324 root 1.10 req->callback = SvREFCNT_inc (callback);
325    
326     send_req (req);
327    
328     void
329     aio_close(fh,callback)
330 root 1.13 InputStream fh
331 root 1.10 SV * callback
332 root 1.14 PROTOTYPE: $$
333 root 1.10 CODE:
334     aio_req req;
335    
336 root 1.14 Newz (0, req, 1, aio_cb);
337 root 1.10
338     if (!req)
339     croak ("out of memory during aio_req allocation");
340    
341     req->type = REQ_CLOSE;
342     req->fd = PerlIO_fileno (fh);
343 root 1.13 req->callback = SvREFCNT_inc (callback);
344    
345     send_req (req);
346    
347     void
348     aio_read(fh,offset,length,data,dataoffset,callback)
349     InputStream fh
350     UV offset
351     IV length
352     SV * data
353     IV dataoffset
354     SV * callback
355     PROTOTYPE: $$$$$$
356     CODE:
357     read_write (aTHX_ 0, PerlIO_fileno (fh), offset, length, data, dataoffset, callback);
358    
359     void
360     aio_write(fh,offset,length,data,dataoffset,callback)
361     OutputStream fh
362     UV offset
363     IV length
364     SV * data
365     IV dataoffset
366     SV * callback
367     PROTOTYPE: $$$$$$
368     CODE:
369     read_write (aTHX_ 1, PerlIO_fileno (fh), offset, length, data, dataoffset, callback);
370    
371     void
372     aio_stat(fh_or_path,callback)
373     SV * fh_or_path
374     SV * callback
375     PROTOTYPE: $$
376     ALIAS:
377     aio_lstat = 1
378     CODE:
379     aio_req req;
380    
381 root 1.14 Newz (0, req, 1, aio_cb);
382 root 1.13
383     if (!req)
384     croak ("out of memory during aio_req allocation");
385    
386     New (0, req->statdata, 1, struct stat64);
387    
388     if (!req->statdata)
389     croak ("out of memory during aio_req->statdata allocation");
390    
391     if (SvPOK (fh_or_path))
392     {
393     req->type = ix ? REQ_LSTAT : REQ_STAT;
394 root 1.14 req->data = newSVsv (fh_or_path);
395     req->dataptr = SvPV_nolen (req->data);
396 root 1.13 }
397     else
398     {
399     req->type = REQ_FSTAT;
400     req->fd = PerlIO_fileno (IoIFP (sv_2io (fh_or_path)));
401     }
402    
403 root 1.8 req->callback = SvREFCNT_inc (callback);
404    
405     send_req (req);
406 root 1.5
407     int
408     poll_fileno()
409     PROTOTYPE:
410     CODE:
411     RETVAL = respipe[0];
412     OUTPUT:
413     RETVAL
414    
415     int
416 root 1.6 poll_cb(...)
417 root 1.5 PROTOTYPE:
418     CODE:
419     RETVAL = poll_cb (aTHX);
420     OUTPUT:
421     RETVAL
422    
423     int
424     nreqs()
425     PROTOTYPE:
426     CODE:
427     RETVAL = nreqs;
428     OUTPUT:
429     RETVAL
430