ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/IO-AIO/AIO.xs
(Generate patch)

Comparing IO-AIO/AIO.xs (file contents):
Revision 1.81 by root, Fri Oct 27 20:10:06 2006 UTC vs.
Revision 1.84 by root, Sat Oct 28 01:24:19 2006 UTC

43# else 43# else
44# error sendfile support requested but not available 44# error sendfile support requested but not available
45# endif 45# endif
46#endif 46#endif
47 47
48/* number of seconds after which idle threads exit */
49#define IDLE_TIMEOUT 10
50
48/* used for struct dirent, AIX doesn't provide it */ 51/* used for struct dirent, AIX doesn't provide it */
49#ifndef NAME_MAX 52#ifndef NAME_MAX
50# define NAME_MAX 4096 53# define NAME_MAX 4096
51#endif 54#endif
52 55
144 NUM_PRI = PRI_MAX + PRI_BIAS + 1, 147 NUM_PRI = PRI_MAX + PRI_BIAS + 1,
145}; 148};
146 149
147static int next_pri = DEFAULT_PRI + PRI_BIAS; 150static int next_pri = DEFAULT_PRI + PRI_BIAS;
148 151
149static unsigned int started, wanted; 152static unsigned int started, idle, wanted;
150 153
151#if __linux && defined (PTHREAD_ADAPTIVE_MUTEX_INITIALIZER_NP) 154#if __linux && defined (PTHREAD_ADAPTIVE_MUTEX_INITIALIZER_NP)
152# define AIO_MUTEX_INIT PTHREAD_ADAPTIVE_MUTEX_INITIALIZER_NP 155# define AIO_MUTEX_INIT PTHREAD_ADAPTIVE_MUTEX_INITIALIZER_NP
153#else 156#else
154# define AIO_MUTEX_INIT PTHREAD_MUTEX_INITIALIZER 157# define AIO_MUTEX_INIT PTHREAD_MUTEX_INITIALIZER
196 199
197 free (wrk); 200 free (wrk);
198} 201}
199 202
200static volatile unsigned int nreqs, nready, npending; 203static volatile unsigned int nreqs, nready, npending;
204static volatile unsigned int max_idle = 4;
201static volatile unsigned int max_outstanding = 0xffffffff; 205static volatile unsigned int max_outstanding = 0xffffffff;
202static int respipe [2]; 206static int respipe [2];
203 207
204static pthread_mutex_t reslock = AIO_MUTEX_INIT; 208static pthread_mutex_t reslock = AIO_MUTEX_INIT;
205static pthread_mutex_t reqlock = AIO_MUTEX_INIT; 209static pthread_mutex_t reqlock = AIO_MUTEX_INIT;
556 UNLOCK (wrklock); 560 UNLOCK (wrklock);
557} 561}
558 562
559static void maybe_start_thread () 563static void maybe_start_thread ()
560{ 564{
561#if 0
562 static struct timeval last;
563 struct timeval diff, now;
564#endif
565
566 if (started >= wanted) 565 if (started >= wanted)
567 return; 566 return;
568 567
568 /* todo: maybe use idle here, but might be less exact */
569 if (nready <= nreqs - get_nready () - get_npending ()) 569 if ((int)nready <= (int)started - (int)(nreqs - get_nready () - get_npending ()))
570 return; 570 return;
571
572#if 0
573 gettimeofday (&now, 0);
574
575 diff.tv_sec = now.tv_sec - last.tv_sec;
576 diff.tv_usec = now.tv_usec - last.tv_usec;
577
578 if (diff.tv_usec < 0)
579 {
580 --diff.tv_sec;
581 diff.tv_usec += 1000000;
582 }
583
584 if (!diff.tv_sec && diff.tv_usec < 10000)
585 return;
586
587 last = now;
588#endif
589 571
590 start_thread (); 572 start_thread ();
591} 573}
592 574
593static void req_send (aio_req req) 575static void req_send (aio_req req)
610 Newz (0, req, 1, aio_cb); 592 Newz (0, req, 1, aio_cb);
611 593
612 req->type = REQ_QUIT; 594 req->type = REQ_QUIT;
613 req->pri = PRI_MAX + PRI_BIAS; 595 req->pri = PRI_MAX + PRI_BIAS;
614 596
615 req_send (req); 597 LOCK (reqlock);
598 reqq_push (&req_queue, req);
599 pthread_cond_signal (&reqwait);
600 UNLOCK (reqlock);
616 601
617 LOCK (wrklock); 602 LOCK (wrklock);
618 --started; 603 --started;
619 UNLOCK (wrklock); 604 UNLOCK (wrklock);
620} 605}
1001/*****************************************************************************/ 986/*****************************************************************************/
1002 987
1003static void *aio_proc (void *thr_arg) 988static void *aio_proc (void *thr_arg)
1004{ 989{
1005 aio_req req; 990 aio_req req;
991 struct timespec ts;
1006 worker *self = (worker *)thr_arg; 992 worker *self = (worker *)thr_arg;
1007 993
994 /* try to distribute timeouts somewhat evenly */
995 ts.tv_nsec = (((unsigned long)self + (unsigned long)ts.tv_sec) & 1023UL)
996 * (1000000000UL / 1024UL);
997
1008 for (;;) 998 for (;;)
1009 { 999 {
1000 ts.tv_sec = time (0) + IDLE_TIMEOUT;
1001
1010 LOCK (reqlock); 1002 LOCK (reqlock);
1011 1003
1012 for (;;) 1004 for (;;)
1013 { 1005 {
1014 self->req = req = reqq_shift (&req_queue); 1006 self->req = req = reqq_shift (&req_queue);
1015 1007
1016 if (req) 1008 if (req)
1017 break; 1009 break;
1018 1010
1011 ++idle;
1012
1013 if (pthread_cond_timedwait (&reqwait, &reqlock, &ts)
1014 == ETIMEDOUT)
1015 {
1016 if (idle > max_idle)
1017 {
1018 --idle;
1019 UNLOCK (reqlock);
1020 LOCK (wrklock);
1021 --started;
1022 UNLOCK (wrklock);
1023 goto quit;
1024 }
1025
1026 /* we are allowed to idle, so do so without any timeout */
1019 pthread_cond_wait (&reqwait, &reqlock); 1027 pthread_cond_wait (&reqwait, &reqlock);
1028 ts.tv_sec = time (0) + IDLE_TIMEOUT;
1029 }
1030
1031 --idle;
1020 } 1032 }
1021 1033
1022 --nready; 1034 --nready;
1023 1035
1024 UNLOCK (reqlock); 1036 UNLOCK (reqlock);
1064 case REQ_GROUP: 1076 case REQ_GROUP:
1065 case REQ_NOP: 1077 case REQ_NOP:
1066 break; 1078 break;
1067 1079
1068 case REQ_QUIT: 1080 case REQ_QUIT:
1069 LOCK (wrklock); 1081 goto quit;
1070 worker_free (self);
1071 --started;
1072 UNLOCK (wrklock);
1073 return 0;
1074 1082
1075 default: 1083 default:
1076 req->result = ENOSYS; 1084 req->result = ENOSYS;
1077 break; 1085 break;
1078 } 1086 }
1090 self->req = 0; 1098 self->req = 0;
1091 worker_clear (self); 1099 worker_clear (self);
1092 1100
1093 UNLOCK (reslock); 1101 UNLOCK (reslock);
1094 } 1102 }
1103
1104quit:
1105 LOCK (wrklock);
1106 worker_free (self);
1107 UNLOCK (wrklock);
1108
1109 return 0;
1095} 1110}
1096 1111
1097/*****************************************************************************/ 1112/*****************************************************************************/
1098 1113
1099static void atfork_prepare (void) 1114static void atfork_prepare (void)
1141 1156
1142 worker_clear (wrk); 1157 worker_clear (wrk);
1143 worker_free (wrk); 1158 worker_free (wrk);
1144 } 1159 }
1145 1160
1146 started = 0; 1161 started = 0;
1162 idle = 0;
1147 nreqs = 0; 1163 nreqs = 0;
1164 nready = 0;
1165 npending = 0;
1148 1166
1149 close (respipe [0]); 1167 close (respipe [0]);
1150 close (respipe [1]); 1168 close (respipe [1]);
1151 create_pipe (); 1169 create_pipe ();
1152 1170
1179PROTOTYPES: ENABLE 1197PROTOTYPES: ENABLE
1180 1198
1181BOOT: 1199BOOT:
1182{ 1200{
1183 HV *stash = gv_stashpv ("IO::AIO", 1); 1201 HV *stash = gv_stashpv ("IO::AIO", 1);
1202
1184 newCONSTSUB (stash, "EXDEV", newSViv (EXDEV)); 1203 newCONSTSUB (stash, "EXDEV", newSViv (EXDEV));
1185 newCONSTSUB (stash, "O_RDONLY", newSViv (O_RDONLY)); 1204 newCONSTSUB (stash, "O_RDONLY", newSViv (O_RDONLY));
1186 newCONSTSUB (stash, "O_WRONLY", newSViv (O_WRONLY)); 1205 newCONSTSUB (stash, "O_WRONLY", newSViv (O_WRONLY));
1187 newCONSTSUB (stash, "O_CREAT", newSViv (O_CREAT)); 1206 newCONSTSUB (stash, "O_CREAT", newSViv (O_CREAT));
1188 newCONSTSUB (stash, "O_TRUNC", newSViv (O_TRUNC)); 1207 newCONSTSUB (stash, "O_TRUNC", newSViv (O_TRUNC));
1208 newCONSTSUB (stash, "S_IFIFO", newSViv (S_IFIFO));
1189 1209
1190 create_pipe (); 1210 create_pipe ();
1191 pthread_atfork (atfork_prepare, atfork_parent, atfork_child); 1211 pthread_atfork (atfork_prepare, atfork_parent, atfork_child);
1192
1193 start_thread ();
1194} 1212}
1195 1213
1196void 1214void
1197min_parallel (int nthreads) 1215min_parallel (int nthreads)
1198 PROTOTYPE: $ 1216 PROTOTYPE: $

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines