ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/Linux-AIO/AIO.xs
Revision: 1.12
Committed: Tue Dec 25 02:33:48 2001 UTC (22 years, 4 months ago) by root
Branch: MAIN
Changes since 1.11: +8 -4 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.1 #include "EXTERN.h"
2     #include "perl.h"
3     #include "XSUB.h"
4    
5     #include <sys/types.h>
6     #include <unistd.h>
7 root 1.5 #include <fcntl.h>
8 root 1.8 #include <signal.h>
9 root 1.1 #include <sched.h>
10    
11 root 1.12 typedef void *InOutStream; /* hack, but 5.6.1 is simply toooo old ;) */
12    
13 root 1.8 #define STACKSIZE 1024 /* yeah */
14 root 1.1
15 root 1.10 enum { REQ_QUIT, REQ_READ, REQ_WRITE, REQ_OPEN, REQ_CLOSE };
16 root 1.1
17     typedef struct {
18     char stack[STACKSIZE];
19     } aio_thread;
20    
21     typedef struct {
22     int type;
23     aio_thread *thread;
24    
25     /* read/write */
26     int fd;
27     off_t offset;
28     size_t length;
29 root 1.2 ssize_t result;
30 root 1.8 mode_t mode; /* open */
31 root 1.1 int errorno;
32 root 1.5 SV *data, *callback;
33 root 1.1 void *dataptr;
34 root 1.2 STRLEN dataoffset;
35 root 1.1 } aio_cb;
36    
37     typedef aio_cb *aio_req;
38    
39     static int started;
40 root 1.5 static int nreqs;
41 root 1.1 static int reqpipe[2], respipe[2];
42    
43 root 1.2 static int aio_proc(void *arg);
44    
45 root 1.1 static void
46     start_thread(void)
47     {
48 root 1.2 aio_thread *thr;
49    
50     New (0, thr, 1, aio_thread);
51    
52     if (clone (aio_proc,
53     &(thr->stack[STACKSIZE]),
54     CLONE_VM|CLONE_FS|CLONE_FILES|CLONE_SIGHAND,
55     thr) >= 0)
56     started++;
57     else
58     Safefree (thr);
59 root 1.1 }
60    
61     static void
62     end_thread(void)
63     {
64 root 1.5 aio_req req;
65     New (0, req, 1, aio_cb);
66     req->type = REQ_QUIT;
67 root 1.1 write (reqpipe[1], &req, sizeof (aio_req));
68 root 1.2 }
69    
70     static void
71 root 1.8 send_req (aio_req req)
72     {
73     nreqs++;
74     write (reqpipe[1], &req, sizeof (aio_req));
75     }
76    
77     static void
78 root 1.5 read_write (pTHX_ int dowrite, int fd, off_t offset, size_t length,
79     SV *data, STRLEN dataoffset, SV*callback)
80 root 1.2 {
81 root 1.5 aio_req req;
82     STRLEN svlen;
83     char *svptr = SvPV (data, svlen);
84    
85     if (dataoffset < 0)
86     dataoffset += svlen;
87    
88     if (dataoffset < 0 || dataoffset > svlen)
89     croak ("data offset outside of string");
90    
91     if (dowrite)
92     {
93     /* write: check length and adjust. */
94     if (length < 0 || length + dataoffset > svlen)
95     length = svlen - dataoffset;
96     }
97     else
98     {
99     /* read: grow scalar as necessary */
100     svptr = SvGROW (data, length + dataoffset);
101     }
102    
103     if (length < 0)
104     croak ("length must not be negative");
105    
106     New (0, req, 1, aio_cb);
107    
108     if (!req)
109     croak ("out of memory during aio_req allocation");
110    
111     req->type = dowrite ? REQ_WRITE : REQ_READ;
112     req->fd = fd;
113     req->offset = offset;
114     req->length = length;
115     req->data = SvREFCNT_inc (data);
116 root 1.6 req->dataptr = (char *)svptr + dataoffset;
117 root 1.5 req->callback = SvREFCNT_inc (callback);
118    
119 root 1.8 send_req (req);
120 root 1.5 }
121    
122     static int
123     poll_cb (pTHX)
124     {
125     dSP;
126     int count = 0;
127     aio_req req;
128    
129     while (read (respipe[0], (void *)&req, sizeof (req)) == sizeof (req))
130     {
131     if (req->type == REQ_QUIT)
132     {
133     Safefree (req->thread);
134     started--;
135     }
136     else
137     {
138     int errorno = errno;
139     errno = req->errorno;
140    
141     if (req->type == REQ_READ)
142 root 1.6 SvCUR_set (req->data, req->dataoffset
143     + req->result > 0 ? req->result : 0);
144 root 1.5
145     PUSHMARK (SP);
146     XPUSHs (sv_2mortal (newSViv (req->result)));
147     PUTBACK;
148     call_sv (req->callback, G_VOID);
149     SPAGAIN;
150    
151     SvREFCNT_dec (req->data);
152     SvREFCNT_dec (req->callback);
153    
154     errno = errorno;
155     nreqs--;
156     count++;
157     }
158    
159     Safefree (req);
160     }
161    
162     return count;
163 root 1.2 }
164    
165 root 1.8 static sigset_t fullsigset;
166    
167 root 1.2 #undef errno
168     #include <asm/unistd.h>
169    
170     static int
171     aio_proc(void *thr_arg)
172     {
173     aio_thread *thr = thr_arg;
174 root 1.9 aio_req req;
175 root 1.2 int errno;
176    
177 root 1.11 /* we rely on gcc's ability to create closures. */
178     _syscall3(int,lseek,int,fd,off_t,offset,int,whence)
179     _syscall3(int,read,int,fd,char *,buf,off_t,count)
180     _syscall3(int,write,int,fd,char *,buf,off_t,count)
181     _syscall3(int,open,char *,pathname,int,flags,mode_t,mode)
182     _syscall1(int,close,int,fd)
183    
184 root 1.8 sigprocmask (SIG_SETMASK, &fullsigset, 0);
185 root 1.2
186     /* then loop */
187     while (read (reqpipe[0], (void *)&req, sizeof (req)) == sizeof (req))
188     {
189     req->thread = thr;
190 root 1.8 errno = 0;
191 root 1.2
192     if (req->type == REQ_READ || req->type == REQ_WRITE)
193     {
194     if (lseek (req->fd, req->offset, SEEK_SET) == req->offset)
195     {
196     if (req->type == REQ_READ)
197     req->result = read (req->fd, req->dataptr, req->length);
198     else
199     req->result = write(req->fd, req->dataptr, req->length);
200     }
201 root 1.8 }
202     else if (req->type == REQ_OPEN)
203     {
204     req->result = open (req->dataptr, req->fd, req->mode);
205 root 1.2 }
206 root 1.10 else if (req->type == REQ_CLOSE)
207     {
208     req->result = close (req->fd);
209     }
210 root 1.2 else
211     {
212     write (respipe[1], (void *)&req, sizeof (req));
213     break;
214     }
215    
216 root 1.8 req->errorno = errno;
217 root 1.2 write (respipe[1], (void *)&req, sizeof (req));
218     }
219    
220     return 0;
221 root 1.1 }
222    
223     MODULE = Linux::AIO PACKAGE = Linux::AIO
224    
225     BOOT:
226     {
227 root 1.8 sigfillset (&fullsigset);
228 root 1.9 sigdelset (&fullsigset, SIGTERM);
229     sigdelset (&fullsigset, SIGQUIT);
230     sigdelset (&fullsigset, SIGABRT);
231     sigdelset (&fullsigset, SIGINT);
232 root 1.8
233 root 1.1 if (pipe (reqpipe) || pipe (respipe))
234     croak ("unable to initialize request or result pipe");
235 root 1.5
236     if (fcntl (respipe[0], F_SETFL, O_NONBLOCK))
237     croak ("cannot set result pipe to nonblocking mode");
238 root 1.1 }
239    
240     void
241     min_parallel(nthreads)
242     int nthreads
243 root 1.5 PROTOTYPE: $
244 root 1.1 CODE:
245     while (nthreads > started)
246     start_thread ();
247    
248     void
249     max_parallel(nthreads)
250     int nthreads
251 root 1.5 PROTOTYPE: $
252 root 1.1 CODE:
253 root 1.5 int cur = started;
254     while (cur > nthreads)
255     {
256     end_thread ();
257     cur--;
258     }
259    
260     poll_cb ();
261 root 1.1 while (started > nthreads)
262 root 1.5 {
263     sched_yield ();
264 root 1.12 fcntl (respipe[0], F_SETFL, 0);
265 root 1.5 poll_cb ();
266 root 1.12 fcntl (respipe[0], F_SETFL, O_NONBLOCK);
267 root 1.5 }
268 root 1.1
269     void
270 root 1.5 aio_read(fh,offset,length,data,dataoffset,callback)
271 root 1.12 InOutStream fh
272 root 1.6 UV offset
273 root 1.12 IV length
274 root 1.5 SV * data
275 root 1.12 IV dataoffset
276 root 1.5 SV * callback
277     PROTOTYPE: $$$$$$
278 root 1.3 ALIAS:
279 root 1.5 aio_write = 1
280 root 1.1 CODE:
281 root 1.6 SvUPGRADE (data, SVt_PV);
282     SvPOK_on (data);
283 root 1.5 read_write (aTHX_ ix, PerlIO_fileno (fh), offset, length, data, dataoffset, callback);
284 root 1.8
285     void
286     aio_open(pathname,flags,mode,callback)
287     char * pathname
288     int flags
289     int mode
290     SV * callback
291     PROTOTYPE: $$$$
292     CODE:
293     aio_req req;
294    
295     New (0, req, 1, aio_cb);
296    
297     if (!req)
298     croak ("out of memory during aio_req allocation");
299    
300     req->type = REQ_OPEN;
301     req->dataptr = pathname;
302     req->fd = flags;
303     req->mode = mode;
304 root 1.10 req->callback = SvREFCNT_inc (callback);
305    
306     send_req (req);
307    
308     void
309     aio_close(fh,callback)
310 root 1.12 InOutStream fh
311 root 1.10 SV * callback
312     PROTOTYPE: $
313     CODE:
314     aio_req req;
315    
316     New (0, req, 1, aio_cb);
317    
318     if (!req)
319     croak ("out of memory during aio_req allocation");
320    
321     req->type = REQ_CLOSE;
322     req->fd = PerlIO_fileno (fh);
323 root 1.8 req->callback = SvREFCNT_inc (callback);
324    
325     send_req (req);
326 root 1.5
327     int
328     poll_fileno()
329     PROTOTYPE:
330     CODE:
331     RETVAL = respipe[0];
332     OUTPUT:
333     RETVAL
334    
335     int
336 root 1.6 poll_cb(...)
337 root 1.5 PROTOTYPE:
338     CODE:
339     RETVAL = poll_cb (aTHX);
340     OUTPUT:
341     RETVAL
342    
343     int
344     nreqs()
345     PROTOTYPE:
346     CODE:
347     RETVAL = nreqs;
348     OUTPUT:
349     RETVAL
350