ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/IO-AIO/AIO.xs
Revision: 1.15
Committed: Sat Jul 23 18:19:56 2005 UTC (18 years, 9 months ago) by root
Branch: MAIN
Changes since 1.14: +1 -0 lines
Log Message:
*** empty log message ***

File Contents

# Content
1 #include "EXTERN.h"
2 #include "perl.h"
3 #include "XSUB.h"
4
5 #include <sys/types.h>
6 #include <sys/stat.h>
7
8 #include <unistd.h>
9 #include <fcntl.h>
10 #include <signal.h>
11 #include <sched.h>
12 #if __linux
13 #include <sys/syscall.h>
14 #endif
15
16 #include <pthread.h>
17
18 typedef void *InputStream; /* hack, but 5.6.1 is simply toooo old ;) */
19 typedef void *OutputStream; /* hack, but 5.6.1 is simply toooo old ;) */
20 typedef void *InOutStream; /* hack, but 5.6.1 is simply toooo old ;) */
21
22 #if __ia64
23 # define STACKSIZE 65536
24 #else
25 # define STACKSIZE 4096
26 #endif
27
/* Request type codes stored in aio_cb.type.  The numeric values are also
   used as ALIAS indices by the XS wrappers, so the order must not change. */
enum {
  REQ_QUIT,       /* ask one worker thread to leave its loop */
  REQ_OPEN,
  REQ_CLOSE,
  REQ_READ,
  REQ_WRITE,
  REQ_READAHEAD,
  REQ_STAT,
  REQ_LSTAT,
  REQ_FSTAT,
  REQ_UNLINK,
  REQ_FSYNC,
  REQ_FDATASYNC,
};
35
36 typedef struct aio_cb {
37 struct aio_cb *volatile next;
38
39 int type;
40
41 int fd;
42 off_t offset;
43 size_t length;
44 ssize_t result;
45 mode_t mode; /* open */
46 int errorno;
47 SV *data, *callback, *fh;
48 void *dataptr;
49 STRLEN dataoffset;
50
51 Stat_t *statdata;
52 } aio_cb;
53
54 typedef aio_cb *aio_req;
55
56 static int started;
57 static volatile int nreqs;
58 static int max_outstanding = 1<<30;
59 static int respipe [2];
60
61 static pthread_mutex_t reslock = PTHREAD_MUTEX_INITIALIZER;
62 static pthread_mutex_t reqlock = PTHREAD_MUTEX_INITIALIZER;
63 static pthread_cond_t reqwait = PTHREAD_COND_INITIALIZER;
64
65 static volatile aio_req reqs, reqe; /* queue start, queue end */
66 static volatile aio_req ress, rese; /* queue start, queue end */
67
68 static void
69 poll_wait ()
70 {
71 if (nreqs && !ress)
72 {
73 fd_set rfd;
74 FD_ZERO(&rfd);
75 FD_SET(respipe [0], &rfd);
76
77 select (respipe [0] + 1, &rfd, 0, 0, 0);
78 }
79 }
80
81 static int
82 poll_cb ()
83 {
84 dSP;
85 int count = 0;
86 aio_req req, prv;
87
88 pthread_mutex_lock (&reslock);
89
90 {
91 /* read any signals sent by the worker threads */
92 char buf [32];
93 while (read (respipe [0], buf, 32) > 0)
94 ;
95 }
96
97 req = ress;
98 ress = rese = 0;
99
100 pthread_mutex_unlock (&reslock);
101
102 while (req)
103 {
104 nreqs--;
105
106 if (req->type == REQ_QUIT)
107 started--;
108 else
109 {
110 int errorno = errno;
111 errno = req->errorno;
112
113 if (req->type == REQ_READ)
114 SvCUR_set (req->data, req->dataoffset
115 + req->result > 0 ? req->result : 0);
116
117 if (req->data)
118 SvREFCNT_dec (req->data);
119
120 if (req->fh)
121 SvREFCNT_dec (req->fh);
122
123 if (req->type == REQ_STAT || req->type == REQ_LSTAT || req->type == REQ_FSTAT)
124 {
125 PL_laststype = req->type == REQ_LSTAT ? OP_LSTAT : OP_STAT;
126 PL_laststatval = req->result;
127 PL_statcache = *(req->statdata);
128
129 Safefree (req->statdata);
130 }
131
132 ENTER;
133 PUSHMARK (SP);
134 XPUSHs (sv_2mortal (newSViv (req->result)));
135
136 if (req->type == REQ_OPEN)
137 {
138 /* convert fd to fh */
139 SV *fh;
140
141 PUTBACK;
142 call_pv ("IO::AIO::_fd2fh", G_SCALAR | G_EVAL);
143 SPAGAIN;
144
145 fh = SvREFCNT_inc (POPs);
146
147 PUSHMARK (SP);
148 XPUSHs (sv_2mortal (fh));
149 }
150
151 if (SvOK (req->callback))
152 {
153 PUTBACK;
154 call_sv (req->callback, G_VOID | G_EVAL);
155 SPAGAIN;
156 }
157
158 LEAVE;
159
160 if (req->callback)
161 SvREFCNT_dec (req->callback);
162
163 errno = errorno;
164 count++;
165 }
166
167 prv = req;
168 req = req->next;
169 Safefree (prv);
170
171 /* TODO: croak on errors? */
172 }
173
174 return count;
175 }
176
177 static void *aio_proc(void *arg);
178
179 static void
180 start_thread (void)
181 {
182 sigset_t fullsigset, oldsigset;
183 pthread_t tid;
184 pthread_attr_t attr;
185
186 pthread_attr_init (&attr);
187 pthread_attr_setstacksize (&attr, STACKSIZE);
188 pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
189
190 sigfillset (&fullsigset);
191 sigprocmask (SIG_SETMASK, &fullsigset, &oldsigset);
192
193 if (pthread_create (&tid, &attr, aio_proc, 0) == 0)
194 started++;
195
196 sigprocmask (SIG_SETMASK, &oldsigset, 0);
197 }
198
199 static void
200 send_req (aio_req req)
201 {
202 nreqs++;
203
204 pthread_mutex_lock (&reqlock);
205
206 req->next = 0;
207
208 if (reqe)
209 {
210 reqe->next = req;
211 reqe = req;
212 }
213 else
214 reqe = reqs = req;
215
216 pthread_cond_signal (&reqwait);
217 pthread_mutex_unlock (&reqlock);
218
219 while (nreqs > max_outstanding)
220 {
221 poll_wait ();
222 poll_cb ();
223 }
224 }
225
226 static void
227 end_thread (void)
228 {
229 aio_req req;
230 New (0, req, 1, aio_cb);
231 req->type = REQ_QUIT;
232
233 send_req (req);
234 }
235
236 static void *
237 aio_proc (void *thr_arg)
238 {
239 aio_req req;
240 int type;
241
242 do
243 {
244 pthread_mutex_lock (&reqlock);
245
246 for (;;)
247 {
248 req = reqs;
249
250 if (reqs)
251 {
252 reqs = reqs->next;
253 if (!reqs) reqe = 0;
254 }
255
256 if (req)
257 break;
258
259 pthread_cond_wait (&reqwait, &reqlock);
260 }
261
262 pthread_mutex_unlock (&reqlock);
263
264 errno = 0; /* strictly unnecessary */
265
266 type = req->type;
267
268 switch (type)
269 {
270 case REQ_READ: req->result = pread (req->fd, req->dataptr, req->length, req->offset); break;
271 case REQ_WRITE: req->result = pwrite (req->fd, req->dataptr, req->length, req->offset); break;
272 #if SYS_readahead
273 case REQ_READAHEAD: req->result = readahead (req->fd, req->offset, req->length); break;
274 #else
275 case REQ_READAHEAD: req->result = -1; errno = ENOSYS; break;
276 #endif
277
278 case REQ_STAT: req->result = stat (req->dataptr, req->statdata); break;
279 case REQ_LSTAT: req->result = lstat (req->dataptr, req->statdata); break;
280 case REQ_FSTAT: req->result = fstat (req->fd , req->statdata); break;
281
282 case REQ_OPEN: req->result = open (req->dataptr, req->fd, req->mode); break;
283 case REQ_CLOSE: req->result = close (req->fd); break;
284 case REQ_UNLINK: req->result = unlink (req->dataptr); break;
285
286 case REQ_FSYNC: req->result = fsync (req->fd); break;
287 case REQ_FDATASYNC: req->result = fdatasync (req->fd); break;
288
289 case REQ_QUIT:
290 break;
291
292 default:
293 req->result = ENOSYS;
294 break;
295 }
296
297 req->errorno = errno;
298
299 pthread_mutex_lock (&reslock);
300
301 req->next = 0;
302
303 if (rese)
304 {
305 rese->next = req;
306 rese = req;
307 }
308 else
309 {
310 rese = ress = req;
311
312 /* write a dummy byte to the pipe so fh becomes ready */
313 write (respipe [1], &respipe, 1);
314 }
315
316 pthread_mutex_unlock (&reslock);
317 }
318 while (type != REQ_QUIT);
319
320 return 0;
321 }
322
323 MODULE = IO::AIO PACKAGE = IO::AIO
324
325 PROTOTYPES: ENABLE
326
BOOT:
{
	int end;
	/* the result pipe carries wake-up bytes from the workers */
	if (pipe (respipe))
	  croak ("unable to initialize result pipe");
	/* read end first, then write end */
	for (end = 0; end < 2; end++)
	  if (fcntl (respipe [end], F_SETFL, O_NONBLOCK))
	    croak ("cannot set result pipe to nonblocking mode");
}
338
void
min_parallel(nthreads)
	int	nthreads
	PROTOTYPE: $
	CODE:
	/* start workers until at least nthreads are running */
	while (nthreads > started)
	  {
	    int before = started;
	    start_thread ();
	    /* start_thread bumps the counter only on success; without this
	       check a persistent pthread_create failure would spin forever */
	    if (started == before)
	      croak ("unable to start aio worker thread");
	  }
346
void
max_parallel(nthreads)
	int	nthreads
	PROTOTYPE: $
	CODE:
	{
	  int remaining;

	  /* queue one quit request per surplus thread, counting from a
	     snapshot of the thread count */
	  for (remaining = started; remaining > nthreads; --remaining)
	    end_thread ();

	  /* then reap results until enough quit requests have come back */
	  while (started > nthreads)
	    {
	      poll_wait ();
	      poll_cb ();
	    }
	}
366
int
max_outstanding(nreqs)
	int	nreqs
	PROTOTYPE: $
	CODE:
	/* install the new limit and hand back the previous one */
	RETVAL = max_outstanding;
	max_outstanding = nreqs;
	OUTPUT:
	RETVAL
374
void
aio_open(pathname,flags,mode,callback=&PL_sv_undef)
	SV *	pathname
	int	flags
	int	mode
	SV *	callback
	PROTOTYPE: $$$;$
	CODE:
	{
	  aio_req req;

	  Newz (0, req, 1, aio_cb);

	  if (!req)
	    croak ("out of memory during aio_req allocation");

	  /* the request works on its own copy of the path scalar */
	  req->data     = newSVsv (pathname);
	  req->dataptr  = SvPV_nolen (req->data);
	  req->type     = REQ_OPEN;
	  req->fd       = flags; /* the worker passes this field to open() as the flags */
	  req->mode     = mode;
	  req->callback = SvREFCNT_inc (callback);

	  send_req (req);
	}
400
void
aio_close(fh,callback=&PL_sv_undef)
	SV *	fh
	SV *	callback
	PROTOTYPE: $;$
	ALIAS:
	   aio_close     = REQ_CLOSE
	   aio_fsync     = REQ_FSYNC
	   aio_fdatasync = REQ_FDATASYNC
	CODE:
	{
	  aio_req req;
	  /* Resolve the descriptor first: sv_2io croaks on a bad handle, and
	     doing it before any allocation means nothing is leaked then. */
	  int fd = PerlIO_fileno (IoIFP (sv_2io (fh)));

	  Newz (0, req, 1, aio_cb);

	  if (!req)
	    croak ("out of memory during aio_req allocation");

	  req->type = ix; /* the ALIAS index is the request type */
	  req->fh = newSVsv (fh);
	  req->fd = fd;
	  req->callback = SvREFCNT_inc (callback);

	  send_req (req);
	}
426
void
aio_read(fh,offset,length,data,dataoffset,callback=&PL_sv_undef)
	SV *	fh
	UV	offset
	IV	length
	SV *	data
	IV	dataoffset
	SV *	callback
	ALIAS:
	   aio_read  = REQ_READ
	   aio_write = REQ_WRITE
	PROTOTYPE: $$$$$;$
	CODE:
	{
	  aio_req req;
	  int fd;
	  STRLEN svlen;
	  char *svptr = SvPV (data, svlen);

	  SvUPGRADE (data, SVt_PV);
	  SvPOK_on (data);

	  /* a negative offset counts from the end of the string */
	  if (dataoffset < 0)
	    dataoffset += svlen;

	  if (dataoffset < 0 || dataoffset > svlen)
	    croak ("data offset outside of string");

	  if (ix == REQ_WRITE)
	    {
	      /* write: check length and adjust. */
	      if (length < 0 || length + dataoffset > svlen)
	        length = svlen - dataoffset;
	    }
	  else
	    {
	      /* read: reject a negative length before it is used as a size */
	      if (length < 0)
	        croak ("length must not be negative");

	      /* read: grow scalar as necessary */
	      svptr = SvGROW (data, length + dataoffset);
	    }

	  /* Resolve the descriptor before allocating the request: sv_2io
	     croaks on a bad handle and nothing is leaked this way. */
	  fd = PerlIO_fileno (ix == REQ_READ ? IoIFP (sv_2io (fh))
	                                     : IoOFP (sv_2io (fh)));

	  Newz (0, req, 1, aio_cb);

	  if (!req)
	    croak ("out of memory during aio_req allocation");

	  req->type = ix; /* the ALIAS index is the request type */
	  req->fh = newSVsv (fh);
	  req->fd = fd;
	  req->offset = offset;
	  req->length = length;
	  req->data = SvREFCNT_inc (data);
	  req->dataptr = (char *)svptr + dataoffset;
	  /* the completion code reads this field when it sets the length of
	     a read buffer, so it has to be recorded */
	  req->dataoffset = dataoffset;
	  req->callback = SvREFCNT_inc (callback);

	  send_req (req);
	}
486
void
aio_readahead(fh,offset,length,callback=&PL_sv_undef)
	SV *	fh
	UV	offset
	IV	length
	SV *	callback
	PROTOTYPE: $$$;$
	CODE:
	{
	  aio_req req;
	  int fd;

	  if (length < 0)
	    croak ("length must not be negative");

	  /* Resolve the descriptor first: sv_2io croaks on a bad handle, and
	     doing it before any allocation means nothing is leaked then. */
	  fd = PerlIO_fileno (IoIFP (sv_2io (fh)));

	  Newz (0, req, 1, aio_cb);

	  if (!req)
	    croak ("out of memory during aio_req allocation");

	  req->type = REQ_READAHEAD;
	  req->fh = newSVsv (fh);
	  req->fd = fd;
	  req->offset = offset;
	  req->length = length;
	  req->callback = SvREFCNT_inc (callback);

	  send_req (req);
	}
515
void
aio_stat(fh_or_path,callback=&PL_sv_undef)
	SV *	fh_or_path
	SV *	callback
	ALIAS:
	   aio_stat  = REQ_STAT
	   aio_lstat = REQ_LSTAT
	CODE:
	{
	  aio_req req;
	  /* a scalar with a string value is a path, anything else a handle */
	  int is_path = SvPOK (fh_or_path) ? 1 : 0;
	  int fd = 0;

	  /* Resolve the descriptor first: sv_2io croaks on a bad handle, and
	     doing it before any allocation means nothing is leaked then. */
	  if (!is_path)
	    fd = PerlIO_fileno (IoIFP (sv_2io (fh_or_path)));

	  Newz (0, req, 1, aio_cb);

	  if (!req)
	    croak ("out of memory during aio_req allocation");

	  New (0, req->statdata, 1, Stat_t);

	  if (!req->statdata)
	    croak ("out of memory during aio_req->statdata allocation");

	  if (is_path)
	    {
	      req->type = ix; /* REQ_STAT or REQ_LSTAT, from the ALIAS index */
	      req->data = newSVsv (fh_or_path);
	      req->dataptr = SvPV_nolen (req->data);
	    }
	  else
	    {
	      /* handles are always served by fstat, whichever alias was called */
	      req->type = REQ_FSTAT;
	      req->fh = newSVsv (fh_or_path);
	      req->fd = fd;
	    }

	  req->callback = SvREFCNT_inc (callback);

	  send_req (req);
	}
554
void
aio_unlink(pathname,callback=&PL_sv_undef)
	SV *	pathname
	SV *	callback
	CODE:
	{
	  aio_req req;

	  Newz (0, req, 1, aio_cb);

	  if (!req)
	    croak ("out of memory during aio_req allocation");

	  /* the request works on its own copy of the path scalar */
	  req->data     = newSVsv (pathname);
	  req->dataptr  = SvPV_nolen (req->data);
	  req->type     = REQ_UNLINK;
	  req->callback = SvREFCNT_inc (callback);

	  send_req (req);
	}
575
void
flush()
	PROTOTYPE:
	CODE:
	/* wait and reap until no request is outstanding */
	for (; nreqs; poll_cb ())
	  poll_wait ();
585
void
poll()
	PROTOTYPE:
	CODE:
	/* one round only: wait for results, then reap whatever is queued;
	   does nothing when no request is outstanding */
	if (nreqs)
	  {
	    poll_wait ();
	    poll_cb ();
	  }
595
int
poll_fileno()
	PROTOTYPE:
	CODE:
	/* read end of the result pipe, the descriptor poll_wait selects on */
	RETVAL = respipe [0];
	OUTPUT:
	RETVAL
603
int
poll_cb(...)
	PROTOTYPE:
	CODE:
	/* any arguments are ignored; returns the count reported by the C-level poll_cb */
	RETVAL = poll_cb ();
	OUTPUT:
	RETVAL
611
void
poll_wait()
	PROTOTYPE:
	CODE:
	/* only wait when at least one request is outstanding */
	if (nreqs)
	  poll_wait ();
618
int
nreqs()
	PROTOTYPE:
	CODE:
	/* requests submitted and not yet reaped by poll_cb */
	RETVAL = nreqs;
	OUTPUT:
	RETVAL
626