ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/IO-AIO/AIO.xs
Revision: 1.14
Committed: Sat Jul 23 18:15:36 2005 UTC (18 years, 9 months ago) by root
Branch: MAIN
Changes since 1.13: +0 -3 lines
Log Message:
*** empty log message ***

File Contents

# Content
1 #include "perl.h"
2 #include "XSUB.h"
3
4 #include <sys/types.h>
5 #include <sys/stat.h>
6
7 #include <unistd.h>
8 #include <fcntl.h>
9 #include <signal.h>
10 #include <sched.h>
11 #if __linux
12 #include <sys/syscall.h>
13 #endif
14
15 #include <pthread.h>
16
/*
 * Stream type aliases, declared as plain void pointers.
 * This is a hack kept because perl 5.6.1 is simply too old.
 */
typedef void *InputStream;
typedef void *OutputStream;
typedef void *InOutStream;

/* Stack size requested for each worker thread (see start_thread). */
#if __ia64
# define STACKSIZE 65536
#else
# define STACKSIZE 4096
#endif
26
/*
 * Request types understood by the worker threads (see aio_proc).
 * The order fixes the numeric values, which are also used as XS ALIAS
 * indices, so new entries must not be inserted in the middle.
 */
enum {
  REQ_QUIT,       /* makes the worker that dequeues it leave its loop */
  REQ_OPEN,       /* open () */
  REQ_CLOSE,      /* close () */
  REQ_READ,       /* pread () */
  REQ_WRITE,      /* pwrite () */
  REQ_READAHEAD,  /* readahead (), or ENOSYS where unavailable */
  REQ_STAT,       /* stat () */
  REQ_LSTAT,      /* lstat () */
  REQ_FSTAT,      /* fstat () */
  REQ_UNLINK,     /* unlink () */
  REQ_FSYNC,      /* fsync () */
  REQ_FDATASYNC,  /* fdatasync () */
};
34
35 typedef struct aio_cb {
36 struct aio_cb *volatile next;
37
38 int type;
39
40 int fd;
41 off_t offset;
42 size_t length;
43 ssize_t result;
44 mode_t mode; /* open */
45 int errorno;
46 SV *data, *callback, *fh;
47 void *dataptr;
48 STRLEN dataoffset;
49
50 Stat_t *statdata;
51 } aio_cb;
52
53 typedef aio_cb *aio_req;
54
55 static int started;
56 static volatile int nreqs;
57 static int max_outstanding = 1<<30;
58 static int respipe [2];
59
60 static pthread_mutex_t reslock = PTHREAD_MUTEX_INITIALIZER;
61 static pthread_mutex_t reqlock = PTHREAD_MUTEX_INITIALIZER;
62 static pthread_cond_t reqwait = PTHREAD_COND_INITIALIZER;
63
64 static volatile aio_req reqs, reqe; /* queue start, queue end */
65 static volatile aio_req ress, rese; /* queue start, queue end */
66
67 static void
68 poll_wait ()
69 {
70 if (nreqs && !ress)
71 {
72 fd_set rfd;
73 FD_ZERO(&rfd);
74 FD_SET(respipe [0], &rfd);
75
76 select (respipe [0] + 1, &rfd, 0, 0, 0);
77 }
78 }
79
80 static int
81 poll_cb ()
82 {
83 dSP;
84 int count = 0;
85 aio_req req, prv;
86
87 pthread_mutex_lock (&reslock);
88
89 {
90 /* read any signals sent by the worker threads */
91 char buf [32];
92 while (read (respipe [0], buf, 32) > 0)
93 ;
94 }
95
96 req = ress;
97 ress = rese = 0;
98
99 pthread_mutex_unlock (&reslock);
100
101 while (req)
102 {
103 nreqs--;
104
105 if (req->type == REQ_QUIT)
106 started--;
107 else
108 {
109 int errorno = errno;
110 errno = req->errorno;
111
112 if (req->type == REQ_READ)
113 SvCUR_set (req->data, req->dataoffset
114 + req->result > 0 ? req->result : 0);
115
116 if (req->data)
117 SvREFCNT_dec (req->data);
118
119 if (req->fh)
120 SvREFCNT_dec (req->fh);
121
122 if (req->type == REQ_STAT || req->type == REQ_LSTAT || req->type == REQ_FSTAT)
123 {
124 PL_laststype = req->type == REQ_LSTAT ? OP_LSTAT : OP_STAT;
125 PL_laststatval = req->result;
126 PL_statcache = *(req->statdata);
127
128 Safefree (req->statdata);
129 }
130
131 ENTER;
132 PUSHMARK (SP);
133 XPUSHs (sv_2mortal (newSViv (req->result)));
134
135 if (req->type == REQ_OPEN)
136 {
137 /* convert fd to fh */
138 SV *fh;
139
140 PUTBACK;
141 call_pv ("IO::AIO::_fd2fh", G_SCALAR | G_EVAL);
142 SPAGAIN;
143
144 fh = SvREFCNT_inc (POPs);
145
146 PUSHMARK (SP);
147 XPUSHs (sv_2mortal (fh));
148 }
149
150 if (SvOK (req->callback))
151 {
152 PUTBACK;
153 call_sv (req->callback, G_VOID | G_EVAL);
154 SPAGAIN;
155 }
156
157 LEAVE;
158
159 if (req->callback)
160 SvREFCNT_dec (req->callback);
161
162 errno = errorno;
163 count++;
164 }
165
166 prv = req;
167 req = req->next;
168 Safefree (prv);
169
170 /* TODO: croak on errors? */
171 }
172
173 return count;
174 }
175
176 static void *aio_proc(void *arg);
177
178 static void
179 start_thread (void)
180 {
181 sigset_t fullsigset, oldsigset;
182 pthread_t tid;
183 pthread_attr_t attr;
184
185 pthread_attr_init (&attr);
186 pthread_attr_setstacksize (&attr, STACKSIZE);
187 pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
188
189 sigfillset (&fullsigset);
190 sigprocmask (SIG_SETMASK, &fullsigset, &oldsigset);
191
192 if (pthread_create (&tid, &attr, aio_proc, 0) == 0)
193 started++;
194
195 sigprocmask (SIG_SETMASK, &oldsigset, 0);
196 }
197
198 static void
199 send_req (aio_req req)
200 {
201 nreqs++;
202
203 pthread_mutex_lock (&reqlock);
204
205 req->next = 0;
206
207 if (reqe)
208 {
209 reqe->next = req;
210 reqe = req;
211 }
212 else
213 reqe = reqs = req;
214
215 pthread_cond_signal (&reqwait);
216 pthread_mutex_unlock (&reqlock);
217
218 while (nreqs > max_outstanding)
219 {
220 poll_wait ();
221 poll_cb ();
222 }
223 }
224
225 static void
226 end_thread (void)
227 {
228 aio_req req;
229 New (0, req, 1, aio_cb);
230 req->type = REQ_QUIT;
231
232 send_req (req);
233 }
234
235 static void *
236 aio_proc (void *thr_arg)
237 {
238 aio_req req;
239 int type;
240
241 do
242 {
243 pthread_mutex_lock (&reqlock);
244
245 for (;;)
246 {
247 req = reqs;
248
249 if (reqs)
250 {
251 reqs = reqs->next;
252 if (!reqs) reqe = 0;
253 }
254
255 if (req)
256 break;
257
258 pthread_cond_wait (&reqwait, &reqlock);
259 }
260
261 pthread_mutex_unlock (&reqlock);
262
263 errno = 0; /* strictly unnecessary */
264
265 type = req->type;
266
267 switch (type)
268 {
269 case REQ_READ: req->result = pread (req->fd, req->dataptr, req->length, req->offset); break;
270 case REQ_WRITE: req->result = pwrite (req->fd, req->dataptr, req->length, req->offset); break;
271 #if SYS_readahead
272 case REQ_READAHEAD: req->result = readahead (req->fd, req->offset, req->length); break;
273 #else
274 case REQ_READAHEAD: req->result = -1; errno = ENOSYS; break;
275 #endif
276
277 case REQ_STAT: req->result = stat (req->dataptr, req->statdata); break;
278 case REQ_LSTAT: req->result = lstat (req->dataptr, req->statdata); break;
279 case REQ_FSTAT: req->result = fstat (req->fd , req->statdata); break;
280
281 case REQ_OPEN: req->result = open (req->dataptr, req->fd, req->mode); break;
282 case REQ_CLOSE: req->result = close (req->fd); break;
283 case REQ_UNLINK: req->result = unlink (req->dataptr); break;
284
285 case REQ_FSYNC: req->result = fsync (req->fd); break;
286 case REQ_FDATASYNC: req->result = fdatasync (req->fd); break;
287
288 case REQ_QUIT:
289 break;
290
291 default:
292 req->result = ENOSYS;
293 break;
294 }
295
296 req->errorno = errno;
297
298 pthread_mutex_lock (&reslock);
299
300 req->next = 0;
301
302 if (rese)
303 {
304 rese->next = req;
305 rese = req;
306 }
307 else
308 {
309 rese = ress = req;
310
311 /* write a dummy byte to the pipe so fh becomes ready */
312 write (respipe [1], &respipe, 1);
313 }
314
315 pthread_mutex_unlock (&reslock);
316 }
317 while (type != REQ_QUIT);
318
319 return 0;
320 }
321
322 MODULE = IO::AIO PACKAGE = IO::AIO
323
324 PROTOTYPES: ENABLE
325
BOOT:
{
        /* create the result pipe and make both of its ends non-blocking */
        if (pipe (respipe))
          croak ("unable to initialize result pipe");
        if (fcntl (respipe [0], F_SETFL, O_NONBLOCK)
            || fcntl (respipe [1], F_SETFL, O_NONBLOCK))
          croak ("cannot set result pipe to nonblocking mode");
}
337
void
min_parallel(nthreads)
        int     nthreads
        PROTOTYPE: $
        CODE:
        /* start workers until at least nthreads are running; start_thread
           only increments `started' on success, so an unchanged counter
           means thread creation failed and looping on would never end */
        while (nthreads > started)
          {
            int before = started;

            start_thread ();

            if (started == before)
              croak ("unable to start worker thread");
          }
345
void
max_parallel(nthreads)
        int     nthreads
        PROTOTYPE: $
        CODE:
{
        int cur;

        /* queue one quit request per surplus worker, counted from a
           snapshot of `started' */
        for (cur = started; cur > nthreads; cur--)
          end_thread ();

        /* then reap results until enough quit requests have come back */
        while (nthreads < started)
          {
            poll_wait ();
            poll_cb ();
          }
}
365
int
max_outstanding(nreqs)
        int     nreqs
        PROTOTYPE: $
        CODE:
        /* install the new limit and hand back the previous one */
        RETVAL = max_outstanding;
        max_outstanding = nreqs;
        OUTPUT:
        RETVAL
373
void
aio_open(pathname,flags,mode,callback=&PL_sv_undef)
        SV *    pathname
        int     flags
        int     mode
        SV *    callback
        PROTOTYPE: $$$;$
        CODE:
{
        aio_req request;

        Newz (0, request, 1, aio_cb);

        if (!request)
          croak ("out of memory during aio_req allocation");

        /* the request keeps its own copy of the path string */
        request->data     = newSVsv (pathname);
        request->dataptr  = SvPV_nolen (request->data);

        request->type     = REQ_OPEN;
        request->fd       = flags; /* the fd field carries the open flags */
        request->mode     = mode;
        request->callback = SvREFCNT_inc (callback);

        send_req (request);
}
399
void
aio_close(fh,callback=&PL_sv_undef)
        SV *    fh
        SV *    callback
        PROTOTYPE: $;$
        ALIAS:
           aio_close     = REQ_CLOSE
           aio_fsync     = REQ_FSYNC
           aio_fdatasync = REQ_FDATASYNC
        CODE:
{
        aio_req request;

        Newz (0, request, 1, aio_cb);

        if (!request)
          croak ("out of memory during aio_req allocation");

        /* ix is the alias index, i.e. the request type itself */
        request->type     = ix;
        request->callback = SvREFCNT_inc (callback);

        /* copy of the handle scalar, released again in poll_cb */
        request->fh       = newSVsv (fh);
        request->fd       = PerlIO_fileno (IoIFP (sv_2io (fh)));

        send_req (request);
}
425
void
aio_read(fh,offset,length,data,dataoffset,callback=&PL_sv_undef)
        SV *    fh
        UV      offset
        IV      length
        SV *    data
        IV      dataoffset
        SV *    callback
        ALIAS:
           aio_read  = REQ_READ
           aio_write = REQ_WRITE
        PROTOTYPE: $$$$$;$
        CODE:
{
        aio_req req;
        STRLEN svlen;
        char *svptr = SvPV (data, svlen);

        SvUPGRADE (data, SVt_PV);
        SvPOK_on (data);

        /* a negative offset counts from the end of the string */
        if (dataoffset < 0)
          dataoffset += svlen;

        if (dataoffset < 0 || dataoffset > svlen)
          croak ("data offset outside of string");

        if (ix == REQ_WRITE)
          {
            /* write: check length and adjust. */
            if (length < 0 || length + dataoffset > svlen)
              length = svlen - dataoffset;
          }
        else
          {
            /* read: validate the length before it is used as a size */
            if (length < 0)
              croak ("length must not be negative");

            /* grow scalar as necessary, plus one byte for a trailing NUL */
            svptr = SvGROW (data, length + dataoffset + 1);
          }

        Newz (0, req, 1, aio_cb);

        if (!req)
          croak ("out of memory during aio_req allocation");

        req->type = ix;
        req->fh = newSVsv (fh);
        req->fd = PerlIO_fileno (ix == REQ_READ ? IoIFP (sv_2io (fh))
                                                : IoOFP (sv_2io (fh)));
        req->offset = offset;
        req->length = length;
        req->data = SvREFCNT_inc (data);
        req->dataptr = (char *)svptr + dataoffset;
        /* poll_cb reads this when it sets the length of a read buffer */
        req->dataoffset = dataoffset;
        req->callback = SvREFCNT_inc (callback);

        send_req (req);
}
485
void
aio_readahead(fh,offset,length,callback=&PL_sv_undef)
        SV *    fh
        UV      offset
        IV      length
        SV *    callback
        PROTOTYPE: $$$;$
        CODE:
{
        aio_req request;

        /* reject bad input before anything is allocated */
        if (length < 0)
          croak ("length must not be negative");

        Newz (0, request, 1, aio_cb);

        if (!request)
          croak ("out of memory during aio_req allocation");

        request->type     = REQ_READAHEAD;
        request->offset   = offset;
        request->length   = length;
        request->callback = SvREFCNT_inc (callback);

        /* copy of the handle scalar, released again in poll_cb */
        request->fh       = newSVsv (fh);
        request->fd       = PerlIO_fileno (IoIFP (sv_2io (fh)));

        send_req (request);
}
514
void
aio_stat(fh_or_path,callback=&PL_sv_undef)
        SV *    fh_or_path
        SV *    callback
        ALIAS:
           aio_stat  = REQ_STAT
           aio_lstat = REQ_LSTAT
        CODE:
{
        aio_req request;

        Newz (0, request, 1, aio_cb);

        if (!request)
          croak ("out of memory during aio_req allocation");

        /* result buffer, freed by poll_cb after it has been copied */
        New (0, request->statdata, 1, Stat_t);

        if (!request->statdata)
          croak ("out of memory during aio_req->statdata allocation");

        if (!SvPOK (fh_or_path))
          {
            /* not a string: treat the argument as a file handle */
            request->type = REQ_FSTAT;
            request->fh   = newSVsv (fh_or_path);
            request->fd   = PerlIO_fileno (IoIFP (sv_2io (fh_or_path)));
          }
        else
          {
            /* string: stat or lstat the path, as selected by the alias */
            request->type    = ix;
            request->data    = newSVsv (fh_or_path);
            request->dataptr = SvPV_nolen (request->data);
          }

        request->callback = SvREFCNT_inc (callback);

        send_req (request);
}
553
void
aio_unlink(pathname,callback=&PL_sv_undef)
        SV *    pathname
        SV *    callback
        CODE:
{
        aio_req request;

        Newz (0, request, 1, aio_cb);

        if (!request)
          croak ("out of memory during aio_req allocation");

        /* the request keeps its own copy of the path string */
        request->data     = newSVsv (pathname);
        request->dataptr  = SvPV_nolen (request->data);

        request->type     = REQ_UNLINK;
        request->callback = SvREFCNT_inc (callback);

        send_req (request);
}
574
void
flush()
        PROTOTYPE:
        CODE:
        /* keep waiting and reaping until no request is outstanding */
        while (nreqs != 0)
          {
            poll_wait ();
            poll_cb ();
          }
584
void
poll()
        PROTOTYPE:
        CODE:
        /* one wait-and-reap round, skipped when nothing is outstanding */
        if (nreqs != 0)
          {
            poll_wait ();
            poll_cb ();
          }
594
int
poll_fileno()
        PROTOTYPE:
        CODE:
        /* read end of the result pipe, the descriptor poll_wait selects on */
        RETVAL = respipe [0];
        OUTPUT:
        RETVAL
602
int
poll_cb(...)
        PROTOTYPE:
        CODE:
        /* any arguments are ignored; returns what the C poll_cb returns */
        RETVAL = poll_cb ();
        OUTPUT:
        RETVAL
610
void
poll_wait()
        PROTOTYPE:
        CODE:
        /* only wait when at least one request is outstanding */
        if (nreqs != 0)
          poll_wait ();
617
int
nreqs()
        PROTOTYPE:
        CODE:
        /* requests submitted and not yet reaped by poll_cb */
        RETVAL = nreqs;
        OUTPUT:
        RETVAL
625