ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/Linux-AIO/AIO.xs
Revision: 1.5
Committed: Tue Aug 14 18:06:37 2001 UTC (22 years, 9 months ago) by root
Branch: MAIN
Changes since 1.4: +148 -11 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.1 #include "EXTERN.h"
2     #include "perl.h"
3     #include "XSUB.h"
4    
5     #include <sys/types.h>
6     #include <unistd.h>
7 root 1.5 #include <fcntl.h>
8 root 1.1 #include <sched.h>
9    
/* Size in bytes of the stack array embedded in each aio_thread. */
#define STACKSIZE 1024 /* yeah */

/*
 * Request type codes stored in aio_cb.type.  The worker loop services
 * REQ_READ and REQ_WRITE; any other code (REQ_QUIT in practice) makes it
 * post the request back and leave its loop.
 */
#define REQ_QUIT  0
#define REQ_READ  1
#define REQ_WRITE 2
15    
16     typedef struct {
17     char stack[STACKSIZE];
18     } aio_thread;
19    
20     typedef struct {
21     int type;
22     aio_thread *thread;
23    
24     /* read/write */
25     int fd;
26     off_t offset;
27     size_t length;
28 root 1.2 ssize_t result;
29 root 1.1 int errorno;
30    
31 root 1.5 SV *data, *callback;
32 root 1.1 void *dataptr;
33 root 1.2 STRLEN dataoffset;
34 root 1.1 } aio_cb;
35    
36     typedef aio_cb *aio_req;
37    
/* Workers considered alive: bumped by start_thread(), dropped when poll_cb()
   receives a REQ_QUIT reply. */
static int started;

/* Read/write requests submitted whose callback has not run yet. */
static int nreqs;

/* reqpipe carries requests to the workers, respipe carries them back. */
static int reqpipe[2];
static int respipe[2];

/* Worker entry point, defined further down. */
static int aio_proc (void *arg);
43    
44 root 1.1 static void
45     start_thread(void)
46     {
47 root 1.2 aio_thread *thr;
48    
49     New (0, thr, 1, aio_thread);
50    
51     if (clone (aio_proc,
52     &(thr->stack[STACKSIZE]),
53     CLONE_VM|CLONE_FS|CLONE_FILES|CLONE_SIGHAND,
54     thr) >= 0)
55     started++;
56     else
57     Safefree (thr);
58 root 1.1 }
59    
60     static void
61     end_thread(void)
62     {
63 root 1.5 aio_req req;
64     New (0, req, 1, aio_cb);
65     req->type = REQ_QUIT;
66 root 1.1 write (reqpipe[1], &req, sizeof (aio_req));
67 root 1.2 }
68    
69     static void
70 root 1.5 read_write (pTHX_ int dowrite, int fd, off_t offset, size_t length,
71     SV *data, STRLEN dataoffset, SV*callback)
72 root 1.2 {
73 root 1.5 aio_req req;
74     STRLEN svlen;
75     char *svptr = SvPV (data, svlen);
76    
77     if (dataoffset < 0)
78     dataoffset += svlen;
79    
80     if (dataoffset < 0 || dataoffset > svlen)
81     croak ("data offset outside of string");
82    
83     if (dowrite)
84     {
85     /* write: check length and adjust. */
86     if (length < 0 || length + dataoffset > svlen)
87     length = svlen - dataoffset;
88     }
89     else
90     {
91     /* read: grow scalar as necessary */
92     svptr = SvGROW (data, length + dataoffset);
93     }
94    
95     if (length < 0)
96     croak ("length must not be negative");
97    
98     New (0, req, 1, aio_cb);
99    
100     if (!req)
101     croak ("out of memory during aio_req allocation");
102    
103     req->type = dowrite ? REQ_WRITE : REQ_READ;
104     req->fd = fd;
105     req->offset = offset;
106     req->length = length;
107     req->data = SvREFCNT_inc (data);
108     req->dataptr = svptr + dataoffset;
109     req->callback = SvREFCNT_inc (callback);
110    
111     nreqs++;
112     write (reqpipe[1], &req, sizeof (aio_req));
113     }
114    
115     static int
116     poll_cb (pTHX)
117     {
118     dSP;
119     int count = 0;
120     aio_req req;
121    
122     while (read (respipe[0], (void *)&req, sizeof (req)) == sizeof (req))
123     {
124     if (req->type == REQ_QUIT)
125     {
126     Safefree (req->thread);
127     started--;
128     }
129     else
130     {
131     int errorno = errno;
132     errno = req->errorno;
133    
134     if (req->type == REQ_READ)
135     SvCUR_set (req->data, req->result > 0
136     ? req->dataoffset + req->result
137     : req->dataoffset);
138    
139     PUSHMARK (SP);
140     XPUSHs (sv_2mortal (newSViv (req->result)));
141     PUTBACK;
142     call_sv (req->callback, G_VOID);
143     SPAGAIN;
144    
145     SvREFCNT_dec (req->data);
146     SvREFCNT_dec (req->callback);
147    
148     errno = errorno;
149     nreqs--;
150     count++;
151     }
152    
153     Safefree (req);
154     }
155    
156     return count;
157 root 1.2 }
158    
159     #undef errno
160     #include <asm/unistd.h>
161    
162     static int
163     aio_proc(void *thr_arg)
164     {
165     aio_thread *thr = thr_arg;
166     int sig;
167     int errno;
168     aio_req req;
169    
170     /* we rely on gcc's ability to create closures. */
171     _syscall3(int,lseek,int,fd,off_t,offset,int,whence);
172     _syscall3(int,read,int,fd,char *,buf,off_t,count);
173     _syscall3(int,write,int,fd,char *,buf,off_t,count);
174    
175     /* first get rid of any signals */
176     for (sig = 1; sig < _NSIG; sig++)
177 root 1.5 signal (sig, SIG_DFL);
178 root 1.4
179 root 1.5 signal (SIGPIPE, SIG_IGN);
180 root 1.2
181     /* then loop */
182     while (read (reqpipe[0], (void *)&req, sizeof (req)) == sizeof (req))
183     {
184     req->thread = thr;
185    
186     if (req->type == REQ_READ || req->type == REQ_WRITE)
187     {
188     errno = 0;
189    
190     if (lseek (req->fd, req->offset, SEEK_SET) == req->offset)
191     {
192     if (req->type == REQ_READ)
193     req->result = read (req->fd, req->dataptr, req->length);
194     else
195     req->result = write(req->fd, req->dataptr, req->length);
196     }
197    
198     req->errorno = errno;
199     }
200     else
201     {
202     write (respipe[1], (void *)&req, sizeof (req));
203     break;
204     }
205    
206     write (respipe[1], (void *)&req, sizeof (req));
207     }
208    
209     return 0;
210 root 1.1 }
211    
212     MODULE = Linux::AIO PACKAGE = Linux::AIO
213    
BOOT:
{
        /* one pipe per direction: requests to the workers, results back */
        if (pipe (reqpipe) != 0)
          croak ("unable to initialize request or result pipe");

        if (pipe (respipe) != 0)
          croak ("unable to initialize request or result pipe");

        /* poll_cb() relies on the read end never blocking so that it can
           stop as soon as the pipe is drained */
        if (fcntl (respipe[0], F_SETFL, O_NONBLOCK) != 0)
          croak ("cannot set result pipe to nonblocking mode");
}
222    
void
min_parallel(nthreads)
        int     nthreads
        PROTOTYPE: $
        CODE:
        /* Start workers until at least nthreads are running.  start_thread()
           leaves `started` unchanged when it cannot create a worker; give up
           in that case instead of retrying forever. */
        while (nthreads > started)
          {
            int before = started;

            start_thread ();

            if (started == before)
              croak ("unable to start aio worker thread");
          }
229     start_thread ();
230    
void
max_parallel(nthreads)
        int     nthreads
        PROTOTYPE: $
        PREINIT:
        int cur;
        CODE:
        /* Queue one quit request per surplus worker.  `started` only drops
           when the replies are processed, hence the separate counter. */
        cur = started;
        while (cur > nthreads)
          {
            end_thread ();
            cur--;
          }

        /* Wait for the surplus workers to acknowledge, running any
           callbacks that complete in the meantime.  The interpreter context
           is passed explicitly, as poll_cb() is declared with pTHX. */
        poll_cb (aTHX);
        while (started > nthreads)
          {
            sched_yield ();
            poll_cb (aTHX);
          }
249 root 1.1
void
aio_read(fh,offset,length,data,dataoffset,callback)
        PerlIO *        fh
        unsigned long   offset
        long            length
        SV *            data
        STRLEN          dataoffset
        SV *            callback
        PROTOTYPE: $$$$$$
        ALIAS:
                aio_write = 1
        CODE:
        /* ix is 0 for aio_read and 1 for aio_write and doubles as the
           `dowrite` flag.  SvUPGRADE only upgrades when the scalar is below
           SVt_PV; a plain sv_upgrade() would attempt a downgrade on scalars
           that already have a richer type. */
        SvUPGRADE (data, SVt_PV);
        read_write (aTHX_ ix, PerlIO_fileno (fh), offset, length, data, dataoffset, callback);
264    
int
poll_fileno()
        PROTOTYPE:
        CODE:
        /* Read end of the response pipe: the descriptor poll_cb() reads
           finished requests from. */
        RETVAL = respipe[0];
        OUTPUT:
        RETVAL
272    
int
poll_cb()
        PROTOTYPE:
        CODE:
        /* Process all pending replies; yields the number of callbacks
           that were invoked. */
        RETVAL = poll_cb (aTHX);
        OUTPUT:
        RETVAL
280    
int
nreqs()
        PROTOTYPE:
        CODE:
        /* Number of read/write requests submitted whose callback has not
           been run yet. */
        RETVAL = nreqs;
        OUTPUT:
        RETVAL
288