ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/IO-AIO/AIO.xs
(Generate patch)

Comparing IO-AIO/AIO.xs (file contents):
Revision 1.83 by root, Sat Oct 28 00:17:30 2006 UTC vs.
Revision 1.85 by root, Sat Oct 28 23:32:29 2006 UTC

43# else 43# else
44# error sendfile support requested but not available 44# error sendfile support requested but not available
45# endif 45# endif
46#endif 46#endif
47 47
48/* number of seconds after which idle threads exit */
49#define IDLE_TIMEOUT 10
50
48/* used for struct dirent, AIX doesn't provide it */ 51/* used for struct dirent, AIX doesn't provide it */
49#ifndef NAME_MAX 52#ifndef NAME_MAX
50# define NAME_MAX 4096 53# define NAME_MAX 4096
51#endif 54#endif
52 55
65 68
66/* whether word reads are potentially non-atomic. 69/* whether word reads are potentially non-atomic.
67 * this is conservative, likely most arches this runs 70 * this is conservative, likely most arches this runs
68 * on have atomic word read/writes. 71 * on have atomic word read/writes.
69 */ 72 */
70#ifndef WORDREAD_UNSAFE 73#ifndef WORDACCESS_UNSAFE
71# if __i386 || __x86_64 74# if __i386 || __x86_64
72# define WORDREAD_UNSAFE 0 75# define WORDACCESS_UNSAFE 0
73# else 76# else
74# define WORDREAD_UNSAFE 1 77# define WORDACCESS_UNSAFE 1
75# endif 78# endif
76#endif 79#endif
77 80
78/* buffer size for various temporary buffers */ 81/* buffer size for various temporary buffers */
79#define AIO_BUFSIZE 65536 82#define AIO_BUFSIZE 65536
142 DEFAULT_PRI = 0, 145 DEFAULT_PRI = 0,
143 PRI_BIAS = -PRI_MIN, 146 PRI_BIAS = -PRI_MIN,
144 NUM_PRI = PRI_MAX + PRI_BIAS + 1, 147 NUM_PRI = PRI_MAX + PRI_BIAS + 1,
145}; 148};
146 149
150#define AIO_TICKS ((1000000 + 1023) >> 10)
151
152static unsigned int max_poll_time = 0;
153static unsigned int max_poll_reqs = 0;
154
155/* calculate time difference in ~1/AIO_TICKS of a second */
156static int tvdiff (struct timeval *tv1, struct timeval *tv2)
157{
158 return (tv2->tv_sec - tv1->tv_sec ) * AIO_TICKS
159 + ((tv2->tv_usec - tv1->tv_usec) >> 10);
160}
161
147static int next_pri = DEFAULT_PRI + PRI_BIAS; 162static int next_pri = DEFAULT_PRI + PRI_BIAS;
148 163
149static unsigned int started, wanted; 164static unsigned int started, idle, wanted;
150 165
151#if __linux && defined (PTHREAD_ADAPTIVE_MUTEX_INITIALIZER_NP) 166#if __linux && defined (PTHREAD_ADAPTIVE_MUTEX_INITIALIZER_NP)
152# define AIO_MUTEX_INIT PTHREAD_ADAPTIVE_MUTEX_INITIALIZER_NP 167# define AIO_MUTEX_INIT PTHREAD_ADAPTIVE_MUTEX_INITIALIZER_NP
153#else 168#else
154# define AIO_MUTEX_INIT PTHREAD_MUTEX_INITIALIZER 169# define AIO_MUTEX_INIT PTHREAD_MUTEX_INITIALIZER
196 211
197 free (wrk); 212 free (wrk);
198} 213}
199 214
200static volatile unsigned int nreqs, nready, npending; 215static volatile unsigned int nreqs, nready, npending;
216static volatile unsigned int max_idle = 4;
201static volatile unsigned int max_outstanding = 0xffffffff; 217static volatile unsigned int max_outstanding = 0xffffffff;
202static int respipe [2]; 218static int respipe [2];
203 219
204static pthread_mutex_t reslock = AIO_MUTEX_INIT; 220static pthread_mutex_t reslock = AIO_MUTEX_INIT;
205static pthread_mutex_t reqlock = AIO_MUTEX_INIT; 221static pthread_mutex_t reqlock = AIO_MUTEX_INIT;
206static pthread_cond_t reqwait = PTHREAD_COND_INITIALIZER; 222static pthread_cond_t reqwait = PTHREAD_COND_INITIALIZER;
207 223
208#if WORDREAD_UNSAFE 224#if WORDACCESS_UNSAFE
209 225
210static unsigned int get_nready () 226static unsigned int get_nready ()
211{ 227{
212 unsigned int retval; 228 unsigned int retval;
213 229
227 UNLOCK (reslock); 243 UNLOCK (reslock);
228 244
229 return retval; 245 return retval;
230} 246}
231 247
248static unsigned int get_nthreads ()
249{
250 unsigned int retval;
251
252 LOCK (wrklock);
253 retval = started;
254 UNLOCK (wrklock);
255
256 return retval;
257}
258
232#else 259#else
233 260
234# define get_nready() nready 261# define get_nready() nready
235# define get_npending() npending 262# define get_npending() npending
263# define get_nthreads() started
236 264
237#endif 265#endif
238 266
239/* 267/*
240 * a somewhat faster data structure might be nice, but 268 * a somewhat faster data structure might be nice, but
288 } 316 }
289 317
290 abort (); 318 abort ();
291} 319}
292 320
293static int poll_cb (int max); 321static int poll_cb ();
294static void req_invoke (aio_req req); 322static void req_invoke (aio_req req);
295static void req_free (aio_req req); 323static void req_free (aio_req req);
296static void req_cancel (aio_req req); 324static void req_cancel (aio_req req);
297 325
298/* must be called at most once */ 326/* must be called at most once */
556 UNLOCK (wrklock); 584 UNLOCK (wrklock);
557} 585}
558 586
559static void maybe_start_thread () 587static void maybe_start_thread ()
560{ 588{
561#if 0 589 if (get_nthreads () >= wanted)
562 static struct timeval last;
563 struct timeval diff, now;
564#endif
565
566 if (started >= wanted)
567 return; 590 return;
568 591
569 if (nready <= nreqs - get_nready () - get_npending ()) 592 /* todo: maybe use idle here, but might be less exact */
593 if (0 <= (int)get_nthreads () + (int)get_npending () - (int)nreqs)
570 return; 594 return;
571
572#if 0
573 gettimeofday (&now, 0);
574
575 diff.tv_sec = now.tv_sec - last.tv_sec;
576 diff.tv_usec = now.tv_usec - last.tv_usec;
577
578 if (diff.tv_usec < 0)
579 {
580 --diff.tv_sec;
581 diff.tv_usec += 1000000;
582 }
583
584 if (!diff.tv_sec && diff.tv_usec < 10000)
585 return;
586
587 last = now;
588#endif
589 595
590 start_thread (); 596 start_thread ();
591} 597}
592 598
593static void req_send (aio_req req) 599static void req_send (aio_req req)
620 LOCK (wrklock); 626 LOCK (wrklock);
621 --started; 627 --started;
622 UNLOCK (wrklock); 628 UNLOCK (wrklock);
623} 629}
624 630
631static void set_max_idle (int nthreads)
632{
633 if (WORDACCESS_UNSAFE) LOCK (reqlock);
634 max_idle = nthreads <= 0 ? 1 : nthreads;
635 if (WORDACCESS_UNSAFE) UNLOCK (reqlock);
636}
637
625static void min_parallel (int nthreads) 638static void min_parallel (int nthreads)
626{ 639{
627 if (wanted < nthreads) 640 if (wanted < nthreads)
628 wanted = nthreads; 641 wanted = nthreads;
629} 642}
642 fd_set rfd; 655 fd_set rfd;
643 656
644 while (nreqs) 657 while (nreqs)
645 { 658 {
646 int size; 659 int size;
647 if (WORDREAD_UNSAFE) LOCK (reslock); 660 if (WORDACCESS_UNSAFE) LOCK (reslock);
648 size = res_queue.size; 661 size = res_queue.size;
649 if (WORDREAD_UNSAFE) UNLOCK (reslock); 662 if (WORDACCESS_UNSAFE) UNLOCK (reslock);
650 663
651 if (size) 664 if (size)
652 return; 665 return;
653 666
654 maybe_start_thread (); 667 maybe_start_thread ();
658 671
659 select (respipe [0] + 1, &rfd, 0, 0, 0); 672 select (respipe [0] + 1, &rfd, 0, 0, 0);
660 } 673 }
661} 674}
662 675
663static int poll_cb (int max) 676static int poll_cb ()
664{ 677{
665 dSP; 678 dSP;
666 int count = 0; 679 int count = 0;
680 int maxreqs = max_poll_reqs;
667 int do_croak = 0; 681 int do_croak = 0;
682 struct timeval tv_start, tv_now;
668 aio_req req; 683 aio_req req;
669 684
685 if (max_poll_time)
686 gettimeofday (&tv_start, 0);
687
670 for (;;) 688 for (;;)
671 { 689 {
672 while (max <= 0 || count < max) 690 for (;;)
673 { 691 {
674 maybe_start_thread (); 692 maybe_start_thread ();
675 693
676 LOCK (reslock); 694 LOCK (reslock);
677 req = reqq_shift (&res_queue); 695 req = reqq_shift (&res_queue);
720 738
721 count++; 739 count++;
722 } 740 }
723 741
724 req_free (req); 742 req_free (req);
743
744 if (maxreqs && !--maxreqs)
745 break;
746
747 if (max_poll_time)
748 {
749 gettimeofday (&tv_now, 0);
750
751 if (tvdiff (&tv_start, &tv_now) >= max_poll_time)
752 break;
753 }
725 } 754 }
726 755
727 if (nreqs <= max_outstanding) 756 if (nreqs <= max_outstanding)
728 break; 757 break;
729 758
730 poll_wait (); 759 poll_wait ();
731 760
732 max = 0; 761 ++maxreqs;
733 } 762 }
734 763
735 return count; 764 return count;
736} 765}
737 766
1004/*****************************************************************************/ 1033/*****************************************************************************/
1005 1034
1006static void *aio_proc (void *thr_arg) 1035static void *aio_proc (void *thr_arg)
1007{ 1036{
1008 aio_req req; 1037 aio_req req;
1038 struct timespec ts;
1009 worker *self = (worker *)thr_arg; 1039 worker *self = (worker *)thr_arg;
1010 1040
1041 /* try to distribute timeouts somewhat evenly */
1042 ts.tv_nsec = (((unsigned long)self + (unsigned long)ts.tv_sec) & 1023UL)
1043 * (1000000000UL / 1024UL);
1044
1011 for (;;) 1045 for (;;)
1012 { 1046 {
1047 ts.tv_sec = time (0) + IDLE_TIMEOUT;
1048
1013 LOCK (reqlock); 1049 LOCK (reqlock);
1014 1050
1015 for (;;) 1051 for (;;)
1016 { 1052 {
1017 self->req = req = reqq_shift (&req_queue); 1053 self->req = req = reqq_shift (&req_queue);
1018 1054
1019 if (req) 1055 if (req)
1020 break; 1056 break;
1021 1057
1058 ++idle;
1059
1060 if (pthread_cond_timedwait (&reqwait, &reqlock, &ts)
1061 == ETIMEDOUT)
1062 {
1063 if (idle > max_idle)
1064 {
1065 --idle;
1066 UNLOCK (reqlock);
1067 LOCK (wrklock);
1068 --started;
1069 UNLOCK (wrklock);
1070 goto quit;
1071 }
1072
1073 /* we are allowed to idle, so do so without any timeout */
1022 pthread_cond_wait (&reqwait, &reqlock); 1074 pthread_cond_wait (&reqwait, &reqlock);
1075 ts.tv_sec = time (0) + IDLE_TIMEOUT;
1076 }
1077
1078 --idle;
1023 } 1079 }
1024 1080
1025 --nready; 1081 --nready;
1026 1082
1027 UNLOCK (reqlock); 1083 UNLOCK (reqlock);
1067 case REQ_GROUP: 1123 case REQ_GROUP:
1068 case REQ_NOP: 1124 case REQ_NOP:
1069 break; 1125 break;
1070 1126
1071 case REQ_QUIT: 1127 case REQ_QUIT:
1072 LOCK (wrklock); 1128 goto quit;
1073 worker_free (self);
1074 --started;
1075 UNLOCK (wrklock);
1076 return 0;
1077 1129
1078 default: 1130 default:
1079 req->result = ENOSYS; 1131 req->result = ENOSYS;
1080 break; 1132 break;
1081 } 1133 }
1093 self->req = 0; 1145 self->req = 0;
1094 worker_clear (self); 1146 worker_clear (self);
1095 1147
1096 UNLOCK (reslock); 1148 UNLOCK (reslock);
1097 } 1149 }
1150
1151quit:
1152 LOCK (wrklock);
1153 worker_free (self);
1154 UNLOCK (wrklock);
1155
1156 return 0;
1098} 1157}
1099 1158
1100/*****************************************************************************/ 1159/*****************************************************************************/
1101 1160
1102static void atfork_prepare (void) 1161static void atfork_prepare (void)
1144 1203
1145 worker_clear (wrk); 1204 worker_clear (wrk);
1146 worker_free (wrk); 1205 worker_free (wrk);
1147 } 1206 }
1148 1207
1149 started = 0; 1208 started = 0;
1209 idle = 0;
1150 nreqs = 0; 1210 nreqs = 0;
1211 nready = 0;
1212 npending = 0;
1151 1213
1152 close (respipe [0]); 1214 close (respipe [0]);
1153 close (respipe [1]); 1215 close (respipe [1]);
1154 create_pipe (); 1216 create_pipe ();
1155 1217
1192 newCONSTSUB (stash, "O_TRUNC", newSViv (O_TRUNC)); 1254 newCONSTSUB (stash, "O_TRUNC", newSViv (O_TRUNC));
1193 newCONSTSUB (stash, "S_IFIFO", newSViv (S_IFIFO)); 1255 newCONSTSUB (stash, "S_IFIFO", newSViv (S_IFIFO));
1194 1256
1195 create_pipe (); 1257 create_pipe ();
1196 pthread_atfork (atfork_prepare, atfork_parent, atfork_child); 1258 pthread_atfork (atfork_prepare, atfork_parent, atfork_child);
1197
1198 start_thread ();
1199} 1259}
1260
1261void
1262max_poll_reqs (int nreqs)
1263 PROTOTYPE: $
1264 CODE:
1265 max_poll_reqs = nreqs;
1266
1267void
1268max_poll_time (double nseconds)
1269 PROTOTYPE: $
1270 CODE:
1271 max_poll_time = nseconds * AIO_TICKS;
1200 1272
1201void 1273void
1202min_parallel (int nthreads) 1274min_parallel (int nthreads)
1203 PROTOTYPE: $ 1275 PROTOTYPE: $
1204 1276
1205void 1277void
1206max_parallel (int nthreads) 1278max_parallel (int nthreads)
1207 PROTOTYPE: $ 1279 PROTOTYPE: $
1280
1281void
1282max_idle (int nthreads)
1283 PROTOTYPE: $
1284 CODE:
1285 set_max_idle (nthreads);
1208 1286
1209int 1287int
1210max_outstanding (int maxreqs) 1288max_outstanding (int maxreqs)
1211 PROTOTYPE: $ 1289 PROTOTYPE: $
1212 CODE: 1290 CODE:
1549 1627
1550int 1628int
1551poll_cb(...) 1629poll_cb(...)
1552 PROTOTYPE: 1630 PROTOTYPE:
1553 CODE: 1631 CODE:
1554 RETVAL = poll_cb (0); 1632 RETVAL = poll_cb ();
1555 OUTPUT:
1556 RETVAL
1557
1558int
1559poll_some(int max = 0)
1560 PROTOTYPE: $
1561 CODE:
1562 RETVAL = poll_cb (max);
1563 OUTPUT: 1633 OUTPUT:
1564 RETVAL 1634 RETVAL
1565 1635
1566void 1636void
1567poll_wait() 1637poll_wait()
1592 CODE: 1662 CODE:
1593 RETVAL = get_npending (); 1663 RETVAL = get_npending ();
1594 OUTPUT: 1664 OUTPUT:
1595 RETVAL 1665 RETVAL
1596 1666
1667int
1668nthreads()
1669 PROTOTYPE:
1670 CODE:
1671 if (WORDACCESS_UNSAFE) LOCK (wrklock);
1672 RETVAL = started;
1673 if (WORDACCESS_UNSAFE) UNLOCK (wrklock);
1674 OUTPUT:
1675 RETVAL
1676
1597PROTOTYPES: DISABLE 1677PROTOTYPES: DISABLE
1598 1678
1599MODULE = IO::AIO PACKAGE = IO::AIO::REQ 1679MODULE = IO::AIO PACKAGE = IO::AIO::REQ
1600 1680
1601void 1681void

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines