ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/IO-AIO/AIO.xs
(Generate patch)

Comparing IO-AIO/AIO.xs (file contents):
Revision 1.9 by root, Tue Jul 12 11:02:54 2005 UTC vs.
Revision 1.15 by root, Sat Jul 23 18:19:56 2005 UTC

2#include "perl.h" 2#include "perl.h"
3#include "XSUB.h" 3#include "XSUB.h"
4 4
5#include <sys/types.h> 5#include <sys/types.h>
6#include <sys/stat.h> 6#include <sys/stat.h>
7
7#include <unistd.h> 8#include <unistd.h>
8#include <fcntl.h> 9#include <fcntl.h>
9#include <signal.h> 10#include <signal.h>
10#include <sched.h> 11#include <sched.h>
12#if __linux
13#include <sys/syscall.h>
14#endif
11 15
12#include <pthread.h> 16#include <pthread.h>
13#include <sys/syscall.h>
14 17
15typedef void *InputStream; /* hack, but 5.6.1 is simply toooo old ;) */ 18typedef void *InputStream; /* hack, but 5.6.1 is simply toooo old ;) */
16typedef void *OutputStream; /* hack, but 5.6.1 is simply toooo old ;) */ 19typedef void *OutputStream; /* hack, but 5.6.1 is simply toooo old ;) */
17typedef void *InOutStream; /* hack, but 5.6.1 is simply toooo old ;) */ 20typedef void *InOutStream; /* hack, but 5.6.1 is simply toooo old ;) */
18 21
39 off_t offset; 42 off_t offset;
40 size_t length; 43 size_t length;
41 ssize_t result; 44 ssize_t result;
42 mode_t mode; /* open */ 45 mode_t mode; /* open */
43 int errorno; 46 int errorno;
44 SV *data, *callback; 47 SV *data, *callback, *fh;
45 void *dataptr; 48 void *dataptr;
46 STRLEN dataoffset; 49 STRLEN dataoffset;
47 50
48 Stat_t *statdata; 51 Stat_t *statdata;
49} aio_cb; 52} aio_cb;
50 53
51typedef aio_cb *aio_req; 54typedef aio_cb *aio_req;
52 55
53static int started; 56static int started;
54static int nreqs; 57static volatile int nreqs;
55static int max_outstanding = 1<<30; 58static int max_outstanding = 1<<30;
56static int respipe [2]; 59static int respipe [2];
57 60
58static pthread_mutex_t reslock = PTHREAD_MUTEX_INITIALIZER; 61static pthread_mutex_t reslock = PTHREAD_MUTEX_INITIALIZER;
59static pthread_mutex_t reqlock = PTHREAD_MUTEX_INITIALIZER; 62static pthread_mutex_t reqlock = PTHREAD_MUTEX_INITIALIZER;
63static volatile aio_req ress, rese; /* queue start, queue end */ 66static volatile aio_req ress, rese; /* queue start, queue end */
64 67
65static void 68static void
66poll_wait () 69poll_wait ()
67{ 70{
68 if (!nreqs) 71 if (nreqs && !ress)
69 return; 72 {
70
71 fd_set rfd; 73 fd_set rfd;
72 FD_ZERO(&rfd); 74 FD_ZERO(&rfd);
73 FD_SET(respipe [0], &rfd); 75 FD_SET(respipe [0], &rfd);
74 76
75 select (respipe [0] + 1, &rfd, 0, 0, 0); 77 select (respipe [0] + 1, &rfd, 0, 0, 0);
78 }
76} 79}
77 80
78static int 81static int
79poll_cb () 82poll_cb ()
80{ 83{
81 dSP; 84 dSP;
82 int count = 0; 85 int count = 0;
83 aio_req req; 86 aio_req req, prv;
84 87
88 pthread_mutex_lock (&reslock);
89
85 { 90 {
86 /* read and signals sent by the worker threads */ 91 /* read any signals sent by the worker threads */
87 char buf [32]; 92 char buf [32];
88 while (read (respipe [0], buf, 32) > 0) 93 while (read (respipe [0], buf, 32) > 0)
89 ; 94 ;
90 } 95 }
91 96
92 for (;;) 97 req = ress;
98 ress = rese = 0;
99
100 pthread_mutex_unlock (&reslock);
101
102 while (req)
93 { 103 {
94 pthread_mutex_lock (&reslock);
95
96 req = ress;
97
98 if (ress)
99 {
100 ress = ress->next;
101 if (!ress) rese = 0;
102 }
103
104 pthread_mutex_unlock (&reslock);
105
106 if (!req)
107 break;
108
109 nreqs--; 104 nreqs--;
110 105
111 if (req->type == REQ_QUIT) 106 if (req->type == REQ_QUIT)
112 started--; 107 started--;
113 else 108 else
120 + req->result > 0 ? req->result : 0); 115 + req->result > 0 ? req->result : 0);
121 116
122 if (req->data) 117 if (req->data)
123 SvREFCNT_dec (req->data); 118 SvREFCNT_dec (req->data);
124 119
120 if (req->fh)
121 SvREFCNT_dec (req->fh);
122
125 if (req->type == REQ_STAT || req->type == REQ_LSTAT || req->type == REQ_FSTAT) 123 if (req->type == REQ_STAT || req->type == REQ_LSTAT || req->type == REQ_FSTAT)
126 { 124 {
127 PL_laststype = req->type == REQ_LSTAT ? OP_LSTAT : OP_STAT; 125 PL_laststype = req->type == REQ_LSTAT ? OP_LSTAT : OP_STAT;
128 PL_laststatval = req->result; 126 PL_laststatval = req->result;
129 PL_statcache = *(req->statdata); 127 PL_statcache = *(req->statdata);
130 128
131 Safefree (req->statdata); 129 Safefree (req->statdata);
132 } 130 }
133 131
132 ENTER;
134 PUSHMARK (SP); 133 PUSHMARK (SP);
135 XPUSHs (sv_2mortal (newSViv (req->result))); 134 XPUSHs (sv_2mortal (newSViv (req->result)));
136 135
137 if (req->type == REQ_OPEN) 136 if (req->type == REQ_OPEN)
138 { 137 {
141 140
142 PUTBACK; 141 PUTBACK;
143 call_pv ("IO::AIO::_fd2fh", G_SCALAR | G_EVAL); 142 call_pv ("IO::AIO::_fd2fh", G_SCALAR | G_EVAL);
144 SPAGAIN; 143 SPAGAIN;
145 144
146 fh = POPs; 145 fh = SvREFCNT_inc (POPs);
147 146
148 PUSHMARK (SP); 147 PUSHMARK (SP);
149 XPUSHs (fh); 148 XPUSHs (sv_2mortal (fh));
150 } 149 }
151 150
152 if (SvOK (req->callback)) 151 if (SvOK (req->callback))
153 { 152 {
154 PUTBACK; 153 PUTBACK;
155 call_sv (req->callback, G_VOID | G_EVAL); 154 call_sv (req->callback, G_VOID | G_EVAL);
156 SPAGAIN; 155 SPAGAIN;
157 } 156 }
157
158 LEAVE;
158 159
159 if (req->callback) 160 if (req->callback)
160 SvREFCNT_dec (req->callback); 161 SvREFCNT_dec (req->callback);
161 162
162 errno = errorno; 163 errno = errorno;
163 count++; 164 count++;
164 } 165 }
165 166
167 prv = req;
168 req = req->next;
166 Safefree (req); 169 Safefree (prv);
170
171 /* TODO: croak on errors? */
167 } 172 }
168 173
169 return count; 174 return count;
170} 175}
171 176
226 req->type = REQ_QUIT; 231 req->type = REQ_QUIT;
227 232
228 send_req (req); 233 send_req (req);
229} 234}
230 235
231static void
232read_write (int dowrite, int fd, off_t offset, size_t length,
233 SV *data, STRLEN dataoffset, SV *callback)
234{
235 aio_req req;
236 STRLEN svlen;
237 char *svptr = SvPV (data, svlen);
238
239 SvUPGRADE (data, SVt_PV);
240 SvPOK_on (data);
241
242 if (dataoffset < 0)
243 dataoffset += svlen;
244
245 if (dataoffset < 0 || dataoffset > svlen)
246 croak ("data offset outside of string");
247
248 if (dowrite)
249 {
250 /* write: check length and adjust. */
251 if (length < 0 || length + dataoffset > svlen)
252 length = svlen - dataoffset;
253 }
254 else
255 {
256 /* read: grow scalar as necessary */
257 svptr = SvGROW (data, length + dataoffset);
258 }
259
260 if (length < 0)
261 croak ("length must not be negative");
262
263 Newz (0, req, 1, aio_cb);
264
265 if (!req)
266 croak ("out of memory during aio_req allocation");
267
268 req->type = dowrite ? REQ_WRITE : REQ_READ;
269 req->fd = fd;
270 req->offset = offset;
271 req->length = length;
272 req->data = SvREFCNT_inc (data);
273 req->dataptr = (char *)svptr + dataoffset;
274 req->callback = SvREFCNT_inc (callback);
275
276 send_req (req);
277}
278
279static void * 236static void *
280aio_proc (void *thr_arg) 237aio_proc (void *thr_arg)
281{ 238{
282 aio_req req; 239 aio_req req;
283 int type; 240 int type;
308 265
309 type = req->type; 266 type = req->type;
310 267
311 switch (type) 268 switch (type)
312 { 269 {
313 case REQ_READ: req->result = pread64 (req->fd, req->dataptr, req->length, req->offset); break; 270 case REQ_READ: req->result = pread (req->fd, req->dataptr, req->length, req->offset); break;
314 case REQ_WRITE: req->result = pwrite64 (req->fd, req->dataptr, req->length, req->offset); break; 271 case REQ_WRITE: req->result = pwrite (req->fd, req->dataptr, req->length, req->offset); break;
315#if SYS_readahead 272#if SYS_readahead
316 case REQ_READAHEAD: req->result = readahead (req->fd, req->offset, req->length); break; 273 case REQ_READAHEAD: req->result = readahead (req->fd, req->offset, req->length); break;
317#else 274#else
318 case REQ_READAHEAD: req->result = -1; errno = ENOSYS; break; 275 case REQ_READAHEAD: req->result = -1; errno = ENOSYS; break;
319#endif 276#endif
441 send_req (req); 398 send_req (req);
442} 399}
443 400
444void 401void
445aio_close(fh,callback=&PL_sv_undef) 402aio_close(fh,callback=&PL_sv_undef)
446 InputStream fh 403 SV * fh
447 SV * callback 404 SV * callback
448 PROTOTYPE: $;$ 405 PROTOTYPE: $;$
449 ALIAS: 406 ALIAS:
450 aio_close = REQ_CLOSE 407 aio_close = REQ_CLOSE
451 aio_fsync = REQ_FSYNC 408 aio_fsync = REQ_FSYNC
452 aio_fdatasync = REQ_FDATASYNC 409 aio_fdatasync = REQ_FDATASYNC
458 415
459 if (!req) 416 if (!req)
460 croak ("out of memory during aio_req allocation"); 417 croak ("out of memory during aio_req allocation");
461 418
462 req->type = ix; 419 req->type = ix;
420 req->fh = newSVsv (fh);
463 req->fd = PerlIO_fileno (fh); 421 req->fd = PerlIO_fileno (IoIFP (sv_2io (fh)));
464 req->callback = SvREFCNT_inc (callback); 422 req->callback = SvREFCNT_inc (callback);
465 423
466 send_req (req); 424 send_req (req);
467} 425}
468 426
469void 427void
470aio_read(fh,offset,length,data,dataoffset,callback=&PL_sv_undef) 428aio_read(fh,offset,length,data,dataoffset,callback=&PL_sv_undef)
471 InputStream fh 429 SV * fh
472 UV offset 430 UV offset
473 IV length 431 IV length
474 SV * data 432 SV * data
475 IV dataoffset 433 IV dataoffset
476 SV * callback 434 SV * callback
435 ALIAS:
436 aio_read = REQ_READ
437 aio_write = REQ_WRITE
477 PROTOTYPE: $$$$$;$ 438 PROTOTYPE: $$$$$;$
478 CODE: 439 CODE:
479 read_write (0, PerlIO_fileno (fh), offset, length, data, dataoffset, callback); 440{
441 aio_req req;
442 STRLEN svlen;
443 char *svptr = SvPV (data, svlen);
480 444
481void 445 SvUPGRADE (data, SVt_PV);
482aio_write(fh,offset,length,data,dataoffset,callback=&PL_sv_undef) 446 SvPOK_on (data);
483 OutputStream fh 447
484 UV offset 448 if (dataoffset < 0)
485 IV length 449 dataoffset += svlen;
486 SV * data 450
487 IV dataoffset 451 if (dataoffset < 0 || dataoffset > svlen)
488 SV * callback 452 croak ("data offset outside of string");
489 PROTOTYPE: $$$$$;$ 453
490 CODE: 454 if (ix == REQ_WRITE)
491 read_write (1, PerlIO_fileno (fh), offset, length, data, dataoffset, callback); 455 {
456 /* write: check length and adjust. */
457 if (length < 0 || length + dataoffset > svlen)
458 length = svlen - dataoffset;
459 }
460 else
461 {
462 /* read: grow scalar as necessary */
463 svptr = SvGROW (data, length + dataoffset);
464 }
465
466 if (length < 0)
467 croak ("length must not be negative");
468
469 Newz (0, req, 1, aio_cb);
470
471 if (!req)
472 croak ("out of memory during aio_req allocation");
473
474 req->type = ix;
475 req->fh = newSVsv (fh);
476 req->fd = PerlIO_fileno (ix == REQ_READ ? IoIFP (sv_2io (fh))
477 : IoOFP (sv_2io (fh)));
478 req->offset = offset;
479 req->length = length;
480 req->data = SvREFCNT_inc (data);
481 req->dataptr = (char *)svptr + dataoffset;
482 req->callback = SvREFCNT_inc (callback);
483
484 send_req (req);
485}
492 486
493void 487void
494aio_readahead(fh,offset,length,callback=&PL_sv_undef) 488aio_readahead(fh,offset,length,callback=&PL_sv_undef)
495 InputStream fh 489 SV * fh
496 UV offset 490 UV offset
497 IV length 491 IV length
498 SV * callback 492 SV * callback
499 PROTOTYPE: $$$;$ 493 PROTOTYPE: $$$;$
500 CODE: 494 CODE:
501{ 495{
502 aio_req req; 496 aio_req req;
503 497
508 502
509 if (!req) 503 if (!req)
510 croak ("out of memory during aio_req allocation"); 504 croak ("out of memory during aio_req allocation");
511 505
512 req->type = REQ_READAHEAD; 506 req->type = REQ_READAHEAD;
507 req->fh = newSVsv (fh);
513 req->fd = PerlIO_fileno (fh); 508 req->fd = PerlIO_fileno (IoIFP (sv_2io (fh)));
514 req->offset = offset; 509 req->offset = offset;
515 req->length = length; 510 req->length = length;
516 req->callback = SvREFCNT_inc (callback); 511 req->callback = SvREFCNT_inc (callback);
517 512
518 send_req (req); 513 send_req (req);
546 req->dataptr = SvPV_nolen (req->data); 541 req->dataptr = SvPV_nolen (req->data);
547 } 542 }
548 else 543 else
549 { 544 {
550 req->type = REQ_FSTAT; 545 req->type = REQ_FSTAT;
546 req->fh = newSVsv (fh_or_path);
551 req->fd = PerlIO_fileno (IoIFP (sv_2io (fh_or_path))); 547 req->fd = PerlIO_fileno (IoIFP (sv_2io (fh_or_path)));
552 } 548 }
553 549
554 req->callback = SvREFCNT_inc (callback); 550 req->callback = SvREFCNT_inc (callback);
555 551

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines