ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/Linux-AIO/AIO.xs
Revision: 1.9
Committed: Thu Aug 16 18:58:34 2001 UTC (22 years, 9 months ago) by root
Branch: MAIN
Changes since 1.8: +5 -2 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.1 #include "EXTERN.h"
2     #include "perl.h"
3     #include "XSUB.h"
4    
5     #include <sys/types.h>
6     #include <unistd.h>
7 root 1.5 #include <fcntl.h>
8 root 1.8 #include <signal.h>
9 root 1.1 #include <sched.h>
10    
/* Number of bytes in the stack buffer embedded in each aio_thread. */
#define STACKSIZE 1024 /* yeah */

/* Request kinds stored in aio_cb.type. */
enum
{
  REQ_QUIT,
  REQ_READ,
  REQ_WRITE,
  REQ_OPEN
};
14 root 1.1
15     typedef struct {
16     char stack[STACKSIZE];
17     } aio_thread;
18    
19     typedef struct {
20     int type;
21     aio_thread *thread;
22    
23     /* read/write */
24     int fd;
25     off_t offset;
26     size_t length;
27 root 1.2 ssize_t result;
28 root 1.8 mode_t mode; /* open */
29 root 1.1 int errorno;
30 root 1.5 SV *data, *callback;
31 root 1.1 void *dataptr;
32 root 1.2 STRLEN dataoffset;
33 root 1.1 } aio_cb;
34    
35     typedef aio_cb *aio_req;
36    
/* Number of workers currently counted as running. */
static int started;

/* Number of requests queued via send_req and not yet reported by poll_cb. */
static int nreqs;

/* Pipes carrying aio_req pointers to the workers and back again. */
static int reqpipe[2];
static int respipe[2];

/* Worker entry point, defined further down. */
static int aio_proc (void *arg);
42    
43 root 1.1 static void
44     start_thread(void)
45     {
46 root 1.2 aio_thread *thr;
47    
48     New (0, thr, 1, aio_thread);
49    
50     if (clone (aio_proc,
51     &(thr->stack[STACKSIZE]),
52     CLONE_VM|CLONE_FS|CLONE_FILES|CLONE_SIGHAND,
53     thr) >= 0)
54     started++;
55     else
56     Safefree (thr);
57 root 1.1 }
58    
59     static void
60     end_thread(void)
61     {
62 root 1.5 aio_req req;
63     New (0, req, 1, aio_cb);
64     req->type = REQ_QUIT;
65 root 1.1 write (reqpipe[1], &req, sizeof (aio_req));
66 root 1.2 }
67    
68     static void
69 root 1.8 send_req (aio_req req)
70     {
71     nreqs++;
72     write (reqpipe[1], &req, sizeof (aio_req));
73     }
74    
75     static void
76 root 1.5 read_write (pTHX_ int dowrite, int fd, off_t offset, size_t length,
77     SV *data, STRLEN dataoffset, SV*callback)
78 root 1.2 {
79 root 1.5 aio_req req;
80     STRLEN svlen;
81     char *svptr = SvPV (data, svlen);
82    
83     if (dataoffset < 0)
84     dataoffset += svlen;
85    
86     if (dataoffset < 0 || dataoffset > svlen)
87     croak ("data offset outside of string");
88    
89     if (dowrite)
90     {
91     /* write: check length and adjust. */
92     if (length < 0 || length + dataoffset > svlen)
93     length = svlen - dataoffset;
94     }
95     else
96     {
97     /* read: grow scalar as necessary */
98     svptr = SvGROW (data, length + dataoffset);
99     }
100    
101     if (length < 0)
102     croak ("length must not be negative");
103    
104     New (0, req, 1, aio_cb);
105    
106     if (!req)
107     croak ("out of memory during aio_req allocation");
108    
109     req->type = dowrite ? REQ_WRITE : REQ_READ;
110     req->fd = fd;
111     req->offset = offset;
112     req->length = length;
113     req->data = SvREFCNT_inc (data);
114 root 1.6 req->dataptr = (char *)svptr + dataoffset;
115 root 1.5 req->callback = SvREFCNT_inc (callback);
116    
117 root 1.8 send_req (req);
118 root 1.5 }
119    
120     static int
121     poll_cb (pTHX)
122     {
123     dSP;
124     int count = 0;
125     aio_req req;
126    
127     while (read (respipe[0], (void *)&req, sizeof (req)) == sizeof (req))
128     {
129     if (req->type == REQ_QUIT)
130     {
131     Safefree (req->thread);
132     started--;
133     }
134     else
135     {
136     int errorno = errno;
137     errno = req->errorno;
138    
139     if (req->type == REQ_READ)
140 root 1.6 SvCUR_set (req->data, req->dataoffset
141     + req->result > 0 ? req->result : 0);
142 root 1.5
143     PUSHMARK (SP);
144     XPUSHs (sv_2mortal (newSViv (req->result)));
145     PUTBACK;
146     call_sv (req->callback, G_VOID);
147     SPAGAIN;
148    
149     SvREFCNT_dec (req->data);
150     SvREFCNT_dec (req->callback);
151    
152     errno = errorno;
153     nreqs--;
154     count++;
155     }
156    
157     Safefree (req);
158     }
159    
160     return count;
161 root 1.2 }
162    
163 root 1.8 static sigset_t fullsigset;
164    
165 root 1.2 #undef errno
166     #include <asm/unistd.h>
167    
168     static int
169     aio_proc(void *thr_arg)
170     {
171     aio_thread *thr = thr_arg;
172 root 1.9 aio_req req;
173 root 1.2 int errno;
174    
175 root 1.8 sigprocmask (SIG_SETMASK, &fullsigset, 0);
176    
177 root 1.2 /* we rely on gcc's ability to create closures. */
178     _syscall3(int,lseek,int,fd,off_t,offset,int,whence);
179     _syscall3(int,read,int,fd,char *,buf,off_t,count);
180     _syscall3(int,write,int,fd,char *,buf,off_t,count);
181 root 1.8 _syscall3(int,open,char *,pathname,int,flags,mode_t,mode);
182 root 1.2
183     /* then loop */
184     while (read (reqpipe[0], (void *)&req, sizeof (req)) == sizeof (req))
185     {
186     req->thread = thr;
187 root 1.8 errno = 0;
188 root 1.2
189     if (req->type == REQ_READ || req->type == REQ_WRITE)
190     {
191     if (lseek (req->fd, req->offset, SEEK_SET) == req->offset)
192     {
193     if (req->type == REQ_READ)
194     req->result = read (req->fd, req->dataptr, req->length);
195     else
196     req->result = write(req->fd, req->dataptr, req->length);
197     }
198 root 1.8 }
199     else if (req->type == REQ_OPEN)
200     {
201     req->result = open (req->dataptr, req->fd, req->mode);
202 root 1.2 }
203     else
204     {
205     write (respipe[1], (void *)&req, sizeof (req));
206     break;
207     }
208    
209 root 1.8 req->errorno = errno;
210 root 1.2 write (respipe[1], (void *)&req, sizeof (req));
211     }
212    
213     return 0;
214 root 1.1 }
215    
216     MODULE = Linux::AIO PACKAGE = Linux::AIO
217    
BOOT:
{
        /* signals that are removed from the otherwise full worker mask */
        static const int unmasked[] = { SIGTERM, SIGQUIT, SIGABRT, SIGINT };
        unsigned int i;

        sigfillset (&fullsigset);

        for (i = 0; i < sizeof (unmasked) / sizeof (unmasked[0]); i++)
          sigdelset (&fullsigset, unmasked[i]);

        /* one pipe carries requests to the workers, the other the results */
        if (pipe (reqpipe) != 0 || pipe (respipe) != 0)
          croak ("unable to initialize request or result pipe");

        /* the read end of the result pipe is made non-blocking */
        if (fcntl (respipe[0], F_SETFL, O_NONBLOCK) != 0)
          croak ("cannot set result pipe to nonblocking mode");
}
232    
void
min_parallel(nthreads)
        int     nthreads
        PROTOTYPE: $
        PREINIT:
        int before;
        CODE:
        /* Start workers until at least nthreads are running.  start_thread
           leaves `started` unchanged when it cannot create a worker, so
           stop with an error instead of retrying forever. */
        while (nthreads > started)
          {
            before = started;
            start_thread ();

            if (started == before)
              croak ("unable to start aio thread");
          }
240    
void
max_parallel(nthreads)
        int     nthreads
        PROTOTYPE: $
        PREINIT:
        int cur;
        CODE:
        /* Queue one quit request for every worker above the limit ... */
        cur = started;
        while (cur > nthreads)
          {
            end_thread ();
            cur--;
          }

        /* ... then process results until enough workers have reported
           back.  poll_cb is declared with pTHX, so the interpreter context
           has to be passed explicitly. */
        poll_cb (aTHX);
        while (started > nthreads)
          {
            sched_yield ();
            poll_cb (aTHX);
          }
259 root 1.1
void
aio_read(fh,offset,length,data,dataoffset,callback)
        PerlIO *        fh
        UV              offset
        STRLEN          length
        SV *            data
        STRLEN          dataoffset
        SV *            callback
        ALIAS:
        aio_write = 1
        PROTOTYPE: $$$$$$
        CODE:
        {
          /* ix is 0 for aio_read and 1 for aio_write */
          const int is_write = ix;

          /* make sure data is a string scalar before its buffer is used */
          SvUPGRADE (data, SVt_PV);
          SvPOK_on (data);

          read_write (aTHX_ is_write, PerlIO_fileno (fh), offset, length,
                      data, dataoffset, callback);
        }
275 root 1.8
void
aio_open(pathname,flags,mode,callback)
        char *  pathname
        int     flags
        int     mode
        SV *    callback
        PROTOTYPE: $$$$
        PREINIT:
        aio_req req;
        CODE:
        /* zero-fill so no field reaches the worker or poll_cb uninitialised */
        Newz (0, req, 1, aio_cb);

        if (!req)
          croak ("out of memory during aio_req allocation");

        req->type = REQ_OPEN;

        /* Keep a private copy of the path for as long as the request is in
           flight: the caller's string may be gone by the time a worker uses
           it.  poll_cb drops its reference to req->data, which releases the
           copy. */
        req->data = newSVpv (pathname, 0);
        req->dataptr = SvPVX (req->data);

        req->fd = flags; /* the fd member carries the open flags */
        req->mode = mode;
        req->callback = SvREFCNT_inc (callback);

        send_req (req);
298 root 1.5
int
poll_fileno()
        PROTOTYPE:
        CODE:
        {
          /* the read end of the result pipe */
          RETVAL = respipe[0];
        }
        OUTPUT:
        RETVAL
306    
307     int
308 root 1.6 poll_cb(...)
309 root 1.5 PROTOTYPE:
310     CODE:
311     RETVAL = poll_cb (aTHX);
312     OUTPUT:
313     RETVAL
314    
int
nreqs()
        PROTOTYPE:
        CODE:
        {
          /* requests queued but not yet reported through poll_cb */
          RETVAL = nreqs;
        }
        OUTPUT:
        RETVAL
322