ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/Linux-AIO/AIO.xs
(Generate patch)

Comparing Linux-AIO/AIO.xs (file contents):
Revision 1.1 by root, Tue Aug 14 03:18:53 2001 UTC vs.
Revision 1.25 by root, Thu Jul 7 22:24:09 2005 UTC

1#define PERL_NO_GET_CONTEXT
2
1#include "EXTERN.h" 3#include "EXTERN.h"
2#include "perl.h" 4#include "perl.h"
3#include "XSUB.h" 5#include "XSUB.h"
4 6
5#include <sys/types.h> 7#include <sys/types.h>
8#include <sys/stat.h>
6#include <unistd.h> 9#include <unistd.h>
10#include <fcntl.h>
11#include <signal.h>
7#include <sched.h> 12#include <sched.h>
8 13
9#define STACKSIZE 128 /* yeah */ 14typedef void *InputStream; /* hack, but 5.6.1 is simply toooo old ;) */
15typedef void *OutputStream; /* hack, but 5.6.1 is simply toooo old ;) */
16typedef void *InOutStream; /* hack, but 5.6.1 is simply toooo old ;) */
10 17
11#define REQ_EXIT 0 18// 128 seems to be enough most everywhere. alpha needs 256.
12#define REQ_READ 1 19#define STACKSIZE (256 * sizeof (long))
13#define REQ_WRITE 2 20
21enum {
22 REQ_QUIT,
23 REQ_OPEN, REQ_CLOSE, REQ_READ, REQ_WRITE,
24 REQ_STAT, REQ_LSTAT, REQ_FSTAT, REQ_UNLINK
25};
14 26
15typedef struct { 27typedef struct {
16 char stack[STACKSIZE]; 28 char stack[STACKSIZE];
17} aio_thread; 29} aio_thread;
18 30
19typedef struct { 31typedef struct aio_cb {
32 struct aio_cb *next;
33
20 int type; 34 int type;
21 aio_thread *thread; 35 aio_thread *thread;
22 36
23/* read/write */
24 int fd; 37 int fd;
25 off_t offset; 38 off_t offset;
26 size_t length; 39 size_t length;
27 ssize_t done; 40 ssize_t result;
41 mode_t mode; /* open */
28 int errorno; 42 int errorno;
29 43 SV *data, *callback;
30 SV *data;
31 void *dataptr; 44 void *dataptr;
32 STRLEN offset; 45 STRLEN dataoffset;
46
47 Stat_t *statdata;
33} aio_cb; 48} aio_cb;
34 49
35typedef aio_cb *aio_req; 50typedef aio_cb *aio_req;
36 51
37static int started; 52static int started;
53static int nreqs;
38static int reqpipe[2], respipe[2]; 54static int reqpipe[2], respipe[2];
39 55
56static aio_req qs, qe; /* queue start, queue end */
57
58static int aio_proc(void *arg);
59
40static void 60static void
41start_thread(void) 61start_thread (void)
42{ 62{
43 aio_thread *thr = NEW ( 63 aio_thread *thr;
64
65 New (0, thr, 1, aio_thread);
66
44 __clone (aio_proc, 67 if (clone (aio_proc,
68 &(thr->stack[STACKSIZE - sizeof (long)]),
69 CLONE_VM|CLONE_FS|CLONE_FILES,
70 thr) >= 0)
71 started++;
72 else
73 Safefree (thr);
45} 74}
46 75
47static void 76static void
77send_reqs (void)
78{
79 /* this write is atomic */
80 while (qs && write (reqpipe[1], &qs, sizeof qs) == sizeof qs)
81 {
82 qs = qs->next;
83 if (!qs) qe = 0;
84 }
85}
86
87static void
88send_req (aio_req req)
89{
90 nreqs++;
91 req->next = 0;
92
93 if (qe)
94 {
95 qe->next = req;
96 qe = req;
97 }
98 else
99 qe = qs = req;
100
101 send_reqs ();
102}
103
104static void
48end_thread(void) 105end_thread (void)
49{ 106{
50 aio_req req = 0; 107 aio_req req;
108 New (0, req, 1, aio_cb);
109 req->type = REQ_QUIT;
110
111 send_req (req);
112}
113
114static void
115read_write (pTHX_
116 int dowrite, int fd, off_t offset, size_t length,
117 SV *data, STRLEN dataoffset, SV *callback)
118{
119 aio_req req;
120 STRLEN svlen;
121 char *svptr = SvPV (data, svlen);
122
123 SvUPGRADE (data, SVt_PV);
124 SvPOK_on (data);
125
126 if (dataoffset < 0)
127 dataoffset += svlen;
128
129 if (dataoffset < 0 || dataoffset > svlen)
130 croak ("data offset outside of string");
131
132 if (dowrite)
133 {
134 /* write: check length and adjust. */
135 if (length < 0 || length + dataoffset > svlen)
136 length = svlen - dataoffset;
137 }
138 else
139 {
140 /* read: grow scalar as necessary */
141 svptr = SvGROW (data, length + dataoffset);
142 }
143
144 if (length < 0)
145 croak ("length must not be negative");
146
147 Newz (0, req, 1, aio_cb);
148
149 if (!req)
150 croak ("out of memory during aio_req allocation");
151
152 req->type = dowrite ? REQ_WRITE : REQ_READ;
153 req->fd = fd;
154 req->offset = offset;
155 req->length = length;
156 req->data = SvREFCNT_inc (data);
157 req->dataptr = (char *)svptr + dataoffset;
158 req->callback = SvREFCNT_inc (callback);
159
160 send_req (req);
161}
162
163static void
164poll_wait ()
165{
166 fd_set rfd;
167 FD_ZERO(&rfd);
168 FD_SET(respipe[0], &rfd);
169
170 select (respipe[0] + 1, &rfd, 0, 0, 0);
171}
172
173static int
174poll_cb (pTHX)
175{
176 dSP;
177 int count = 0;
178 aio_req req;
179
180 while (read (respipe[0], (void *)&req, sizeof (req)) == sizeof (req))
181 {
182 nreqs--;
183
184 if (req->type == REQ_QUIT)
185 {
186 Safefree (req->thread);
187 started--;
188 }
189 else
190 {
191 int errorno = errno;
192 errno = req->errorno;
193
194 if (req->type == REQ_READ)
195 SvCUR_set (req->data, req->dataoffset
196 + req->result > 0 ? req->result : 0);
197
198 if (req->data)
199 SvREFCNT_dec (req->data);
200
201 if (req->type == REQ_STAT || req->type == REQ_LSTAT || req->type == REQ_FSTAT)
202 {
203 PL_laststype = req->type == REQ_LSTAT ? OP_LSTAT : OP_STAT;
204 PL_laststatval = req->result;
205 PL_statcache = *(req->statdata);
206
207 Safefree (req->statdata);
208 }
209
210 PUSHMARK (SP);
211 XPUSHs (sv_2mortal (newSViv (req->result)));
212 PUTBACK;
213 call_sv (req->callback, G_VOID);
214 SPAGAIN;
215
216 if (req->callback)
217 SvREFCNT_dec (req->callback);
218
219 errno = errorno;
220 count++;
221 }
222
223 Safefree (req);
224 }
225
226 if (qs)
227 send_reqs ();
228
229 return count;
230}
231
232static sigset_t fullsigset;
233
234#undef errno
235#include <asm/unistd.h>
236#include <sys/prctl.h>
237
238#if __alpha || __ia64 || __hppa || __sparc64__ || __v850__
239# define stat kernelstat
240# define stat64 kernelstat64
241# include <asm/stat.h>
242# undef stat
243# undef stat64
244#else
245# define kernelstat stat
246# define kernelstat64 stat64
247#endif
248
249#define COPY_STATDATA \
250 req->statdata->st_dev = statdata.st_dev; \
251 req->statdata->st_ino = statdata.st_ino; \
252 req->statdata->st_mode = statdata.st_mode; \
253 req->statdata->st_nlink = statdata.st_nlink; \
254 req->statdata->st_uid = statdata.st_uid; \
255 req->statdata->st_gid = statdata.st_gid; \
256 req->statdata->st_rdev = statdata.st_rdev; \
257 req->statdata->st_size = statdata.st_size; \
258 req->statdata->st_atime = statdata.st_atime; \
259 req->statdata->st_mtime = statdata.st_mtime; \
260 req->statdata->st_ctime = statdata.st_ctime; \
261 req->statdata->st_blksize = statdata.st_blksize; \
262 req->statdata->st_blocks = statdata.st_blocks; \
263
264static int
265aio_proc (void *thr_arg)
266{
267 aio_thread *thr = thr_arg;
268 aio_req req;
269 int errno;
270
271 /* this is very much kernel-specific :(:(:( */
272 /* we rely on gcc's ability to create closures. */
273 _syscall3(int,read,int,fd,char *,buf,size_t,count)
274 _syscall3(int,write,int,fd,char *,buf,size_t,count)
275
276 _syscall3(int,open,char *,pathname,int,flags,mode_t,mode)
277 _syscall1(int,close,int,fd)
278
279#if __NR_pread64
280 _syscall5(int,pread64,int,fd,char *,buf,size_t,count,unsigned int,offset_lo,unsigned int,offset_hi)
281 _syscall5(int,pwrite64,int,fd,char *,buf,size_t,count,unsigned int,offset_lo,unsigned int,offset_hi)
282#elif __NR_pread
283 _syscall4(int,pread,int,fd,char *,buf,size_t,count,offset_t,offset)
284 _syscall4(int,pwrite,int,fd,char *,buf,size_t,count,offset_t,offset)
285#else
286# error "neither pread nor pread64 defined"
287#endif
288
289
290#if __NR_stat64
291 _syscall2(int,stat64, const char *, filename, struct kernelstat64 *, buf)
292 _syscall2(int,lstat64, const char *, filename, struct kernelstat64 *, buf)
293 _syscall2(int,fstat64, int, fd, struct kernelstat64 *, buf)
294#elif __NR_stat
295 _syscall2(int,stat, const char *, filename, struct kernelstat *, buf)
296 _syscall2(int,lstat, const char *, filename, struct kernelstat *, buf)
297 _syscall2(int,fstat, int, fd, struct kernelstat *, buf)
298#else
299# error "neither stat64 nor stat defined"
300#endif
301
302 _syscall1(int,unlink, char *, filename);
303
304 sigprocmask (SIG_SETMASK, &fullsigset, 0);
305 prctl (PR_SET_PDEATHSIG, SIGKILL);
306
307 /* then loop */
308 while (read (reqpipe[0], (void *)&req, sizeof (req)) == sizeof (req))
309 {
310 req->thread = thr;
311 errno = 0; /* strictly unnecessary */
312
313 switch (req->type)
314 {
315#if __NR_pread64
316 case REQ_READ: req->result = pread64 (req->fd, req->dataptr, req->length, req->offset & 0xffffffff, req->offset >> 32); break;
317 case REQ_WRITE: req->result = pwrite64(req->fd, req->dataptr, req->length, req->offset & 0xffffffff, req->offset >> 32); break;
318#else
319 case REQ_READ: req->result = pread (req->fd, req->dataptr, req->length, req->offset); break;
320 case REQ_WRITE: req->result = pwrite (req->fd, req->dataptr, req->length, req->offset); break;
321#endif
322#if __NR_stat64
323 struct kernelstat64 statdata;
324 case REQ_STAT: req->result = stat64 (req->dataptr, &statdata); COPY_STATDATA; break;
325 case REQ_LSTAT: req->result = lstat64 (req->dataptr, &statdata); COPY_STATDATA; break;
326 case REQ_FSTAT: req->result = fstat64 (req->fd, &statdata); COPY_STATDATA; break;
327#else
328 struct kernelstat statdata;
329 case REQ_STAT: req->result = stat (req->dataptr, &statdata); COPY_STATDATA; break;
330 case REQ_LSTAT: req->result = lstat (req->dataptr, &statdata); COPY_STATDATA; break;
331 case REQ_FSTAT: req->result = fstat (req->fd, &statdata); COPY_STATDATA; break;
332#endif
333 case REQ_OPEN: req->result = open (req->dataptr, req->fd, req->mode); break;
334 case REQ_CLOSE: req->result = close (req->fd); break;
335 case REQ_UNLINK: req->result = unlink (req->dataptr); break;
336
337 case REQ_QUIT:
338 default:
339 write (respipe[1], (void *)&req, sizeof (req));
340 return 0;
341 }
342
343 req->errorno = errno;
51 write (reqpipe[1], &req, sizeof (aio_req)); 344 write (respipe[1], (void *)&req, sizeof (req));
52 nthreads--; 345 }
346
347 return 0;
53} 348}
54 349
55MODULE = Linux::AIO PACKAGE = Linux::AIO 350MODULE = Linux::AIO PACKAGE = Linux::AIO
56 351
57BOOT: 352BOOT:
58{ 353{
354 sigfillset (&fullsigset);
355 sigdelset (&fullsigset, SIGTERM);
356 sigdelset (&fullsigset, SIGQUIT);
357 sigdelset (&fullsigset, SIGABRT);
358 sigdelset (&fullsigset, SIGINT);
359
59 if (pipe (reqpipe) || pipe (respipe)) 360 if (pipe (reqpipe) || pipe (respipe))
60 croak ("unable to initialize request or result pipe"); 361 croak ("unable to initialize request or result pipe");
362
363 if (fcntl (reqpipe[1], F_SETFL, O_NONBLOCK))
364 croak ("cannot set result pipe to nonblocking mode");
365
366 if (fcntl (respipe[0], F_SETFL, O_NONBLOCK))
367 croak ("cannot set result pipe to nonblocking mode");
61} 368}
62 369
63void 370void
64min_parallel(nthreads) 371min_parallel(nthreads)
65 int nthreads 372 int nthreads
373 PROTOTYPE: $
66 CODE: 374 CODE:
67 while (nthreads > started) 375 while (nthreads > started)
68 start_thread (); 376 start_thread ();
69 377
70void 378void
71max_parallel(nthreads) 379max_parallel(nthreads)
72 int nthreads 380 int nthreads
381 PROTOTYPE: $
73 CODE: 382 CODE:
383 int cur = started;
384 while (cur > nthreads)
385 {
386 end_thread ();
387 cur--;
388 }
389
74 while (started > nthreads) 390 while (started > nthreads)
391 {
392 poll_wait ();
393 poll_cb (aTHX);
394 }
395
396void
397aio_open(pathname,flags,mode,callback)
398 SV * pathname
399 int flags
400 int mode
401 SV * callback
402 PROTOTYPE: $$$$
403 CODE:
404 aio_req req;
405
406 Newz (0, req, 1, aio_cb);
407
408 if (!req)
409 croak ("out of memory during aio_req allocation");
410
411 req->type = REQ_OPEN;
412 req->data = newSVsv (pathname);
413 req->dataptr = SvPV_nolen (req->data);
414 req->fd = flags;
415 req->mode = mode;
416 req->callback = SvREFCNT_inc (callback);
417
75 end_thread (); 418 send_req (req);
76 419
77void 420void
421aio_close(fh,callback)
422 InputStream fh
423 SV * callback
424 PROTOTYPE: $$
425 CODE:
426 aio_req req;
427
428 Newz (0, req, 1, aio_cb);
429
430 if (!req)
431 croak ("out of memory during aio_req allocation");
432
433 req->type = REQ_CLOSE;
434 req->fd = PerlIO_fileno (fh);
435 req->callback = SvREFCNT_inc (callback);
436
437 send_req (req);
438
439void
78read(fh,offset,length,data,dataoffset,callback) 440aio_read(fh,offset,length,data,dataoffset,callback)
441 InputStream fh
442 UV offset
443 IV length
444 SV * data
445 IV dataoffset
446 SV * callback
447 PROTOTYPE: $$$$$$
448 CODE:
449 read_write (aTHX_ 0, PerlIO_fileno (fh), offset, length, data, dataoffset, callback);
450
451void
452aio_write(fh,offset,length,data,dataoffset,callback)
453 OutputStream fh
454 UV offset
455 IV length
456 SV * data
457 IV dataoffset
458 SV * callback
459 PROTOTYPE: $$$$$$
460 CODE:
461 read_write (aTHX_ 1, PerlIO_fileno (fh), offset, length, data, dataoffset, callback);
462
463void
464aio_stat(fh_or_path,callback)
465 SV * fh_or_path
466 SV * callback
467 PROTOTYPE: $$
468 ALIAS:
469 aio_lstat = 1
79 CODE: 470 CODE:
471 aio_req req;
472
473 Newz (0, req, 1, aio_cb);
474
475 if (!req)
476 croak ("out of memory during aio_req allocation");
477
478 New (0, req->statdata, 1, Stat_t);
479
480 if (!req->statdata)
481 croak ("out of memory during aio_req->statdata allocation");
482
483 if (SvPOK (fh_or_path))
484 {
485 req->type = ix ? REQ_LSTAT : REQ_STAT;
486 req->data = newSVsv (fh_or_path);
487 req->dataptr = SvPV_nolen (req->data);
488 }
489 else
490 {
491 req->type = REQ_FSTAT;
492 req->fd = PerlIO_fileno (IoIFP (sv_2io (fh_or_path)));
493 }
494
495 req->callback = SvREFCNT_inc (callback);
496
497 send_req (req);
498
499void
500aio_unlink(pathname,callback)
501 SV * pathname
502 SV * callback
503 PROTOTYPE: $$
504 CODE:
505 aio_req req;
506
507 Newz (0, req, 1, aio_cb);
508
509 if (!req)
510 croak ("out of memory during aio_req allocation");
511
512 req->type = REQ_UNLINK;
513 req->data = newSVsv (pathname);
514 req->dataptr = SvPV_nolen (req->data);
515 req->callback = SvREFCNT_inc (callback);
516
517 send_req (req);
518
519int
520poll_fileno()
521 PROTOTYPE:
522 CODE:
523 RETVAL = respipe[0];
524 OUTPUT:
525 RETVAL
526
527int
528poll_cb(...)
529 PROTOTYPE:
530 CODE:
531 RETVAL = poll_cb (aTHX);
532 OUTPUT:
533 RETVAL
534
535void
536poll_wait()
537 PROTOTYPE:
538 CODE:
539 poll_wait ();
540
541int
542nreqs()
543 PROTOTYPE:
544 CODE:
545 RETVAL = nreqs;
546 OUTPUT:
547 RETVAL
548

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines