ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/IO-AIO/AIO.xs
(Generate patch)

Comparing IO-AIO/AIO.xs (file contents):
Revision 1.79 by root, Thu Oct 26 16:28:33 2006 UTC vs.
Revision 1.81 by root, Fri Oct 27 20:10:06 2006 UTC

92 REQ_READ, REQ_WRITE, REQ_READAHEAD, 92 REQ_READ, REQ_WRITE, REQ_READAHEAD,
93 REQ_SENDFILE, 93 REQ_SENDFILE,
94 REQ_STAT, REQ_LSTAT, REQ_FSTAT, 94 REQ_STAT, REQ_LSTAT, REQ_FSTAT,
95 REQ_FSYNC, REQ_FDATASYNC, 95 REQ_FSYNC, REQ_FDATASYNC,
96 REQ_UNLINK, REQ_RMDIR, REQ_RENAME, 96 REQ_UNLINK, REQ_RMDIR, REQ_RENAME,
97 REQ_READDIR, 97 REQ_MKNOD, REQ_READDIR,
98 REQ_LINK, REQ_SYMLINK, 98 REQ_LINK, REQ_SYMLINK,
99 REQ_GROUP, REQ_NOP, 99 REQ_GROUP, REQ_NOP,
100 REQ_BUSY, 100 REQ_BUSY,
101}; 101};
102 102
145}; 145};
146 146
147static int next_pri = DEFAULT_PRI + PRI_BIAS; 147static int next_pri = DEFAULT_PRI + PRI_BIAS;
148 148
149static unsigned int started, wanted; 149static unsigned int started, wanted;
150static volatile unsigned int nreqs, nready, npending;
151static volatile unsigned int max_outstanding = 0xffffffff;
152static int respipe [2];
153 150
154#if __linux && defined (PTHREAD_ADAPTIVE_MUTEX_INITIALIZER_NP) 151#if __linux && defined (PTHREAD_ADAPTIVE_MUTEX_INITIALIZER_NP)
155# define AIO_MUTEX_INIT PTHREAD_ADAPTIVE_MUTEX_INITIALIZER_NP 152# define AIO_MUTEX_INIT PTHREAD_ADAPTIVE_MUTEX_INITIALIZER_NP
156#else 153#else
157# define AIO_MUTEX_INIT PTHREAD_MUTEX_INITIALIZER 154# define AIO_MUTEX_INIT PTHREAD_MUTEX_INITIALIZER
198 wrk->prev->next = wrk->next; 195 wrk->prev->next = wrk->next;
199 196
200 free (wrk); 197 free (wrk);
201} 198}
202 199
200static volatile unsigned int nreqs, nready, npending;
201static volatile unsigned int max_outstanding = 0xffffffff;
202static int respipe [2];
203
203static pthread_mutex_t reslock = AIO_MUTEX_INIT; 204static pthread_mutex_t reslock = AIO_MUTEX_INIT;
204static pthread_mutex_t reqlock = AIO_MUTEX_INIT; 205static pthread_mutex_t reqlock = AIO_MUTEX_INIT;
205static pthread_cond_t reqwait = PTHREAD_COND_INITIALIZER; 206static pthread_cond_t reqwait = PTHREAD_COND_INITIALIZER;
207
208#if WORDREAD_UNSAFE
209
210static unsigned int get_nready ()
211{
212 unsigned int retval;
213
214 LOCK (reqlock);
215 retval = nready;
216 UNLOCK (reqlock);
217
218 return retval;
219}
220
221static unsigned int get_npending ()
222{
223 unsigned int retval;
224
225 LOCK (reslock);
226 retval = npending;
227 UNLOCK (reslock);
228
229 return retval;
230}
231
232#else
233
234# define get_nready() nready
235# define get_npending() npending
236
237#endif
206 238
207/* 239/*
208 * a somewhat faster data structure might be nice, but 240 * a somewhat faster data structure might be nice, but
209 * with 8 priorities this actually needs <20 insns 241 * with 8 priorities this actually needs <20 insns
210 * per shift, the most expensive operation. 242 * per shift, the most expensive operation.
331 req_invoke (grp); 363 req_invoke (grp);
332 req_free (grp); 364 req_free (grp);
333 } 365 }
334} 366}
335 367
336static void poll_wait ()
337{
338 fd_set rfd;
339
340 while (nreqs)
341 {
342 int size;
343 if (WORDREAD_UNSAFE) LOCK (reslock);
344 size = res_queue.size;
345 if (WORDREAD_UNSAFE) UNLOCK (reslock);
346
347 if (size)
348 return;
349
350 FD_ZERO(&rfd);
351 FD_SET(respipe [0], &rfd);
352
353 select (respipe [0] + 1, &rfd, 0, 0, 0);
354 }
355}
356
357static void req_invoke (aio_req req) 368static void req_invoke (aio_req req)
358{ 369{
359 dSP; 370 dSP;
360 371
361 if (!(req->flags & FLAG_CANCELLED) && SvOK (req->callback)) 372 if (!(req->flags & FLAG_CANCELLED) && SvOK (req->callback))
504 req->flags |= FLAG_CANCELLED; 515 req->flags |= FLAG_CANCELLED;
505 516
506 req_cancel_subs (req); 517 req_cancel_subs (req);
507} 518}
508 519
520static void *aio_proc(void *arg);
521
522static void start_thread (void)
523{
524 sigset_t fullsigset, oldsigset;
525 pthread_attr_t attr;
526
527 worker *wrk = calloc (1, sizeof (worker));
528
529 if (!wrk)
530 croak ("unable to allocate worker thread data");
531
532 pthread_attr_init (&attr);
533 pthread_attr_setstacksize (&attr, STACKSIZE);
534 pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
535#ifdef PTHREAD_SCOPE_PROCESS
536 pthread_attr_setscope (&attr, PTHREAD_SCOPE_PROCESS);
537#endif
538
539 sigfillset (&fullsigset);
540
541 LOCK (wrklock);
542 sigprocmask (SIG_SETMASK, &fullsigset, &oldsigset);
543
544 if (pthread_create (&wrk->tid, &attr, aio_proc, (void *)wrk) == 0)
545 {
546 wrk->prev = &wrk_first;
547 wrk->next = wrk_first.next;
548 wrk_first.next->prev = wrk;
549 wrk_first.next = wrk;
550 ++started;
551 }
552 else
553 free (wrk);
554
555 sigprocmask (SIG_SETMASK, &oldsigset, 0);
556 UNLOCK (wrklock);
557}
558
559static void maybe_start_thread ()
560{
561#if 0
562 static struct timeval last;
563 struct timeval diff, now;
564#endif
565
566 if (started >= wanted)
567 return;
568
569 if (nready <= nreqs - get_nready () - get_npending ())
570 return;
571
572#if 0
573 gettimeofday (&now, 0);
574
575 diff.tv_sec = now.tv_sec - last.tv_sec;
576 diff.tv_usec = now.tv_usec - last.tv_usec;
577
578 if (diff.tv_usec < 0)
579 {
580 --diff.tv_sec;
581 diff.tv_usec += 1000000;
582 }
583
584 if (!diff.tv_sec && diff.tv_usec < 10000)
585 return;
586
587 last = now;
588#endif
589
590 start_thread ();
591}
592
593static void req_send (aio_req req)
594{
595 ++nreqs;
596
597 LOCK (reqlock);
598 ++nready;
599 reqq_push (&req_queue, req);
600 pthread_cond_signal (&reqwait);
601 UNLOCK (reqlock);
602
603 maybe_start_thread ();
604}
605
606static void end_thread (void)
607{
608 aio_req req;
609
610 Newz (0, req, 1, aio_cb);
611
612 req->type = REQ_QUIT;
613 req->pri = PRI_MAX + PRI_BIAS;
614
615 req_send (req);
616
617 LOCK (wrklock);
618 --started;
619 UNLOCK (wrklock);
620}
621
622static void min_parallel (int nthreads)
623{
624 if (wanted < nthreads)
625 wanted = nthreads;
626}
627
628static void max_parallel (int nthreads)
629{
630 if (wanted > nthreads)
631 wanted = nthreads;
632
633 while (started > wanted)
634 end_thread ();
635}
636
637static void poll_wait ()
638{
639 fd_set rfd;
640
641 while (nreqs)
642 {
643 int size;
644 if (WORDREAD_UNSAFE) LOCK (reslock);
645 size = res_queue.size;
646 if (WORDREAD_UNSAFE) UNLOCK (reslock);
647
648 if (size)
649 return;
650
651 maybe_start_thread ();
652
653 FD_ZERO(&rfd);
654 FD_SET(respipe [0], &rfd);
655
656 select (respipe [0] + 1, &rfd, 0, 0, 0);
657 }
658}
659
509static int poll_cb (int max) 660static int poll_cb (int max)
510{ 661{
511 dSP; 662 dSP;
512 int count = 0; 663 int count = 0;
513 int do_croak = 0; 664 int do_croak = 0;
515 666
516 for (;;) 667 for (;;)
517 { 668 {
518 while (max <= 0 || count < max) 669 while (max <= 0 || count < max)
519 { 670 {
671 maybe_start_thread ();
672
520 LOCK (reslock); 673 LOCK (reslock);
521 req = reqq_shift (&res_queue); 674 req = reqq_shift (&res_queue);
522 675
523 if (req) 676 if (req)
524 { 677 {
538 if (!req) 691 if (!req)
539 break; 692 break;
540 693
541 --nreqs; 694 --nreqs;
542 695
543 if (req->type == REQ_QUIT)
544 --started;
545 else if (req->type == REQ_GROUP && req->length) 696 if (req->type == REQ_GROUP && req->length)
546 { 697 {
547 req->fd = 1; /* mark request as delayed */ 698 req->fd = 1; /* mark request as delayed */
548 continue; 699 continue;
549 } 700 }
550 else 701 else
577 728
578 max = 0; 729 max = 0;
579 } 730 }
580 731
581 return count; 732 return count;
582}
583
584static void *aio_proc(void *arg);
585
586static void start_thread (void)
587{
588 sigset_t fullsigset, oldsigset;
589 pthread_attr_t attr;
590
591 worker *wrk = calloc (1, sizeof (worker));
592
593 if (!wrk)
594 croak ("unable to allocate worker thread data");
595
596 pthread_attr_init (&attr);
597 pthread_attr_setstacksize (&attr, STACKSIZE);
598 pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
599
600 sigfillset (&fullsigset);
601
602 LOCK (wrklock);
603 sigprocmask (SIG_SETMASK, &fullsigset, &oldsigset);
604
605 if (pthread_create (&wrk->tid, &attr, aio_proc, (void *)wrk) == 0)
606 {
607 wrk->prev = &wrk_first;
608 wrk->next = wrk_first.next;
609 wrk_first.next->prev = wrk;
610 wrk_first.next = wrk;
611 ++started;
612 }
613 else
614 free (wrk);
615
616 sigprocmask (SIG_SETMASK, &oldsigset, 0);
617 UNLOCK (wrklock);
618}
619
620static void req_send (aio_req req)
621{
622 while (started < wanted && nreqs >= started)
623 start_thread ();
624
625 ++nreqs;
626
627 LOCK (reqlock);
628 ++nready;
629 reqq_push (&req_queue, req);
630 pthread_cond_signal (&reqwait);
631 UNLOCK (reqlock);
632}
633
634static void end_thread (void)
635{
636 aio_req req;
637
638 Newz (0, req, 1, aio_cb);
639
640 req->type = REQ_QUIT;
641 req->pri = PRI_MAX + PRI_BIAS;
642
643 req_send (req);
644}
645
646static void min_parallel (int nthreads)
647{
648 if (wanted < nthreads)
649 wanted = nthreads;
650}
651
652static void max_parallel (int nthreads)
653{
654 int cur = started;
655
656 if (wanted > nthreads)
657 wanted = nthreads;
658
659 while (cur > wanted)
660 {
661 end_thread ();
662 cur--;
663 }
664
665 while (started > wanted)
666 {
667 poll_wait ();
668 poll_cb (0);
669 }
670} 733}
671 734
672static void create_pipe () 735static void create_pipe ()
673{ 736{
674 if (pipe (respipe)) 737 if (pipe (respipe))
938/*****************************************************************************/ 1001/*****************************************************************************/
939 1002
940static void *aio_proc (void *thr_arg) 1003static void *aio_proc (void *thr_arg)
941{ 1004{
942 aio_req req; 1005 aio_req req;
943 int type;
944 worker *self = (worker *)thr_arg; 1006 worker *self = (worker *)thr_arg;
945 1007
946 do 1008 for (;;)
947 { 1009 {
948 LOCK (reqlock); 1010 LOCK (reqlock);
949 1011
950 for (;;) 1012 for (;;)
951 { 1013 {
960 --nready; 1022 --nready;
961 1023
962 UNLOCK (reqlock); 1024 UNLOCK (reqlock);
963 1025
964 errno = 0; /* strictly unnecessary */ 1026 errno = 0; /* strictly unnecessary */
965 type = req->type; /* remember type for QUIT check */
966 1027
967 if (!(req->flags & FLAG_CANCELLED)) 1028 if (!(req->flags & FLAG_CANCELLED))
968 switch (type) 1029 switch (req->type)
969 { 1030 {
970 case REQ_READ: req->result = pread (req->fd, req->dataptr, req->length, req->offset); break; 1031 case REQ_READ: req->result = pread (req->fd, req->dataptr, req->length, req->offset); break;
971 case REQ_WRITE: req->result = pwrite (req->fd, req->dataptr, req->length, req->offset); break; 1032 case REQ_WRITE: req->result = pwrite (req->fd, req->dataptr, req->length, req->offset); break;
972 1033
973 case REQ_READAHEAD: req->result = readahead (req->fd, req->offset, req->length); break; 1034 case REQ_READAHEAD: req->result = readahead (req->fd, req->offset, req->length); break;
982 case REQ_UNLINK: req->result = unlink (req->dataptr); break; 1043 case REQ_UNLINK: req->result = unlink (req->dataptr); break;
983 case REQ_RMDIR: req->result = rmdir (req->dataptr); break; 1044 case REQ_RMDIR: req->result = rmdir (req->dataptr); break;
984 case REQ_RENAME: req->result = rename (req->data2ptr, req->dataptr); break; 1045 case REQ_RENAME: req->result = rename (req->data2ptr, req->dataptr); break;
985 case REQ_LINK: req->result = link (req->data2ptr, req->dataptr); break; 1046 case REQ_LINK: req->result = link (req->data2ptr, req->dataptr); break;
986 case REQ_SYMLINK: req->result = symlink (req->data2ptr, req->dataptr); break; 1047 case REQ_SYMLINK: req->result = symlink (req->data2ptr, req->dataptr); break;
1048 case REQ_MKNOD: req->result = mknod (req->data2ptr, req->mode, (dev_t)req->offset); break;
987 1049
988 case REQ_FDATASYNC: req->result = fdatasync (req->fd); break; 1050 case REQ_FDATASYNC: req->result = fdatasync (req->fd); break;
989 case REQ_FSYNC: req->result = fsync (req->fd); break; 1051 case REQ_FSYNC: req->result = fsync (req->fd); break;
990 case REQ_READDIR: scandir_ (req, self); break; 1052 case REQ_READDIR: scandir_ (req, self); break;
991 1053
999 req->result = select (0, 0, 0, 0, &tv); 1061 req->result = select (0, 0, 0, 0, &tv);
1000 } 1062 }
1001 1063
1002 case REQ_GROUP: 1064 case REQ_GROUP:
1003 case REQ_NOP: 1065 case REQ_NOP:
1066 break;
1067
1004 case REQ_QUIT: 1068 case REQ_QUIT:
1069 LOCK (wrklock);
1070 worker_free (self);
1071 --started;
1072 UNLOCK (wrklock);
1005 break; 1073 return 0;
1006 1074
1007 default: 1075 default:
1008 req->result = ENOSYS; 1076 req->result = ENOSYS;
1009 break; 1077 break;
1010 } 1078 }
1022 self->req = 0; 1090 self->req = 0;
1023 worker_clear (self); 1091 worker_clear (self);
1024 1092
1025 UNLOCK (reslock); 1093 UNLOCK (reslock);
1026 } 1094 }
1027 while (type != REQ_QUIT);
1028
1029 LOCK (wrklock);
1030 worker_free (self);
1031 UNLOCK (wrklock);
1032
1033 return 0;
1034} 1095}
1035 1096
1036/*****************************************************************************/ 1097/*****************************************************************************/
1037 1098
1038static void atfork_prepare (void) 1099static void atfork_prepare (void)
1121{ 1182{
1122 HV *stash = gv_stashpv ("IO::AIO", 1); 1183 HV *stash = gv_stashpv ("IO::AIO", 1);
1123 newCONSTSUB (stash, "EXDEV", newSViv (EXDEV)); 1184 newCONSTSUB (stash, "EXDEV", newSViv (EXDEV));
1124 newCONSTSUB (stash, "O_RDONLY", newSViv (O_RDONLY)); 1185 newCONSTSUB (stash, "O_RDONLY", newSViv (O_RDONLY));
1125 newCONSTSUB (stash, "O_WRONLY", newSViv (O_WRONLY)); 1186 newCONSTSUB (stash, "O_WRONLY", newSViv (O_WRONLY));
1187 newCONSTSUB (stash, "O_CREAT", newSViv (O_CREAT));
1188 newCONSTSUB (stash, "O_TRUNC", newSViv (O_TRUNC));
1126 1189
1127 create_pipe (); 1190 create_pipe ();
1128 pthread_atfork (atfork_prepare, atfork_parent, atfork_child); 1191 pthread_atfork (atfork_prepare, atfork_parent, atfork_child);
1192
1193 start_thread ();
1129} 1194}
1130 1195
1131void 1196void
1132min_parallel (int nthreads) 1197min_parallel (int nthreads)
1133 PROTOTYPE: $ 1198 PROTOTYPE: $
1361 req->type = ix; 1426 req->type = ix;
1362 req->fh = newSVsv (oldpath); 1427 req->fh = newSVsv (oldpath);
1363 req->data2ptr = SvPVbyte_nolen (req->fh); 1428 req->data2ptr = SvPVbyte_nolen (req->fh);
1364 req->data = newSVsv (newpath); 1429 req->data = newSVsv (newpath);
1365 req->dataptr = SvPVbyte_nolen (req->data); 1430 req->dataptr = SvPVbyte_nolen (req->data);
1431
1432 REQ_SEND;
1433}
1434
1435void
1436aio_mknod (pathname,mode,dev,callback=&PL_sv_undef)
1437 SV * pathname
1438 SV * callback
1439 UV mode
1440 UV dev
1441 PPCODE:
1442{
1443 dREQ;
1444
1445 req->type = REQ_MKNOD;
1446 req->data = newSVsv (pathname);
1447 req->dataptr = SvPVbyte_nolen (req->data);
1448 req->mode = (mode_t)mode;
1449 req->offset = dev;
1366 1450
1367 REQ_SEND; 1451 REQ_SEND;
1368} 1452}
1369 1453
1370void 1454void
1491 1575
1492int 1576int
1493nready() 1577nready()
1494 PROTOTYPE: 1578 PROTOTYPE:
1495 CODE: 1579 CODE:
1496 if (WORDREAD_UNSAFE) LOCK (reqlock);
1497 RETVAL = nready; 1580 RETVAL = get_nready ();
1498 if (WORDREAD_UNSAFE) UNLOCK (reqlock);
1499 OUTPUT: 1581 OUTPUT:
1500 RETVAL 1582 RETVAL
1501 1583
1502int 1584int
1503npending() 1585npending()
1504 PROTOTYPE: 1586 PROTOTYPE:
1505 CODE: 1587 CODE:
1506 if (WORDREAD_UNSAFE) LOCK (reslock);
1507 RETVAL = npending; 1588 RETVAL = get_npending ();
1508 if (WORDREAD_UNSAFE) UNLOCK (reslock);
1509 OUTPUT: 1589 OUTPUT:
1510 RETVAL 1590 RETVAL
1511 1591
1512PROTOTYPES: DISABLE 1592PROTOTYPES: DISABLE
1513 1593

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines