ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/IO-AIO/AIO.xs
(Generate patch)

Comparing IO-AIO/AIO.xs (file contents):
Revision 1.82 by root, Fri Oct 27 20:11:58 2006 UTC vs.
Revision 1.85 by root, Sat Oct 28 23:32:29 2006 UTC

43# else 43# else
44# error sendfile support requested but not available 44# error sendfile support requested but not available
45# endif 45# endif
46#endif 46#endif
47 47
48/* number of seconds after which idle threads exit */
49#define IDLE_TIMEOUT 10
50
48/* used for struct dirent, AIX doesn't provide it */ 51/* used for struct dirent, AIX doesn't provide it */
49#ifndef NAME_MAX 52#ifndef NAME_MAX
50# define NAME_MAX 4096 53# define NAME_MAX 4096
51#endif 54#endif
52 55
65 68
66/* whether word reads are potentially non-atomic. 69/* whether word reads are potentially non-atomic.
67 * this is conservative, likely most arches this runs 70 * this is conservative, likely most arches this runs
68 * on have atomic word read/writes. 71 * on have atomic word read/writes.
69 */ 72 */
70#ifndef WORDREAD_UNSAFE 73#ifndef WORDACCESS_UNSAFE
71# if __i386 || __x86_64 74# if __i386 || __x86_64
72# define WORDREAD_UNSAFE 0 75# define WORDACCESS_UNSAFE 0
73# else 76# else
74# define WORDREAD_UNSAFE 1 77# define WORDACCESS_UNSAFE 1
75# endif 78# endif
76#endif 79#endif
77 80
78/* buffer size for various temporary buffers */ 81/* buffer size for various temporary buffers */
79#define AIO_BUFSIZE 65536 82#define AIO_BUFSIZE 65536
142 DEFAULT_PRI = 0, 145 DEFAULT_PRI = 0,
143 PRI_BIAS = -PRI_MIN, 146 PRI_BIAS = -PRI_MIN,
144 NUM_PRI = PRI_MAX + PRI_BIAS + 1, 147 NUM_PRI = PRI_MAX + PRI_BIAS + 1,
145}; 148};
146 149
150#define AIO_TICKS ((1000000 + 1023) >> 10)
151
152static unsigned int max_poll_time = 0;
153static unsigned int max_poll_reqs = 0;
154
155/* calculate time difference in ~1/AIO_TICKS of a second */
156static int tvdiff (struct timeval *tv1, struct timeval *tv2)
157{
158 return (tv2->tv_sec - tv1->tv_sec ) * AIO_TICKS
159 + ((tv2->tv_usec - tv1->tv_usec) >> 10);
160}
161
147static int next_pri = DEFAULT_PRI + PRI_BIAS; 162static int next_pri = DEFAULT_PRI + PRI_BIAS;
148 163
149static unsigned int started, wanted; 164static unsigned int started, idle, wanted;
150 165
151#if __linux && defined (PTHREAD_ADAPTIVE_MUTEX_INITIALIZER_NP) 166#if __linux && defined (PTHREAD_ADAPTIVE_MUTEX_INITIALIZER_NP)
152# define AIO_MUTEX_INIT PTHREAD_ADAPTIVE_MUTEX_INITIALIZER_NP 167# define AIO_MUTEX_INIT PTHREAD_ADAPTIVE_MUTEX_INITIALIZER_NP
153#else 168#else
154# define AIO_MUTEX_INIT PTHREAD_MUTEX_INITIALIZER 169# define AIO_MUTEX_INIT PTHREAD_MUTEX_INITIALIZER
196 211
197 free (wrk); 212 free (wrk);
198} 213}
199 214
200static volatile unsigned int nreqs, nready, npending; 215static volatile unsigned int nreqs, nready, npending;
216static volatile unsigned int max_idle = 4;
201static volatile unsigned int max_outstanding = 0xffffffff; 217static volatile unsigned int max_outstanding = 0xffffffff;
202static int respipe [2]; 218static int respipe [2];
203 219
204static pthread_mutex_t reslock = AIO_MUTEX_INIT; 220static pthread_mutex_t reslock = AIO_MUTEX_INIT;
205static pthread_mutex_t reqlock = AIO_MUTEX_INIT; 221static pthread_mutex_t reqlock = AIO_MUTEX_INIT;
206static pthread_cond_t reqwait = PTHREAD_COND_INITIALIZER; 222static pthread_cond_t reqwait = PTHREAD_COND_INITIALIZER;
207 223
208#if WORDREAD_UNSAFE 224#if WORDACCESS_UNSAFE
209 225
210static unsigned int get_nready () 226static unsigned int get_nready ()
211{ 227{
212 unsigned int retval; 228 unsigned int retval;
213 229
227 UNLOCK (reslock); 243 UNLOCK (reslock);
228 244
229 return retval; 245 return retval;
230} 246}
231 247
248static unsigned int get_nthreads ()
249{
250 unsigned int retval;
251
252 LOCK (wrklock);
253 retval = started;
254 UNLOCK (wrklock);
255
256 return retval;
257}
258
232#else 259#else
233 260
234# define get_nready() nready 261# define get_nready() nready
235# define get_npending() npending 262# define get_npending() npending
263# define get_nthreads() started
236 264
237#endif 265#endif
238 266
239/* 267/*
240 * a somewhat faster data structure might be nice, but 268 * a somewhat faster data structure might be nice, but
288 } 316 }
289 317
290 abort (); 318 abort ();
291} 319}
292 320
293static int poll_cb (int max); 321static int poll_cb ();
294static void req_invoke (aio_req req); 322static void req_invoke (aio_req req);
295static void req_free (aio_req req); 323static void req_free (aio_req req);
296static void req_cancel (aio_req req); 324static void req_cancel (aio_req req);
297 325
298/* must be called at most once */ 326/* must be called at most once */
556 UNLOCK (wrklock); 584 UNLOCK (wrklock);
557} 585}
558 586
559static void maybe_start_thread () 587static void maybe_start_thread ()
560{ 588{
561#if 0 589 if (get_nthreads () >= wanted)
562 static struct timeval last;
563 struct timeval diff, now;
564#endif
565
566 if (started >= wanted)
567 return; 590 return;
568 591
569 if (nready <= nreqs - get_nready () - get_npending ()) 592 /* todo: maybe use idle here, but might be less exact */
593 if (0 <= (int)get_nthreads () + (int)get_npending () - (int)nreqs)
570 return; 594 return;
571
572#if 0
573 gettimeofday (&now, 0);
574
575 diff.tv_sec = now.tv_sec - last.tv_sec;
576 diff.tv_usec = now.tv_usec - last.tv_usec;
577
578 if (diff.tv_usec < 0)
579 {
580 --diff.tv_sec;
581 diff.tv_usec += 1000000;
582 }
583
584 if (!diff.tv_sec && diff.tv_usec < 10000)
585 return;
586
587 last = now;
588#endif
589 595
590 start_thread (); 596 start_thread ();
591} 597}
592 598
593static void req_send (aio_req req) 599static void req_send (aio_req req)
610 Newz (0, req, 1, aio_cb); 616 Newz (0, req, 1, aio_cb);
611 617
612 req->type = REQ_QUIT; 618 req->type = REQ_QUIT;
613 req->pri = PRI_MAX + PRI_BIAS; 619 req->pri = PRI_MAX + PRI_BIAS;
614 620
615 req_send (req); 621 LOCK (reqlock);
622 reqq_push (&req_queue, req);
623 pthread_cond_signal (&reqwait);
624 UNLOCK (reqlock);
616 625
617 LOCK (wrklock); 626 LOCK (wrklock);
618 --started; 627 --started;
619 UNLOCK (wrklock); 628 UNLOCK (wrklock);
620} 629}
621 630
631static void set_max_idle (int nthreads)
632{
633 if (WORDACCESS_UNSAFE) LOCK (reqlock);
634 max_idle = nthreads <= 0 ? 1 : nthreads;
635 if (WORDACCESS_UNSAFE) UNLOCK (reqlock);
636}
637
622static void min_parallel (int nthreads) 638static void min_parallel (int nthreads)
623{ 639{
624 if (wanted < nthreads) 640 if (wanted < nthreads)
625 wanted = nthreads; 641 wanted = nthreads;
626} 642}
639 fd_set rfd; 655 fd_set rfd;
640 656
641 while (nreqs) 657 while (nreqs)
642 { 658 {
643 int size; 659 int size;
644 if (WORDREAD_UNSAFE) LOCK (reslock); 660 if (WORDACCESS_UNSAFE) LOCK (reslock);
645 size = res_queue.size; 661 size = res_queue.size;
646 if (WORDREAD_UNSAFE) UNLOCK (reslock); 662 if (WORDACCESS_UNSAFE) UNLOCK (reslock);
647 663
648 if (size) 664 if (size)
649 return; 665 return;
650 666
651 maybe_start_thread (); 667 maybe_start_thread ();
655 671
656 select (respipe [0] + 1, &rfd, 0, 0, 0); 672 select (respipe [0] + 1, &rfd, 0, 0, 0);
657 } 673 }
658} 674}
659 675
660static int poll_cb (int max) 676static int poll_cb ()
661{ 677{
662 dSP; 678 dSP;
663 int count = 0; 679 int count = 0;
680 int maxreqs = max_poll_reqs;
664 int do_croak = 0; 681 int do_croak = 0;
682 struct timeval tv_start, tv_now;
665 aio_req req; 683 aio_req req;
666 684
685 if (max_poll_time)
686 gettimeofday (&tv_start, 0);
687
667 for (;;) 688 for (;;)
668 { 689 {
669 while (max <= 0 || count < max) 690 for (;;)
670 { 691 {
671 maybe_start_thread (); 692 maybe_start_thread ();
672 693
673 LOCK (reslock); 694 LOCK (reslock);
674 req = reqq_shift (&res_queue); 695 req = reqq_shift (&res_queue);
717 738
718 count++; 739 count++;
719 } 740 }
720 741
721 req_free (req); 742 req_free (req);
743
744 if (maxreqs && !--maxreqs)
745 break;
746
747 if (max_poll_time)
748 {
749 gettimeofday (&tv_now, 0);
750
751 if (tvdiff (&tv_start, &tv_now) >= max_poll_time)
752 break;
753 }
722 } 754 }
723 755
724 if (nreqs <= max_outstanding) 756 if (nreqs <= max_outstanding)
725 break; 757 break;
726 758
727 poll_wait (); 759 poll_wait ();
728 760
729 max = 0; 761 ++maxreqs;
730 } 762 }
731 763
732 return count; 764 return count;
733} 765}
734 766
1001/*****************************************************************************/ 1033/*****************************************************************************/
1002 1034
1003static void *aio_proc (void *thr_arg) 1035static void *aio_proc (void *thr_arg)
1004{ 1036{
1005 aio_req req; 1037 aio_req req;
1038 struct timespec ts;
1006 worker *self = (worker *)thr_arg; 1039 worker *self = (worker *)thr_arg;
1007 1040
1041 /* try to distribute timeouts somewhat evenly */
1042 ts.tv_nsec = (((unsigned long)self + (unsigned long)ts.tv_sec) & 1023UL)
1043 * (1000000000UL / 1024UL);
1044
1008 for (;;) 1045 for (;;)
1009 { 1046 {
1047 ts.tv_sec = time (0) + IDLE_TIMEOUT;
1048
1010 LOCK (reqlock); 1049 LOCK (reqlock);
1011 1050
1012 for (;;) 1051 for (;;)
1013 { 1052 {
1014 self->req = req = reqq_shift (&req_queue); 1053 self->req = req = reqq_shift (&req_queue);
1015 1054
1016 if (req) 1055 if (req)
1017 break; 1056 break;
1018 1057
1058 ++idle;
1059
1060 if (pthread_cond_timedwait (&reqwait, &reqlock, &ts)
1061 == ETIMEDOUT)
1062 {
1063 if (idle > max_idle)
1064 {
1065 --idle;
1066 UNLOCK (reqlock);
1067 LOCK (wrklock);
1068 --started;
1069 UNLOCK (wrklock);
1070 goto quit;
1071 }
1072
1073 /* we are allowed to idle, so do so without any timeout */
1019 pthread_cond_wait (&reqwait, &reqlock); 1074 pthread_cond_wait (&reqwait, &reqlock);
1075 ts.tv_sec = time (0) + IDLE_TIMEOUT;
1076 }
1077
1078 --idle;
1020 } 1079 }
1021 1080
1022 --nready; 1081 --nready;
1023 1082
1024 UNLOCK (reqlock); 1083 UNLOCK (reqlock);
1064 case REQ_GROUP: 1123 case REQ_GROUP:
1065 case REQ_NOP: 1124 case REQ_NOP:
1066 break; 1125 break;
1067 1126
1068 case REQ_QUIT: 1127 case REQ_QUIT:
1069 LOCK (wrklock); 1128 goto quit;
1070 worker_free (self);
1071 --started;
1072 UNLOCK (wrklock);
1073 return 0;
1074 1129
1075 default: 1130 default:
1076 req->result = ENOSYS; 1131 req->result = ENOSYS;
1077 break; 1132 break;
1078 } 1133 }
1090 self->req = 0; 1145 self->req = 0;
1091 worker_clear (self); 1146 worker_clear (self);
1092 1147
1093 UNLOCK (reslock); 1148 UNLOCK (reslock);
1094 } 1149 }
1150
1151quit:
1152 LOCK (wrklock);
1153 worker_free (self);
1154 UNLOCK (wrklock);
1155
1156 return 0;
1095} 1157}
1096 1158
1097/*****************************************************************************/ 1159/*****************************************************************************/
1098 1160
1099static void atfork_prepare (void) 1161static void atfork_prepare (void)
1141 1203
1142 worker_clear (wrk); 1204 worker_clear (wrk);
1143 worker_free (wrk); 1205 worker_free (wrk);
1144 } 1206 }
1145 1207
1146 started = 0; 1208 started = 0;
1209 idle = 0;
1147 nreqs = 0; 1210 nreqs = 0;
1211 nready = 0;
1212 npending = 0;
1148 1213
1149 close (respipe [0]); 1214 close (respipe [0]);
1150 close (respipe [1]); 1215 close (respipe [1]);
1151 create_pipe (); 1216 create_pipe ();
1152 1217
1189 newCONSTSUB (stash, "O_TRUNC", newSViv (O_TRUNC)); 1254 newCONSTSUB (stash, "O_TRUNC", newSViv (O_TRUNC));
1190 newCONSTSUB (stash, "S_IFIFO", newSViv (S_IFIFO)); 1255 newCONSTSUB (stash, "S_IFIFO", newSViv (S_IFIFO));
1191 1256
1192 create_pipe (); 1257 create_pipe ();
1193 pthread_atfork (atfork_prepare, atfork_parent, atfork_child); 1258 pthread_atfork (atfork_prepare, atfork_parent, atfork_child);
1194
1195 start_thread ();
1196} 1259}
1260
1261void
1262max_poll_reqs (int nreqs)
1263 PROTOTYPE: $
1264 CODE:
1265 max_poll_reqs = nreqs;
1266
1267void
1268max_poll_time (double nseconds)
1269 PROTOTYPE: $
1270 CODE:
1271 max_poll_time = nseconds * AIO_TICKS;
1197 1272
1198void 1273void
1199min_parallel (int nthreads) 1274min_parallel (int nthreads)
1200 PROTOTYPE: $ 1275 PROTOTYPE: $
1201 1276
1202void 1277void
1203max_parallel (int nthreads) 1278max_parallel (int nthreads)
1204 PROTOTYPE: $ 1279 PROTOTYPE: $
1280
1281void
1282max_idle (int nthreads)
1283 PROTOTYPE: $
1284 CODE:
1285 set_max_idle (nthreads);
1205 1286
1206int 1287int
1207max_outstanding (int maxreqs) 1288max_outstanding (int maxreqs)
1208 PROTOTYPE: $ 1289 PROTOTYPE: $
1209 CODE: 1290 CODE:
1546 1627
1547int 1628int
1548poll_cb(...) 1629poll_cb(...)
1549 PROTOTYPE: 1630 PROTOTYPE:
1550 CODE: 1631 CODE:
1551 RETVAL = poll_cb (0); 1632 RETVAL = poll_cb ();
1552 OUTPUT:
1553 RETVAL
1554
1555int
1556poll_some(int max = 0)
1557 PROTOTYPE: $
1558 CODE:
1559 RETVAL = poll_cb (max);
1560 OUTPUT: 1633 OUTPUT:
1561 RETVAL 1634 RETVAL
1562 1635
1563void 1636void
1564poll_wait() 1637poll_wait()
1589 CODE: 1662 CODE:
1590 RETVAL = get_npending (); 1663 RETVAL = get_npending ();
1591 OUTPUT: 1664 OUTPUT:
1592 RETVAL 1665 RETVAL
1593 1666
1667int
1668nthreads()
1669 PROTOTYPE:
1670 CODE:
1671 if (WORDACCESS_UNSAFE) LOCK (wrklock);
1672 RETVAL = started;
1673 if (WORDACCESS_UNSAFE) UNLOCK (wrklock);
1674 OUTPUT:
1675 RETVAL
1676
1594PROTOTYPES: DISABLE 1677PROTOTYPES: DISABLE
1595 1678
1596MODULE = IO::AIO PACKAGE = IO::AIO::REQ 1679MODULE = IO::AIO PACKAGE = IO::AIO::REQ
1597 1680
1598void 1681void

Diff Legend

- Removed lines
+ Added lines
< Changed lines
> Changed lines