ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/IO-AIO/AIO.xs
(Generate patch)

Comparing IO-AIO/AIO.xs (file contents):
Revision 1.79 by root, Thu Oct 26 16:28:33 2006 UTC vs.
Revision 1.80 by root, Fri Oct 27 19:17:23 2006 UTC

145}; 145};
146 146
147static int next_pri = DEFAULT_PRI + PRI_BIAS; 147static int next_pri = DEFAULT_PRI + PRI_BIAS;
148 148
149static unsigned int started, wanted; 149static unsigned int started, wanted;
150static volatile unsigned int nreqs, nready, npending;
151static volatile unsigned int max_outstanding = 0xffffffff;
152static int respipe [2];
153 150
154#if __linux && defined (PTHREAD_ADAPTIVE_MUTEX_INITIALIZER_NP) 151#if __linux && defined (PTHREAD_ADAPTIVE_MUTEX_INITIALIZER_NP)
155# define AIO_MUTEX_INIT PTHREAD_ADAPTIVE_MUTEX_INITIALIZER_NP 152# define AIO_MUTEX_INIT PTHREAD_ADAPTIVE_MUTEX_INITIALIZER_NP
156#else 153#else
157# define AIO_MUTEX_INIT PTHREAD_MUTEX_INITIALIZER 154# define AIO_MUTEX_INIT PTHREAD_MUTEX_INITIALIZER
198 wrk->prev->next = wrk->next; 195 wrk->prev->next = wrk->next;
199 196
200 free (wrk); 197 free (wrk);
201} 198}
202 199
200static volatile unsigned int nreqs, nready, npending;
201static volatile unsigned int max_outstanding = 0xffffffff;
202static int respipe [2];
203
203static pthread_mutex_t reslock = AIO_MUTEX_INIT; 204static pthread_mutex_t reslock = AIO_MUTEX_INIT;
204static pthread_mutex_t reqlock = AIO_MUTEX_INIT; 205static pthread_mutex_t reqlock = AIO_MUTEX_INIT;
205static pthread_cond_t reqwait = PTHREAD_COND_INITIALIZER; 206static pthread_cond_t reqwait = PTHREAD_COND_INITIALIZER;
207
208#if WORDREAD_UNSAFE
209
210static unsigned int get_nready ()
211{
212 unsigned int retval;
213
214 LOCK (reqlock);
215 retval = nready;
216 UNLOCK (reqlock);
217
218 return retval;
219}
220
221static unsigned int get_npending ()
222{
223 unsigned int retval;
224
225 LOCK (reslock);
226 retval = npending;
227 UNLOCK (reslock);
228
229 return retval;
230}
231
232#else
233
234# define get_nready() nready
235# define get_npending() npending
236
237#endif
206 238
207/* 239/*
208 * a somewhat faster data structure might be nice, but 240 * a somewhat faster data structure might be nice, but
209 * with 8 priorities this actually needs <20 insns 241 * with 8 priorities this actually needs <20 insns
210 * per shift, the most expensive operation. 242 * per shift, the most expensive operation.
331 req_invoke (grp); 363 req_invoke (grp);
332 req_free (grp); 364 req_free (grp);
333 } 365 }
334} 366}
335 367
336static void poll_wait ()
337{
338 fd_set rfd;
339
340 while (nreqs)
341 {
342 int size;
343 if (WORDREAD_UNSAFE) LOCK (reslock);
344 size = res_queue.size;
345 if (WORDREAD_UNSAFE) UNLOCK (reslock);
346
347 if (size)
348 return;
349
350 FD_ZERO(&rfd);
351 FD_SET(respipe [0], &rfd);
352
353 select (respipe [0] + 1, &rfd, 0, 0, 0);
354 }
355}
356
357static void req_invoke (aio_req req) 368static void req_invoke (aio_req req)
358{ 369{
359 dSP; 370 dSP;
360 371
361 if (!(req->flags & FLAG_CANCELLED) && SvOK (req->callback)) 372 if (!(req->flags & FLAG_CANCELLED) && SvOK (req->callback))
504 req->flags |= FLAG_CANCELLED; 515 req->flags |= FLAG_CANCELLED;
505 516
506 req_cancel_subs (req); 517 req_cancel_subs (req);
507} 518}
508 519
520static void *aio_proc(void *arg);
521
522static void start_thread (void)
523{
524 sigset_t fullsigset, oldsigset;
525 pthread_attr_t attr;
526
527 worker *wrk = calloc (1, sizeof (worker));
528
529 if (!wrk)
530 croak ("unable to allocate worker thread data");
531
532 pthread_attr_init (&attr);
533 pthread_attr_setstacksize (&attr, STACKSIZE);
534 pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
535#ifdef PTHREAD_SCOPE_PROCESS
536 pthread_attr_setscope (&attr, PTHREAD_SCOPE_PROCESS);
537#endif
538
539 sigfillset (&fullsigset);
540
541 LOCK (wrklock);
542 sigprocmask (SIG_SETMASK, &fullsigset, &oldsigset);
543
544 if (pthread_create (&wrk->tid, &attr, aio_proc, (void *)wrk) == 0)
545 {
546 wrk->prev = &wrk_first;
547 wrk->next = wrk_first.next;
548 wrk_first.next->prev = wrk;
549 wrk_first.next = wrk;
550 ++started;
551 }
552 else
553 free (wrk);
554
555 sigprocmask (SIG_SETMASK, &oldsigset, 0);
556 UNLOCK (wrklock);
557}
558
559static void maybe_start_thread ()
560{
561#if 0
562 static struct timeval last;
563 struct timeval diff, now;
564#endif
565
566 if (started >= wanted)
567 return;
568
569 if (nready <= nreqs - get_nready () - get_npending ())
570 return;
571
572#if 0
573 gettimeofday (&now, 0);
574
575 diff.tv_sec = now.tv_sec - last.tv_sec;
576 diff.tv_usec = now.tv_usec - last.tv_usec;
577
578 if (diff.tv_usec < 0)
579 {
580 --diff.tv_sec;
581 diff.tv_usec += 1000000;
582 }
583
584 if (!diff.tv_sec && diff.tv_usec < 10000)
585 return;
586
587 last = now;
588#endif
589
590 start_thread ();
591}
592
593static void req_send (aio_req req)
594{
595 ++nreqs;
596
597 LOCK (reqlock);
598 ++nready;
599 reqq_push (&req_queue, req);
600 pthread_cond_signal (&reqwait);
601 UNLOCK (reqlock);
602
603 maybe_start_thread ();
604}
605
606static void end_thread (void)
607{
608 aio_req req;
609
610 Newz (0, req, 1, aio_cb);
611
612 req->type = REQ_QUIT;
613 req->pri = PRI_MAX + PRI_BIAS;
614
615 req_send (req);
616
617 LOCK (wrklock);
618 --started;
619 UNLOCK (wrklock);
620}
621
622static void min_parallel (int nthreads)
623{
624 if (wanted < nthreads)
625 wanted = nthreads;
626}
627
628static void max_parallel (int nthreads)
629{
630 if (wanted > nthreads)
631 wanted = nthreads;
632
633 while (started > wanted)
634 end_thread ();
635}
636
637static void poll_wait ()
638{
639 fd_set rfd;
640
641 while (nreqs)
642 {
643 int size;
644 if (WORDREAD_UNSAFE) LOCK (reslock);
645 size = res_queue.size;
646 if (WORDREAD_UNSAFE) UNLOCK (reslock);
647
648 if (size)
649 return;
650
651 maybe_start_thread ();
652
653 FD_ZERO(&rfd);
654 FD_SET(respipe [0], &rfd);
655
656 select (respipe [0] + 1, &rfd, 0, 0, 0);
657 }
658}
659
509static int poll_cb (int max) 660static int poll_cb (int max)
510{ 661{
511 dSP; 662 dSP;
512 int count = 0; 663 int count = 0;
513 int do_croak = 0; 664 int do_croak = 0;
515 666
516 for (;;) 667 for (;;)
517 { 668 {
518 while (max <= 0 || count < max) 669 while (max <= 0 || count < max)
519 { 670 {
671 maybe_start_thread ();
672
520 LOCK (reslock); 673 LOCK (reslock);
521 req = reqq_shift (&res_queue); 674 req = reqq_shift (&res_queue);
522 675
523 if (req) 676 if (req)
524 { 677 {
538 if (!req) 691 if (!req)
539 break; 692 break;
540 693
541 --nreqs; 694 --nreqs;
542 695
543 if (req->type == REQ_QUIT)
544 --started;
545 else if (req->type == REQ_GROUP && req->length) 696 if (req->type == REQ_GROUP && req->length)
546 { 697 {
547 req->fd = 1; /* mark request as delayed */ 698 req->fd = 1; /* mark request as delayed */
548 continue; 699 continue;
549 } 700 }
550 else 701 else
577 728
578 max = 0; 729 max = 0;
579 } 730 }
580 731
581 return count; 732 return count;
582}
583
584static void *aio_proc(void *arg);
585
586static void start_thread (void)
587{
588 sigset_t fullsigset, oldsigset;
589 pthread_attr_t attr;
590
591 worker *wrk = calloc (1, sizeof (worker));
592
593 if (!wrk)
594 croak ("unable to allocate worker thread data");
595
596 pthread_attr_init (&attr);
597 pthread_attr_setstacksize (&attr, STACKSIZE);
598 pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
599
600 sigfillset (&fullsigset);
601
602 LOCK (wrklock);
603 sigprocmask (SIG_SETMASK, &fullsigset, &oldsigset);
604
605 if (pthread_create (&wrk->tid, &attr, aio_proc, (void *)wrk) == 0)
606 {
607 wrk->prev = &wrk_first;
608 wrk->next = wrk_first.next;
609 wrk_first.next->prev = wrk;
610 wrk_first.next = wrk;
611 ++started;
612 }
613 else
614 free (wrk);
615
616 sigprocmask (SIG_SETMASK, &oldsigset, 0);
617 UNLOCK (wrklock);
618}
619
620static void req_send (aio_req req)
621{
622 while (started < wanted && nreqs >= started)
623 start_thread ();
624
625 ++nreqs;
626
627 LOCK (reqlock);
628 ++nready;
629 reqq_push (&req_queue, req);
630 pthread_cond_signal (&reqwait);
631 UNLOCK (reqlock);
632}
633
634static void end_thread (void)
635{
636 aio_req req;
637
638 Newz (0, req, 1, aio_cb);
639
640 req->type = REQ_QUIT;
641 req->pri = PRI_MAX + PRI_BIAS;
642
643 req_send (req);
644}
645
646static void min_parallel (int nthreads)
647{
648 if (wanted < nthreads)
649 wanted = nthreads;
650}
651
652static void max_parallel (int nthreads)
653{
654 int cur = started;
655
656 if (wanted > nthreads)
657 wanted = nthreads;
658
659 while (cur > wanted)
660 {
661 end_thread ();
662 cur--;
663 }
664
665 while (started > wanted)
666 {
667 poll_wait ();
668 poll_cb (0);
669 }
670} 733}
671 734
672static void create_pipe () 735static void create_pipe ()
673{ 736{
674 if (pipe (respipe)) 737 if (pipe (respipe))
938/*****************************************************************************/ 1001/*****************************************************************************/
939 1002
940static void *aio_proc (void *thr_arg) 1003static void *aio_proc (void *thr_arg)
941{ 1004{
942 aio_req req; 1005 aio_req req;
943 int type;
944 worker *self = (worker *)thr_arg; 1006 worker *self = (worker *)thr_arg;
945 1007
946 do 1008 for (;;)
947 { 1009 {
948 LOCK (reqlock); 1010 LOCK (reqlock);
949 1011
950 for (;;) 1012 for (;;)
951 { 1013 {
960 --nready; 1022 --nready;
961 1023
962 UNLOCK (reqlock); 1024 UNLOCK (reqlock);
963 1025
964 errno = 0; /* strictly unnecessary */ 1026 errno = 0; /* strictly unnecessary */
965 type = req->type; /* remember type for QUIT check */
966 1027
967 if (!(req->flags & FLAG_CANCELLED)) 1028 if (!(req->flags & FLAG_CANCELLED))
968 switch (type) 1029 switch (req->type)
969 { 1030 {
970 case REQ_READ: req->result = pread (req->fd, req->dataptr, req->length, req->offset); break; 1031 case REQ_READ: req->result = pread (req->fd, req->dataptr, req->length, req->offset); break;
971 case REQ_WRITE: req->result = pwrite (req->fd, req->dataptr, req->length, req->offset); break; 1032 case REQ_WRITE: req->result = pwrite (req->fd, req->dataptr, req->length, req->offset); break;
972 1033
973 case REQ_READAHEAD: req->result = readahead (req->fd, req->offset, req->length); break; 1034 case REQ_READAHEAD: req->result = readahead (req->fd, req->offset, req->length); break;
999 req->result = select (0, 0, 0, 0, &tv); 1060 req->result = select (0, 0, 0, 0, &tv);
1000 } 1061 }
1001 1062
1002 case REQ_GROUP: 1063 case REQ_GROUP:
1003 case REQ_NOP: 1064 case REQ_NOP:
1065 break;
1066
1004 case REQ_QUIT: 1067 case REQ_QUIT:
1068 LOCK (wrklock);
1069 worker_free (self);
1070 --started;
1071 UNLOCK (wrklock);
1005 break; 1072 return 0;
1006 1073
1007 default: 1074 default:
1008 req->result = ENOSYS; 1075 req->result = ENOSYS;
1009 break; 1076 break;
1010 } 1077 }
1022 self->req = 0; 1089 self->req = 0;
1023 worker_clear (self); 1090 worker_clear (self);
1024 1091
1025 UNLOCK (reslock); 1092 UNLOCK (reslock);
1026 } 1093 }
1027 while (type != REQ_QUIT);
1028
1029 LOCK (wrklock);
1030 worker_free (self);
1031 UNLOCK (wrklock);
1032
1033 return 0;
1034} 1094}
1035 1095
1036/*****************************************************************************/ 1096/*****************************************************************************/
1037 1097
1038static void atfork_prepare (void) 1098static void atfork_prepare (void)
1124 newCONSTSUB (stash, "O_RDONLY", newSViv (O_RDONLY)); 1184 newCONSTSUB (stash, "O_RDONLY", newSViv (O_RDONLY));
1125 newCONSTSUB (stash, "O_WRONLY", newSViv (O_WRONLY)); 1185 newCONSTSUB (stash, "O_WRONLY", newSViv (O_WRONLY));
1126 1186
1127 create_pipe (); 1187 create_pipe ();
1128 pthread_atfork (atfork_prepare, atfork_parent, atfork_child); 1188 pthread_atfork (atfork_prepare, atfork_parent, atfork_child);
1189
1190 start_thread ();
1129} 1191}
1130 1192
1131void 1193void
1132min_parallel (int nthreads) 1194min_parallel (int nthreads)
1133 PROTOTYPE: $ 1195 PROTOTYPE: $
1491 1553
1492int 1554int
1493nready() 1555nready()
1494 PROTOTYPE: 1556 PROTOTYPE:
1495 CODE: 1557 CODE:
1496 if (WORDREAD_UNSAFE) LOCK (reqlock);
1497 RETVAL = nready; 1558 RETVAL = get_nready ();
1498 if (WORDREAD_UNSAFE) UNLOCK (reqlock);
1499 OUTPUT: 1559 OUTPUT:
1500 RETVAL 1560 RETVAL
1501 1561
1502int 1562int
1503npending() 1563npending()
1504 PROTOTYPE: 1564 PROTOTYPE:
1505 CODE: 1565 CODE:
1506 if (WORDREAD_UNSAFE) LOCK (reslock);
1507 RETVAL = npending; 1566 RETVAL = get_npending ();
1508 if (WORDREAD_UNSAFE) UNLOCK (reslock);
1509 OUTPUT: 1567 OUTPUT:
1510 RETVAL 1568 RETVAL
1511 1569
1512PROTOTYPES: DISABLE 1570PROTOTYPES: DISABLE
1513 1571

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines