ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/Linux-AIO/AIO.xs
Revision: 1.8
Committed: Thu Aug 16 02:43:46 2001 UTC (22 years, 9 months ago) by root
Branch: MAIN
Changes since 1.7: +48 -17 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.1 #include "EXTERN.h"
2     #include "perl.h"
3     #include "XSUB.h"
4    
5     #include <sys/types.h>
6     #include <unistd.h>
7 root 1.5 #include <fcntl.h>
8 root 1.8 #include <signal.h>
9 root 1.1 #include <sched.h>
10    
/* Size in bytes of the stack buffer embedded in every aio_thread. */
#define STACKSIZE 1024 /* yeah */

/* Request kinds stored in aio_cb.type. */
enum
{
  REQ_QUIT  = 0,
  REQ_READ  = 1,
  REQ_WRITE = 2,
  REQ_OPEN  = 3
};
14 root 1.1
15     typedef struct {
16     char stack[STACKSIZE];
17     } aio_thread;
18    
19     typedef struct {
20     int type;
21     aio_thread *thread;
22    
23     /* read/write */
24     int fd;
25     off_t offset;
26     size_t length;
27 root 1.2 ssize_t result;
28 root 1.8 mode_t mode; /* open */
29 root 1.1 int errorno;
30 root 1.5 SV *data, *callback;
31 root 1.1 void *dataptr;
32 root 1.2 STRLEN dataoffset;
33 root 1.1 } aio_cb;
34    
35     typedef aio_cb *aio_req;
36    
37     static int started;
38 root 1.5 static int nreqs;
39 root 1.1 static int reqpipe[2], respipe[2];
40    
41 root 1.2 static int aio_proc(void *arg);
42    
43 root 1.1 static void
44     start_thread(void)
45     {
46 root 1.2 aio_thread *thr;
47    
48     New (0, thr, 1, aio_thread);
49    
50     if (clone (aio_proc,
51     &(thr->stack[STACKSIZE]),
52     CLONE_VM|CLONE_FS|CLONE_FILES|CLONE_SIGHAND,
53     thr) >= 0)
54     started++;
55     else
56     Safefree (thr);
57 root 1.1 }
58    
59     static void
60     end_thread(void)
61     {
62 root 1.5 aio_req req;
63     New (0, req, 1, aio_cb);
64     req->type = REQ_QUIT;
65 root 1.1 write (reqpipe[1], &req, sizeof (aio_req));
66 root 1.2 }
67    
68     static void
69 root 1.8 send_req (aio_req req)
70     {
71     nreqs++;
72     write (reqpipe[1], &req, sizeof (aio_req));
73     }
74    
75     static void
76 root 1.5 read_write (pTHX_ int dowrite, int fd, off_t offset, size_t length,
77     SV *data, STRLEN dataoffset, SV*callback)
78 root 1.2 {
79 root 1.5 aio_req req;
80     STRLEN svlen;
81     char *svptr = SvPV (data, svlen);
82    
83     if (dataoffset < 0)
84     dataoffset += svlen;
85    
86     if (dataoffset < 0 || dataoffset > svlen)
87     croak ("data offset outside of string");
88    
89     if (dowrite)
90     {
91     /* write: check length and adjust. */
92     if (length < 0 || length + dataoffset > svlen)
93     length = svlen - dataoffset;
94     }
95     else
96     {
97     /* read: grow scalar as necessary */
98     svptr = SvGROW (data, length + dataoffset);
99     }
100    
101     if (length < 0)
102     croak ("length must not be negative");
103    
104     New (0, req, 1, aio_cb);
105    
106     if (!req)
107     croak ("out of memory during aio_req allocation");
108    
109     req->type = dowrite ? REQ_WRITE : REQ_READ;
110     req->fd = fd;
111     req->offset = offset;
112     req->length = length;
113     req->data = SvREFCNT_inc (data);
114 root 1.6 req->dataptr = (char *)svptr + dataoffset;
115 root 1.5 req->callback = SvREFCNT_inc (callback);
116    
117 root 1.8 send_req (req);
118 root 1.5 }
119    
120     static int
121     poll_cb (pTHX)
122     {
123     dSP;
124     int count = 0;
125     aio_req req;
126    
127     while (read (respipe[0], (void *)&req, sizeof (req)) == sizeof (req))
128     {
129     if (req->type == REQ_QUIT)
130     {
131     Safefree (req->thread);
132     started--;
133     }
134     else
135     {
136     int errorno = errno;
137     errno = req->errorno;
138    
139     if (req->type == REQ_READ)
140 root 1.6 SvCUR_set (req->data, req->dataoffset
141     + req->result > 0 ? req->result : 0);
142 root 1.5
143     PUSHMARK (SP);
144     XPUSHs (sv_2mortal (newSViv (req->result)));
145     PUTBACK;
146     call_sv (req->callback, G_VOID);
147     SPAGAIN;
148    
149     SvREFCNT_dec (req->data);
150     SvREFCNT_dec (req->callback);
151    
152     errno = errorno;
153     nreqs--;
154     count++;
155     }
156    
157     Safefree (req);
158     }
159    
160     return count;
161 root 1.2 }
162    
163 root 1.8 static sigset_t fullsigset;
164    
165 root 1.2 #undef errno
166     #include <asm/unistd.h>
167    
168     static int
169     aio_proc(void *thr_arg)
170     {
171     aio_thread *thr = thr_arg;
172     int sig;
173     int errno;
174     aio_req req;
175    
176 root 1.8 sigprocmask (SIG_SETMASK, &fullsigset, 0);
177    
178 root 1.2 /* we rely on gcc's ability to create closures. */
179     _syscall3(int,lseek,int,fd,off_t,offset,int,whence);
180     _syscall3(int,read,int,fd,char *,buf,off_t,count);
181     _syscall3(int,write,int,fd,char *,buf,off_t,count);
182 root 1.8 _syscall3(int,open,char *,pathname,int,flags,mode_t,mode);
183 root 1.2
184     /* then loop */
185     while (read (reqpipe[0], (void *)&req, sizeof (req)) == sizeof (req))
186     {
187     req->thread = thr;
188 root 1.8 errno = 0;
189 root 1.2
190     if (req->type == REQ_READ || req->type == REQ_WRITE)
191     {
192     if (lseek (req->fd, req->offset, SEEK_SET) == req->offset)
193     {
194     if (req->type == REQ_READ)
195     req->result = read (req->fd, req->dataptr, req->length);
196     else
197     req->result = write(req->fd, req->dataptr, req->length);
198     }
199 root 1.8 }
200     else if (req->type == REQ_OPEN)
201     {
202     req->result = open (req->dataptr, req->fd, req->mode);
203 root 1.2 }
204     else
205     {
206     write (respipe[1], (void *)&req, sizeof (req));
207     break;
208     }
209    
210 root 1.8 req->errorno = errno;
211 root 1.2 write (respipe[1], (void *)&req, sizeof (req));
212     }
213    
214     return 0;
215 root 1.1 }
216    
217     MODULE = Linux::AIO PACKAGE = Linux::AIO
218    
219     BOOT:
220     {
221 root 1.8 sigfillset (&fullsigset);
222    
223 root 1.1 if (pipe (reqpipe) || pipe (respipe))
224     croak ("unable to initialize request or result pipe");
225 root 1.5
226     if (fcntl (respipe[0], F_SETFL, O_NONBLOCK))
227     croak ("cannot set result pipe to nonblocking mode");
228 root 1.1 }
229    
230     void
231     min_parallel(nthreads)
232     int nthreads
233 root 1.5 PROTOTYPE: $
234 root 1.1 CODE:
235     while (nthreads > started)
236     start_thread ();
237    
238     void
239     max_parallel(nthreads)
240     int nthreads
241 root 1.5 PROTOTYPE: $
242 root 1.1 CODE:
243 root 1.5 int cur = started;
244     while (cur > nthreads)
245     {
246     end_thread ();
247     cur--;
248     }
249    
250     poll_cb ();
251 root 1.1 while (started > nthreads)
252 root 1.5 {
253     sched_yield ();
254     poll_cb ();
255     }
256 root 1.1
257     void
258 root 1.5 aio_read(fh,offset,length,data,dataoffset,callback)
259     PerlIO * fh
260 root 1.6 UV offset
261     STRLEN length
262 root 1.5 SV * data
263     STRLEN dataoffset
264     SV * callback
265     PROTOTYPE: $$$$$$
266 root 1.3 ALIAS:
267 root 1.5 aio_write = 1
268 root 1.1 CODE:
269 root 1.6 SvUPGRADE (data, SVt_PV);
270     SvPOK_on (data);
271 root 1.5 read_write (aTHX_ ix, PerlIO_fileno (fh), offset, length, data, dataoffset, callback);
272 root 1.8
273     void
274     aio_open(pathname,flags,mode,callback)
275     char * pathname
276     int flags
277     int mode
278     SV * callback
279     PROTOTYPE: $$$$
280     CODE:
281     aio_req req;
282    
283     New (0, req, 1, aio_cb);
284    
285     if (!req)
286     croak ("out of memory during aio_req allocation");
287    
288     req->type = REQ_OPEN;
289     req->dataptr = pathname;
290     req->fd = flags;
291     req->mode = mode;
292     req->callback = SvREFCNT_inc (callback);
293    
294     send_req (req);
295 root 1.5
296     int
297     poll_fileno()
298     PROTOTYPE:
299     CODE:
300     RETVAL = respipe[0];
301     OUTPUT:
302     RETVAL
303    
304     int
305 root 1.6 poll_cb(...)
306 root 1.5 PROTOTYPE:
307     CODE:
308     RETVAL = poll_cb (aTHX);
309     OUTPUT:
310     RETVAL
311    
312     int
313     nreqs()
314     PROTOTYPE:
315     CODE:
316     RETVAL = nreqs;
317     OUTPUT:
318     RETVAL
319