ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/IO-AIO/AIO.xs
(Generate patch)

Comparing IO-AIO/AIO.xs (file contents):
Revision 1.83 by root, Sat Oct 28 00:17:30 2006 UTC vs.
Revision 1.84 by root, Sat Oct 28 01:24:19 2006 UTC

43# else 43# else
44# error sendfile support requested but not available 44# error sendfile support requested but not available
45# endif 45# endif
46#endif 46#endif
47 47
48/* number of seconds after which idle threads exit */
49#define IDLE_TIMEOUT 10
50
48/* used for struct dirent, AIX doesn't provide it */ 51/* used for struct dirent, AIX doesn't provide it */
49#ifndef NAME_MAX 52#ifndef NAME_MAX
50# define NAME_MAX 4096 53# define NAME_MAX 4096
51#endif 54#endif
52 55
144 NUM_PRI = PRI_MAX + PRI_BIAS + 1, 147 NUM_PRI = PRI_MAX + PRI_BIAS + 1,
145}; 148};
146 149
147static int next_pri = DEFAULT_PRI + PRI_BIAS; 150static int next_pri = DEFAULT_PRI + PRI_BIAS;
148 151
149static unsigned int started, wanted; 152static unsigned int started, idle, wanted;
150 153
151#if __linux && defined (PTHREAD_ADAPTIVE_MUTEX_INITIALIZER_NP) 154#if __linux && defined (PTHREAD_ADAPTIVE_MUTEX_INITIALIZER_NP)
152# define AIO_MUTEX_INIT PTHREAD_ADAPTIVE_MUTEX_INITIALIZER_NP 155# define AIO_MUTEX_INIT PTHREAD_ADAPTIVE_MUTEX_INITIALIZER_NP
153#else 156#else
154# define AIO_MUTEX_INIT PTHREAD_MUTEX_INITIALIZER 157# define AIO_MUTEX_INIT PTHREAD_MUTEX_INITIALIZER
196 199
197 free (wrk); 200 free (wrk);
198} 201}
199 202
200static volatile unsigned int nreqs, nready, npending; 203static volatile unsigned int nreqs, nready, npending;
204static volatile unsigned int max_idle = 4;
201static volatile unsigned int max_outstanding = 0xffffffff; 205static volatile unsigned int max_outstanding = 0xffffffff;
202static int respipe [2]; 206static int respipe [2];
203 207
204static pthread_mutex_t reslock = AIO_MUTEX_INIT; 208static pthread_mutex_t reslock = AIO_MUTEX_INIT;
205static pthread_mutex_t reqlock = AIO_MUTEX_INIT; 209static pthread_mutex_t reqlock = AIO_MUTEX_INIT;
556 UNLOCK (wrklock); 560 UNLOCK (wrklock);
557} 561}
558 562
559static void maybe_start_thread () 563static void maybe_start_thread ()
560{ 564{
561#if 0
562 static struct timeval last;
563 struct timeval diff, now;
564#endif
565
566 if (started >= wanted) 565 if (started >= wanted)
567 return; 566 return;
568 567
568 /* todo: maybe use idle here, but might be less exact */
569 if (nready <= nreqs - get_nready () - get_npending ()) 569 if ((int)nready <= (int)started - (int)(nreqs - get_nready () - get_npending ()))
570 return; 570 return;
571
572#if 0
573 gettimeofday (&now, 0);
574
575 diff.tv_sec = now.tv_sec - last.tv_sec;
576 diff.tv_usec = now.tv_usec - last.tv_usec;
577
578 if (diff.tv_usec < 0)
579 {
580 --diff.tv_sec;
581 diff.tv_usec += 1000000;
582 }
583
584 if (!diff.tv_sec && diff.tv_usec < 10000)
585 return;
586
587 last = now;
588#endif
589 571
590 start_thread (); 572 start_thread ();
591} 573}
592 574
593static void req_send (aio_req req) 575static void req_send (aio_req req)
1004/*****************************************************************************/ 986/*****************************************************************************/
1005 987
1006static void *aio_proc (void *thr_arg) 988static void *aio_proc (void *thr_arg)
1007{ 989{
1008 aio_req req; 990 aio_req req;
991 struct timespec ts;
1009 worker *self = (worker *)thr_arg; 992 worker *self = (worker *)thr_arg;
1010 993
994 /* try to distribute timeouts somewhat evenly */
995 ts.tv_nsec = (((unsigned long)self + (unsigned long)ts.tv_sec) & 1023UL)
996 * (1000000000UL / 1024UL);
997
1011 for (;;) 998 for (;;)
1012 { 999 {
1000 ts.tv_sec = time (0) + IDLE_TIMEOUT;
1001
1013 LOCK (reqlock); 1002 LOCK (reqlock);
1014 1003
1015 for (;;) 1004 for (;;)
1016 { 1005 {
1017 self->req = req = reqq_shift (&req_queue); 1006 self->req = req = reqq_shift (&req_queue);
1018 1007
1019 if (req) 1008 if (req)
1020 break; 1009 break;
1021 1010
1011 ++idle;
1012
1013 if (pthread_cond_timedwait (&reqwait, &reqlock, &ts)
1014 == ETIMEDOUT)
1015 {
1016 if (idle > max_idle)
1017 {
1018 --idle;
1019 UNLOCK (reqlock);
1020 LOCK (wrklock);
1021 --started;
1022 UNLOCK (wrklock);
1023 goto quit;
1024 }
1025
1026 /* we are allowed to idle, so do so without any timeout */
1022 pthread_cond_wait (&reqwait, &reqlock); 1027 pthread_cond_wait (&reqwait, &reqlock);
1028 ts.tv_sec = time (0) + IDLE_TIMEOUT;
1029 }
1030
1031 --idle;
1023 } 1032 }
1024 1033
1025 --nready; 1034 --nready;
1026 1035
1027 UNLOCK (reqlock); 1036 UNLOCK (reqlock);
1067 case REQ_GROUP: 1076 case REQ_GROUP:
1068 case REQ_NOP: 1077 case REQ_NOP:
1069 break; 1078 break;
1070 1079
1071 case REQ_QUIT: 1080 case REQ_QUIT:
1072 LOCK (wrklock); 1081 goto quit;
1073 worker_free (self);
1074 --started;
1075 UNLOCK (wrklock);
1076 return 0;
1077 1082
1078 default: 1083 default:
1079 req->result = ENOSYS; 1084 req->result = ENOSYS;
1080 break; 1085 break;
1081 } 1086 }
1093 self->req = 0; 1098 self->req = 0;
1094 worker_clear (self); 1099 worker_clear (self);
1095 1100
1096 UNLOCK (reslock); 1101 UNLOCK (reslock);
1097 } 1102 }
1103
1104quit:
1105 LOCK (wrklock);
1106 worker_free (self);
1107 UNLOCK (wrklock);
1108
1109 return 0;
1098} 1110}
1099 1111
1100/*****************************************************************************/ 1112/*****************************************************************************/
1101 1113
1102static void atfork_prepare (void) 1114static void atfork_prepare (void)
1144 1156
1145 worker_clear (wrk); 1157 worker_clear (wrk);
1146 worker_free (wrk); 1158 worker_free (wrk);
1147 } 1159 }
1148 1160
1149 started = 0; 1161 started = 0;
1162 idle = 0;
1150 nreqs = 0; 1163 nreqs = 0;
1164 nready = 0;
1165 npending = 0;
1151 1166
1152 close (respipe [0]); 1167 close (respipe [0]);
1153 close (respipe [1]); 1168 close (respipe [1]);
1154 create_pipe (); 1169 create_pipe ();
1155 1170
1192 newCONSTSUB (stash, "O_TRUNC", newSViv (O_TRUNC)); 1207 newCONSTSUB (stash, "O_TRUNC", newSViv (O_TRUNC));
1193 newCONSTSUB (stash, "S_IFIFO", newSViv (S_IFIFO)); 1208 newCONSTSUB (stash, "S_IFIFO", newSViv (S_IFIFO));
1194 1209
1195 create_pipe (); 1210 create_pipe ();
1196 pthread_atfork (atfork_prepare, atfork_parent, atfork_child); 1211 pthread_atfork (atfork_prepare, atfork_parent, atfork_child);
1197
1198 start_thread ();
1199} 1212}
1200 1213
1201void 1214void
1202min_parallel (int nthreads) 1215min_parallel (int nthreads)
1203 PROTOTYPE: $ 1216 PROTOTYPE: $

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines