ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/Linux-AIO/AIO.xs
Revision: 1.11
Committed: Mon Oct 8 12:58:41 2001 UTC (22 years, 7 months ago) by root
Branch: MAIN
Changes since 1.10: +7 -7 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.1 #include "EXTERN.h"
2     #include "perl.h"
3     #include "XSUB.h"
4    
5     #include <sys/types.h>
6     #include <unistd.h>
7 root 1.5 #include <fcntl.h>
8 root 1.8 #include <signal.h>
9 root 1.1 #include <sched.h>
10    
/* size in bytes of the stack buffer embedded in each aio_thread; the address
   just past its end is what start_thread hands to clone as the child stack */
11 root 1.8 #define STACKSIZE 1024 /* yeah */
12 root 1.1
/* request types stored in aio_cb.type; a worker that receives anything other
   than READ/WRITE/OPEN/CLOSE echoes the request back and leaves its loop */
13 root 1.10 enum { REQ_QUIT, REQ_READ, REQ_WRITE, REQ_OPEN, REQ_CLOSE };
14 root 1.1
/* per-worker allocation: currently nothing but the stack the worker runs on */
15     typedef struct {
16     char stack[STACKSIZE];
17     } aio_thread;
18    
/* one queued request; a pointer to it travels through reqpipe to a worker and
   back through respipe to poll_cb, which frees it */
19     typedef struct {
20     int type; /* one of the REQ_* values */
21     aio_thread *thread; /* filled in by the worker that picked the request up */
22    
23     /* read/write */
24     int fd; /* file descriptor; aio_open stores the open flags here instead */
25     off_t offset; /* file position the worker seeks to before read/write */
26     size_t length; /* number of bytes to transfer */
27 root 1.2 ssize_t result; /* return value of the system call */
28 root 1.8 mode_t mode; /* open */
29 root 1.1 int errorno; /* worker's errno after the call, restored around the callback */
30 root 1.5 SV *data, *callback; /* both are SvREFCNT_dec'ed by poll_cb on completion */
31 root 1.1 void *dataptr; /* transfer buffer for read/write, pathname for open */
32 root 1.2 STRLEN dataoffset; /* read by poll_cb when it sets the length of data after a read */
33 root 1.1 } aio_cb;
34    
35     typedef aio_cb *aio_req;
36    
/* number of workers: incremented after a successful clone, decremented when
   poll_cb sees a REQ_QUIT come back */
37     static int started;
/* requests sent via send_req whose callback has not run yet */
38 root 1.5 static int nreqs;
/* reqpipe carries aio_req pointers to the workers, respipe carries them back */
39 root 1.1 static int reqpipe[2], respipe[2];
40    
41 root 1.2 static int aio_proc(void *arg);
42    
43 root 1.1 static void
44     start_thread(void)
45     {
46 root 1.2 aio_thread *thr;
47    
48     New (0, thr, 1, aio_thread);
49    
50     if (clone (aio_proc,
51     &(thr->stack[STACKSIZE]),
52     CLONE_VM|CLONE_FS|CLONE_FILES|CLONE_SIGHAND,
53     thr) >= 0)
54     started++;
55     else
56     Safefree (thr);
57 root 1.1 }
58    
59     static void
60     end_thread(void)
61     {
62 root 1.5 aio_req req;
63     New (0, req, 1, aio_cb);
64     req->type = REQ_QUIT;
65 root 1.1 write (reqpipe[1], &req, sizeof (aio_req));
66 root 1.2 }
67    
68     static void
69 root 1.8 send_req (aio_req req)
70     {
71     nreqs++;
72     write (reqpipe[1], &req, sizeof (aio_req));
73     }
74    
75     static void
76 root 1.5 read_write (pTHX_ int dowrite, int fd, off_t offset, size_t length,
77     SV *data, STRLEN dataoffset, SV*callback)
78 root 1.2 {
79 root 1.5 aio_req req;
80     STRLEN svlen;
81     char *svptr = SvPV (data, svlen);
82    
83     if (dataoffset < 0)
84     dataoffset += svlen;
85    
86     if (dataoffset < 0 || dataoffset > svlen)
87     croak ("data offset outside of string");
88    
89     if (dowrite)
90     {
91     /* write: check length and adjust. */
92     if (length < 0 || length + dataoffset > svlen)
93     length = svlen - dataoffset;
94     }
95     else
96     {
97     /* read: grow scalar as necessary */
98     svptr = SvGROW (data, length + dataoffset);
99     }
100    
101     if (length < 0)
102     croak ("length must not be negative");
103    
104     New (0, req, 1, aio_cb);
105    
106     if (!req)
107     croak ("out of memory during aio_req allocation");
108    
109     req->type = dowrite ? REQ_WRITE : REQ_READ;
110     req->fd = fd;
111     req->offset = offset;
112     req->length = length;
113     req->data = SvREFCNT_inc (data);
114 root 1.6 req->dataptr = (char *)svptr + dataoffset;
115 root 1.5 req->callback = SvREFCNT_inc (callback);
116    
117 root 1.8 send_req (req);
118 root 1.5 }
119    
120     static int
121     poll_cb (pTHX)
122     {
123     dSP;
124     int count = 0;
125     aio_req req;
126    
127     while (read (respipe[0], (void *)&req, sizeof (req)) == sizeof (req))
128     {
129     if (req->type == REQ_QUIT)
130     {
131     Safefree (req->thread);
132     started--;
133     }
134     else
135     {
136     int errorno = errno;
137     errno = req->errorno;
138    
139     if (req->type == REQ_READ)
140 root 1.6 SvCUR_set (req->data, req->dataoffset
141     + req->result > 0 ? req->result : 0);
142 root 1.5
143     PUSHMARK (SP);
144     XPUSHs (sv_2mortal (newSViv (req->result)));
145     PUTBACK;
146     call_sv (req->callback, G_VOID);
147     SPAGAIN;
148    
149     SvREFCNT_dec (req->data);
150     SvREFCNT_dec (req->callback);
151    
152     errno = errorno;
153     nreqs--;
154     count++;
155     }
156    
157     Safefree (req);
158     }
159    
160     return count;
161 root 1.2 }
162    
163 root 1.8 static sigset_t fullsigset;
164    
165 root 1.2 #undef errno
166     #include <asm/unistd.h>
167    
168     static int
169     aio_proc(void *thr_arg)
170     {
171     aio_thread *thr = thr_arg;
172 root 1.9 aio_req req;
173 root 1.2 int errno;
174    
175 root 1.11 /* we rely on gcc's ability to create closures. */
176     _syscall3(int,lseek,int,fd,off_t,offset,int,whence)
177     _syscall3(int,read,int,fd,char *,buf,off_t,count)
178     _syscall3(int,write,int,fd,char *,buf,off_t,count)
179     _syscall3(int,open,char *,pathname,int,flags,mode_t,mode)
180     _syscall1(int,close,int,fd)
181    
182 root 1.8 sigprocmask (SIG_SETMASK, &fullsigset, 0);
183 root 1.2
184     /* then loop */
185     while (read (reqpipe[0], (void *)&req, sizeof (req)) == sizeof (req))
186     {
187     req->thread = thr;
188 root 1.8 errno = 0;
189 root 1.2
190     if (req->type == REQ_READ || req->type == REQ_WRITE)
191     {
192     if (lseek (req->fd, req->offset, SEEK_SET) == req->offset)
193     {
194     if (req->type == REQ_READ)
195     req->result = read (req->fd, req->dataptr, req->length);
196     else
197     req->result = write(req->fd, req->dataptr, req->length);
198     }
199 root 1.8 }
200     else if (req->type == REQ_OPEN)
201     {
202     req->result = open (req->dataptr, req->fd, req->mode);
203 root 1.2 }
204 root 1.10 else if (req->type == REQ_CLOSE)
205     {
206     req->result = close (req->fd);
207     }
208 root 1.2 else
209     {
210     write (respipe[1], (void *)&req, sizeof (req));
211     break;
212     }
213    
214 root 1.8 req->errorno = errno;
215 root 1.2 write (respipe[1], (void *)&req, sizeof (req));
216     }
217    
218     return 0;
219 root 1.1 }
220    
221     MODULE = Linux::AIO PACKAGE = Linux::AIO
222    
223     BOOT:
224     {
225 root 1.8 sigfillset (&fullsigset);
226 root 1.9 sigdelset (&fullsigset, SIGTERM);
227     sigdelset (&fullsigset, SIGQUIT);
228     sigdelset (&fullsigset, SIGABRT);
229     sigdelset (&fullsigset, SIGINT);
230 root 1.8
231 root 1.1 if (pipe (reqpipe) || pipe (respipe))
232     croak ("unable to initialize request or result pipe");
233 root 1.5
234     if (fcntl (respipe[0], F_SETFL, O_NONBLOCK))
235     croak ("cannot set result pipe to nonblocking mode");
236 root 1.1 }
237    
238     void
239     min_parallel(nthreads)
240     int nthreads
241 root 1.5 PROTOTYPE: $
242 root 1.1 CODE:
243     while (nthreads > started)
244     start_thread ();
245    
246     void
247     max_parallel(nthreads)
248     int nthreads
249 root 1.5 PROTOTYPE: $
250 root 1.1 CODE:
251 root 1.5 int cur = started;
252     while (cur > nthreads)
253     {
254     end_thread ();
255     cur--;
256     }
257    
258     poll_cb ();
259 root 1.1 while (started > nthreads)
260 root 1.5 {
261     sched_yield ();
262     poll_cb ();
263     }
264 root 1.1
265     void
266 root 1.5 aio_read(fh,offset,length,data,dataoffset,callback)
267     PerlIO * fh
268 root 1.6 UV offset
269     STRLEN length
270 root 1.5 SV * data
271     STRLEN dataoffset
272     SV * callback
273     PROTOTYPE: $$$$$$
274 root 1.3 ALIAS:
275 root 1.5 aio_write = 1
276 root 1.1 CODE:
277 root 1.6 SvUPGRADE (data, SVt_PV);
278     SvPOK_on (data);
279 root 1.5 read_write (aTHX_ ix, PerlIO_fileno (fh), offset, length, data, dataoffset, callback);
280 root 1.8
281     void
282     aio_open(pathname,flags,mode,callback)
283     char * pathname
284     int flags
285     int mode
286     SV * callback
287     PROTOTYPE: $$$$
288     CODE:
289     aio_req req;
290    
291     New (0, req, 1, aio_cb);
292    
293     if (!req)
294     croak ("out of memory during aio_req allocation");
295    
296     req->type = REQ_OPEN;
297     req->dataptr = pathname;
298     req->fd = flags;
299     req->mode = mode;
300 root 1.10 req->callback = SvREFCNT_inc (callback);
301    
302     send_req (req);
303    
304     void
305     aio_close(fh,callback)
306     PerlIO * fh
307     SV * callback
308     PROTOTYPE: $
309     CODE:
310     aio_req req;
311    
312     New (0, req, 1, aio_cb);
313    
314     if (!req)
315     croak ("out of memory during aio_req allocation");
316    
317     req->type = REQ_CLOSE;
318     req->fd = PerlIO_fileno (fh);
319 root 1.8 req->callback = SvREFCNT_inc (callback);
320    
321     send_req (req);
322 root 1.5
323     int
324     poll_fileno()
325     PROTOTYPE:
326     CODE:
327     RETVAL = respipe[0];
328     OUTPUT:
329     RETVAL
330    
331     int
332 root 1.6 poll_cb(...)
333 root 1.5 PROTOTYPE:
334     CODE:
335     RETVAL = poll_cb (aTHX);
336     OUTPUT:
337     RETVAL
338    
339     int
340     nreqs()
341     PROTOTYPE:
342     CODE:
343     RETVAL = nreqs;
344     OUTPUT:
345     RETVAL
346