ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/Linux-AIO/AIO.xs
Revision: 1.7
Committed: Wed Aug 15 03:24:08 2001 UTC (22 years, 9 months ago) by root
Branch: MAIN
Changes since 1.6: +1 -1 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.1 #include "EXTERN.h"
2     #include "perl.h"
3     #include "XSUB.h"
4    
5     #include <sys/types.h>
6     #include <unistd.h>
7 root 1.5 #include <fcntl.h>
8 root 1.1 #include <sched.h>
9    
/* Size in bytes of the stack buffer embedded in each aio_thread. */
#define STACKSIZE 4096 /* yeah */

/* Request type codes stored in aio_cb.type. */
#define REQ_QUIT  0
#define REQ_READ  1
#define REQ_WRITE 2
15    
16     typedef struct {
17     char stack[STACKSIZE];
18     } aio_thread;
19    
20     typedef struct {
21     int type;
22     aio_thread *thread;
23    
24     /* read/write */
25     int fd;
26     off_t offset;
27     size_t length;
28 root 1.2 ssize_t result;
29 root 1.1 int errorno;
30    
31 root 1.5 SV *data, *callback;
32 root 1.1 void *dataptr;
33 root 1.2 STRLEN dataoffset;
34 root 1.1 } aio_cb;
35    
36     typedef aio_cb *aio_req;
37    
/* Number of worker threads currently counted as running. */
static int started;
/* Number of read/write requests queued but not yet reported by poll_cb. */
static int nreqs;
/* Pipe carrying aio_req pointers to the workers. */
static int reqpipe[2];
/* Pipe carrying finished aio_req pointers back from the workers. */
static int respipe[2];

/* Worker entry point, defined further down. */
static int aio_proc(void *arg);
43    
44 root 1.1 static void
45     start_thread(void)
46     {
47 root 1.2 aio_thread *thr;
48    
49     New (0, thr, 1, aio_thread);
50    
51     if (clone (aio_proc,
52     &(thr->stack[STACKSIZE]),
53     CLONE_VM|CLONE_FS|CLONE_FILES|CLONE_SIGHAND,
54     thr) >= 0)
55     started++;
56     else
57     Safefree (thr);
58 root 1.1 }
59    
60     static void
61     end_thread(void)
62     {
63 root 1.5 aio_req req;
64     New (0, req, 1, aio_cb);
65     req->type = REQ_QUIT;
66 root 1.1 write (reqpipe[1], &req, sizeof (aio_req));
67 root 1.2 }
68    
69     static void
70 root 1.5 read_write (pTHX_ int dowrite, int fd, off_t offset, size_t length,
71     SV *data, STRLEN dataoffset, SV*callback)
72 root 1.2 {
73 root 1.5 aio_req req;
74     STRLEN svlen;
75     char *svptr = SvPV (data, svlen);
76    
77     if (dataoffset < 0)
78     dataoffset += svlen;
79    
80     if (dataoffset < 0 || dataoffset > svlen)
81     croak ("data offset outside of string");
82    
83     if (dowrite)
84     {
85     /* write: check length and adjust. */
86     if (length < 0 || length + dataoffset > svlen)
87     length = svlen - dataoffset;
88     }
89     else
90     {
91     /* read: grow scalar as necessary */
92     svptr = SvGROW (data, length + dataoffset);
93     }
94    
95     if (length < 0)
96     croak ("length must not be negative");
97    
98     New (0, req, 1, aio_cb);
99    
100     if (!req)
101     croak ("out of memory during aio_req allocation");
102    
103     req->type = dowrite ? REQ_WRITE : REQ_READ;
104     req->fd = fd;
105     req->offset = offset;
106     req->length = length;
107     req->data = SvREFCNT_inc (data);
108 root 1.6 req->dataptr = (char *)svptr + dataoffset;
109 root 1.5 req->callback = SvREFCNT_inc (callback);
110    
111     nreqs++;
112     write (reqpipe[1], &req, sizeof (aio_req));
113     }
114    
115     static int
116     poll_cb (pTHX)
117     {
118     dSP;
119     int count = 0;
120     aio_req req;
121    
122     while (read (respipe[0], (void *)&req, sizeof (req)) == sizeof (req))
123     {
124     if (req->type == REQ_QUIT)
125     {
126     Safefree (req->thread);
127     started--;
128     }
129     else
130     {
131     int errorno = errno;
132     errno = req->errorno;
133    
134     if (req->type == REQ_READ)
135 root 1.6 SvCUR_set (req->data, req->dataoffset
136     + req->result > 0 ? req->result : 0);
137 root 1.5
138     PUSHMARK (SP);
139     XPUSHs (sv_2mortal (newSViv (req->result)));
140     PUTBACK;
141     call_sv (req->callback, G_VOID);
142     SPAGAIN;
143    
144     SvREFCNT_dec (req->data);
145     SvREFCNT_dec (req->callback);
146    
147     errno = errorno;
148     nreqs--;
149     count++;
150     }
151    
152     Safefree (req);
153     }
154    
155     return count;
156 root 1.2 }
157    
158     #undef errno
159     #include <asm/unistd.h>
160    
161     static int
162     aio_proc(void *thr_arg)
163     {
164     aio_thread *thr = thr_arg;
165     int sig;
166     int errno;
167     aio_req req;
168    
169     /* we rely on gcc's ability to create closures. */
170     _syscall3(int,lseek,int,fd,off_t,offset,int,whence);
171     _syscall3(int,read,int,fd,char *,buf,off_t,count);
172     _syscall3(int,write,int,fd,char *,buf,off_t,count);
173    
174     /* first get rid of any signals */
175     for (sig = 1; sig < _NSIG; sig++)
176 root 1.5 signal (sig, SIG_DFL);
177 root 1.4
178 root 1.5 signal (SIGPIPE, SIG_IGN);
179 root 1.2
180     /* then loop */
181     while (read (reqpipe[0], (void *)&req, sizeof (req)) == sizeof (req))
182     {
183     req->thread = thr;
184    
185     if (req->type == REQ_READ || req->type == REQ_WRITE)
186     {
187     errno = 0;
188    
189     if (lseek (req->fd, req->offset, SEEK_SET) == req->offset)
190     {
191     if (req->type == REQ_READ)
192     req->result = read (req->fd, req->dataptr, req->length);
193     else
194     req->result = write(req->fd, req->dataptr, req->length);
195     }
196    
197     req->errorno = errno;
198     }
199     else
200     {
201     write (respipe[1], (void *)&req, sizeof (req));
202     break;
203     }
204    
205     write (respipe[1], (void *)&req, sizeof (req));
206     }
207    
208     return 0;
209 root 1.1 }
210    
211     MODULE = Linux::AIO PACKAGE = Linux::AIO
212    
213     BOOT:
214     {
215     if (pipe (reqpipe) || pipe (respipe))
216     croak ("unable to initialize request or result pipe");
217 root 1.5
218     if (fcntl (respipe[0], F_SETFL, O_NONBLOCK))
219     croak ("cannot set result pipe to nonblocking mode");
220 root 1.1 }
221    
222     void
223     min_parallel(nthreads)
224     int nthreads
225 root 1.5 PROTOTYPE: $
226 root 1.1 CODE:
227     while (nthreads > started)
228     start_thread ();
229    
230     void
231     max_parallel(nthreads)
232     int nthreads
233 root 1.5 PROTOTYPE: $
234 root 1.1 CODE:
235 root 1.5 int cur = started;
236     while (cur > nthreads)
237     {
238     end_thread ();
239     cur--;
240     }
241    
242     poll_cb ();
243 root 1.1 while (started > nthreads)
244 root 1.5 {
245     sched_yield ();
246     poll_cb ();
247     }
248 root 1.1
249     void
250 root 1.5 aio_read(fh,offset,length,data,dataoffset,callback)
251     PerlIO * fh
252 root 1.6 UV offset
253     STRLEN length
254 root 1.5 SV * data
255     STRLEN dataoffset
256     SV * callback
257     PROTOTYPE: $$$$$$
258 root 1.3 ALIAS:
259 root 1.5 aio_write = 1
260 root 1.1 CODE:
261 root 1.6 SvUPGRADE (data, SVt_PV);
262     SvPOK_on (data);
263 root 1.5 read_write (aTHX_ ix, PerlIO_fileno (fh), offset, length, data, dataoffset, callback);
264    
265     int
266     poll_fileno()
267     PROTOTYPE:
268     CODE:
269     RETVAL = respipe[0];
270     OUTPUT:
271     RETVAL
272    
273     int
274 root 1.6 poll_cb(...)
275 root 1.5 PROTOTYPE:
276     CODE:
277     RETVAL = poll_cb (aTHX);
278     OUTPUT:
279     RETVAL
280    
281     int
282     nreqs()
283     PROTOTYPE:
284     CODE:
285     RETVAL = nreqs;
286     OUTPUT:
287     RETVAL
288