ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/IO-AIO/AIO.xs
(Generate patch)

Comparing IO-AIO/AIO.xs (file contents):
Revision 1.2 by root, Sun Jul 10 18:16:49 2005 UTC vs.
Revision 1.3 by root, Sun Jul 10 20:07:11 2005 UTC

34 REQ_STAT, REQ_LSTAT, REQ_FSTAT, REQ_UNLINK, 34 REQ_STAT, REQ_LSTAT, REQ_FSTAT, REQ_UNLINK,
35 REQ_FSYNC, REQ_FDATASYNC, 35 REQ_FSYNC, REQ_FDATASYNC,
36}; 36};
37 37
38typedef struct aio_cb { 38typedef struct aio_cb {
39 struct aio_cb *next; 39 struct aio_cb *volatile next;
40 40
41 int type; 41 int type;
42 42
43 int fd; 43 int fd;
44 off_t offset; 44 off_t offset;
55 55
56typedef aio_cb *aio_req; 56typedef aio_cb *aio_req;
57 57
58static int started; 58static int started;
59static int nreqs; 59static int nreqs;
60static int reqpipe[2], respipe[2]; 60static int respipe [2];
61 61
62static pthread_mutex_t reslock = PTHREAD_MUTEX_INITIALIZER;
63static pthread_mutex_t reqlock = PTHREAD_MUTEX_INITIALIZER;
64static pthread_cond_t reqwait = PTHREAD_COND_INITIALIZER;
65
62static aio_req qs, qe; /* queue start, queue end */ 66static volatile aio_req reqs, reqe; /* queue start, queue end */
67static volatile aio_req ress, rese; /* queue start, queue end */
63 68
64static void *aio_proc(void *arg); 69static void *aio_proc(void *arg);
65 70
66static void 71static void
67start_thread (void) 72start_thread (void)
82 87
83 sigprocmask (SIG_SETMASK, &oldsigset, 0); 88 sigprocmask (SIG_SETMASK, &oldsigset, 0);
84} 89}
85 90
86static void 91static void
87send_reqs (void)
88{
89 /* this write is atomic */
90 while (qs && write (reqpipe[1], &qs, sizeof qs) == sizeof qs)
91 {
92 qs = qs->next;
93 if (!qs) qe = 0;
94 }
95}
96
97static void
98send_req (aio_req req) 92send_req (aio_req req)
99{ 93{
100 nreqs++; 94 nreqs++;
95
96 pthread_mutex_lock (&reqlock);
97
101 req->next = 0; 98 req->next = 0;
102 99
103 if (qe) 100 if (reqe)
104 { 101 {
105 qe->next = req; 102 reqe->next = req;
106 qe = req; 103 reqe = req;
107 } 104 }
108 else 105 else
109 qe = qs = req; 106 reqe = reqs = req;
110 107
111 send_reqs (); 108 pthread_cond_signal (&reqwait);
109 pthread_mutex_unlock (&reqlock);
112} 110}
113 111
114static void 112static void
115end_thread (void) 113end_thread (void)
116{ 114{
171} 169}
172 170
173static void 171static void
174poll_wait () 172poll_wait ()
175{ 173{
174 if (!nreqs)
175 return;
176
176 fd_set rfd; 177 fd_set rfd;
177 FD_ZERO(&rfd); 178 FD_ZERO(&rfd);
178 FD_SET(respipe[0], &rfd); 179 FD_SET(respipe [0], &rfd);
179 180
180 select (respipe[0] + 1, &rfd, 0, 0, 0); 181 select (respipe [0] + 1, &rfd, 0, 0, 0);
181} 182}
182 183
183static int 184static int
184poll_cb (pTHX) 185poll_cb (pTHX)
185{ 186{
186 dSP; 187 dSP;
187 int count = 0; 188 int count = 0;
188 aio_req req; 189 aio_req req;
190
191 {
192 /* read and signals sent by the worker threads */
193 char buf [32];
194 while (read (respipe [0], buf, 32) > 0)
195 ;
196 }
189 197
190 while (read (respipe[0], (void *)&req, sizeof (req)) == sizeof (req)) 198 for (;;)
191 { 199 {
200 pthread_mutex_lock (&reslock);
201
202 req = ress;
203
204 if (ress)
205 {
206 ress = ress->next;
207 if (!ress) rese = 0;
208 }
209
210 pthread_mutex_unlock (&reslock);
211
212 if (!req)
213 break;
214
192 nreqs--; 215 nreqs--;
193 216
194 if (req->type == REQ_QUIT) 217 if (req->type == REQ_QUIT)
195 started--; 218 started--;
196 else 219 else
244 } 267 }
245 268
246 Safefree (req); 269 Safefree (req);
247 } 270 }
248 271
249 if (qs)
250 send_reqs ();
251
252 return count; 272 return count;
253} 273}
254 274
255static void * 275static void *
256aio_proc (void *thr_arg) 276aio_proc (void *thr_arg)
257{ 277{
258 aio_req req; 278 aio_req req;
279 int type;
259 280
260 /* then loop */ 281 do
261 while (read (reqpipe[0], (void *)&req, sizeof (req)) == sizeof (req))
262 { 282 {
283 pthread_mutex_lock (&reqlock);
284
285 for (;;)
286 {
287 req = reqs;
288
289 if (reqs)
290 {
291 reqs = reqs->next;
292 if (!reqs) reqe = 0;
293 }
294
295 if (req)
296 break;
297
298 pthread_cond_wait (&reqwait, &reqlock);
299 }
300
301 pthread_mutex_unlock (&reqlock);
302
263 errno = 0; /* strictly unnecessary */ 303 errno = 0; /* strictly unnecessary */
264 304
305 type = req->type;
306
265 switch (req->type) 307 switch (type)
266 { 308 {
267 case REQ_READ: req->result = pread64 (req->fd, req->dataptr, req->length, req->offset); break; 309 case REQ_READ: req->result = pread64 (req->fd, req->dataptr, req->length, req->offset); break;
268 case REQ_WRITE: req->result = pwrite64 (req->fd, req->dataptr, req->length, req->offset); break; 310 case REQ_WRITE: req->result = pwrite64 (req->fd, req->dataptr, req->length, req->offset); break;
269#if SYS_readahead 311#if SYS_readahead
270 case REQ_READAHEAD: req->result = readahead (req->fd, req->offset, req->length); break; 312 case REQ_READAHEAD: req->result = readahead (req->fd, req->offset, req->length); break;
282 324
283 case REQ_FSYNC: req->result = fsync (req->fd); break; 325 case REQ_FSYNC: req->result = fsync (req->fd); break;
284 case REQ_FDATASYNC: req->result = fdatasync (req->fd); break; 326 case REQ_FDATASYNC: req->result = fdatasync (req->fd); break;
285 327
286 case REQ_QUIT: 328 case REQ_QUIT:
287 write (respipe[1], (void *)&req, sizeof (req)); 329 break;
288 return 0;
289 330
290 default: 331 default:
291 req->result = ENOSYS; 332 req->result = ENOSYS;
292 break; 333 break;
293 } 334 }
294 335
295 req->errorno = errno; 336 req->errorno = errno;
296 write (respipe[1], (void *)&req, sizeof (req)); 337
338 pthread_mutex_lock (&reslock);
339
340 req->next = 0;
341
342 if (rese)
343 {
344 rese->next = req;
345 rese = req;
346 }
347 else
348 {
349 rese = ress = req;
350
351 /* write a dummy byte to the pipe so fh becomes ready */
352 write (respipe [1], &respipe, 1);
353 }
354
355 pthread_mutex_unlock (&reslock);
297 } 356 }
357 while (type != REQ_QUIT);
298 358
299 return 0; 359 return 0;
300} 360}
301 361
302MODULE = IO::AIO PACKAGE = IO::AIO 362MODULE = IO::AIO PACKAGE = IO::AIO
303 363
304BOOT: 364BOOT:
305{ 365{
306 if (pipe (reqpipe) || pipe (respipe)) 366 if (pipe (respipe))
307 croak ("unable to initialize request or result pipe"); 367 croak ("unable to initialize result pipe");
308 368
309 if (fcntl (reqpipe[1], F_SETFL, O_NONBLOCK)) 369 if (fcntl (respipe [0], F_SETFL, O_NONBLOCK))
310 croak ("cannot set result pipe to nonblocking mode"); 370 croak ("cannot set result pipe to nonblocking mode");
311 371
312 if (fcntl (respipe[0], F_SETFL, O_NONBLOCK)) 372 if (fcntl (respipe [1], F_SETFL, O_NONBLOCK))
313 croak ("cannot set result pipe to nonblocking mode"); 373 croak ("cannot set result pipe to nonblocking mode");
314} 374}
315 375
316void 376void
317min_parallel(nthreads) 377min_parallel(nthreads)
506 566
507int 567int
508poll_fileno() 568poll_fileno()
509 PROTOTYPE: 569 PROTOTYPE:
510 CODE: 570 CODE:
511 RETVAL = respipe[0]; 571 RETVAL = respipe [0];
512 OUTPUT: 572 OUTPUT:
513 RETVAL 573 RETVAL
514 574
515int 575int
516poll_cb(...) 576poll_cb(...)
522 582
523void 583void
524poll_wait() 584poll_wait()
525 PROTOTYPE: 585 PROTOTYPE:
526 CODE: 586 CODE:
587 if (nreqs)
527 poll_wait (); 588 poll_wait ();
528 589
529int 590int
530nreqs() 591nreqs()
531 PROTOTYPE: 592 PROTOTYPE:
532 CODE: 593 CODE:

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines