… | |
… | |
43 | # else |
43 | # else |
44 | # error sendfile support requested but not available |
44 | # error sendfile support requested but not available |
45 | # endif |
45 | # endif |
46 | #endif |
46 | #endif |
47 | |
47 | |
|
|
48 | /* number of seconds after which idle threads exit */ |
|
|
49 | #define IDLE_TIMEOUT 10 |
|
|
50 | |
48 | /* used for struct dirent, AIX doesn't provide it */ |
51 | /* used for struct dirent, AIX doesn't provide it */ |
49 | #ifndef NAME_MAX |
52 | #ifndef NAME_MAX |
50 | # define NAME_MAX 4096 |
53 | # define NAME_MAX 4096 |
51 | #endif |
54 | #endif |
52 | |
55 | |
… | |
… | |
144 | NUM_PRI = PRI_MAX + PRI_BIAS + 1, |
147 | NUM_PRI = PRI_MAX + PRI_BIAS + 1, |
145 | }; |
148 | }; |
146 | |
149 | |
147 | static int next_pri = DEFAULT_PRI + PRI_BIAS; |
150 | static int next_pri = DEFAULT_PRI + PRI_BIAS; |
148 | |
151 | |
149 | static unsigned int started, wanted; |
152 | static unsigned int started, idle, wanted; |
150 | |
153 | |
151 | #if __linux && defined (PTHREAD_ADAPTIVE_MUTEX_INITIALIZER_NP) |
154 | #if __linux && defined (PTHREAD_ADAPTIVE_MUTEX_INITIALIZER_NP) |
152 | # define AIO_MUTEX_INIT PTHREAD_ADAPTIVE_MUTEX_INITIALIZER_NP |
155 | # define AIO_MUTEX_INIT PTHREAD_ADAPTIVE_MUTEX_INITIALIZER_NP |
153 | #else |
156 | #else |
154 | # define AIO_MUTEX_INIT PTHREAD_MUTEX_INITIALIZER |
157 | # define AIO_MUTEX_INIT PTHREAD_MUTEX_INITIALIZER |
… | |
… | |
196 | |
199 | |
197 | free (wrk); |
200 | free (wrk); |
198 | } |
201 | } |
199 | |
202 | |
200 | static volatile unsigned int nreqs, nready, npending; |
203 | static volatile unsigned int nreqs, nready, npending; |
|
|
204 | static volatile unsigned int max_idle = 4; |
201 | static volatile unsigned int max_outstanding = 0xffffffff; |
205 | static volatile unsigned int max_outstanding = 0xffffffff; |
202 | static int respipe [2]; |
206 | static int respipe [2]; |
203 | |
207 | |
204 | static pthread_mutex_t reslock = AIO_MUTEX_INIT; |
208 | static pthread_mutex_t reslock = AIO_MUTEX_INIT; |
205 | static pthread_mutex_t reqlock = AIO_MUTEX_INIT; |
209 | static pthread_mutex_t reqlock = AIO_MUTEX_INIT; |
… | |
… | |
556 | UNLOCK (wrklock); |
560 | UNLOCK (wrklock); |
557 | } |
561 | } |
558 | |
562 | |
559 | static void maybe_start_thread () |
563 | static void maybe_start_thread () |
560 | { |
564 | { |
561 | #if 0 |
|
|
562 | static struct timeval last; |
|
|
563 | struct timeval diff, now; |
|
|
564 | #endif |
|
|
565 | |
|
|
566 | if (started >= wanted) |
565 | if (started >= wanted) |
567 | return; |
566 | return; |
568 | |
567 | |
|
|
568 | /* todo: maybe use idle here, but might be less exact */ |
569 | if (nready <= nreqs - get_nready () - get_npending ()) |
569 | if ((int)nready <= (int)started - (int)(nreqs - get_nready () - get_npending ())) |
570 | return; |
570 | return; |
571 | |
|
|
572 | #if 0 |
|
|
573 | gettimeofday (&now, 0); |
|
|
574 | |
|
|
575 | diff.tv_sec = now.tv_sec - last.tv_sec; |
|
|
576 | diff.tv_usec = now.tv_usec - last.tv_usec; |
|
|
577 | |
|
|
578 | if (diff.tv_usec < 0) |
|
|
579 | { |
|
|
580 | --diff.tv_sec; |
|
|
581 | diff.tv_usec += 1000000; |
|
|
582 | } |
|
|
583 | |
|
|
584 | if (!diff.tv_sec && diff.tv_usec < 10000) |
|
|
585 | return; |
|
|
586 | |
|
|
587 | last = now; |
|
|
588 | #endif |
|
|
589 | |
571 | |
590 | start_thread (); |
572 | start_thread (); |
591 | } |
573 | } |
592 | |
574 | |
593 | static void req_send (aio_req req) |
575 | static void req_send (aio_req req) |
… | |
… | |
1004 | /*****************************************************************************/ |
986 | /*****************************************************************************/ |
1005 | |
987 | |
1006 | static void *aio_proc (void *thr_arg) |
988 | static void *aio_proc (void *thr_arg) |
1007 | { |
989 | { |
1008 | aio_req req; |
990 | aio_req req; |
|
|
991 | struct timespec ts; |
1009 | worker *self = (worker *)thr_arg; |
992 | worker *self = (worker *)thr_arg; |
1010 | |
993 | |
|
|
994 | /* try to distribute timeouts somewhat evenly */ |
|
|
995 | ts.tv_nsec = (((unsigned long)self + (unsigned long)ts.tv_sec) & 1023UL) |
|
|
996 | * (1000000000UL / 1024UL); |
|
|
997 | |
1011 | for (;;) |
998 | for (;;) |
1012 | { |
999 | { |
|
|
1000 | ts.tv_sec = time (0) + IDLE_TIMEOUT; |
|
|
1001 | |
1013 | LOCK (reqlock); |
1002 | LOCK (reqlock); |
1014 | |
1003 | |
1015 | for (;;) |
1004 | for (;;) |
1016 | { |
1005 | { |
1017 | self->req = req = reqq_shift (&req_queue); |
1006 | self->req = req = reqq_shift (&req_queue); |
1018 | |
1007 | |
1019 | if (req) |
1008 | if (req) |
1020 | break; |
1009 | break; |
1021 | |
1010 | |
|
|
1011 | ++idle; |
|
|
1012 | |
|
|
1013 | if (pthread_cond_timedwait (&reqwait, &reqlock, &ts) |
|
|
1014 | == ETIMEDOUT) |
|
|
1015 | { |
|
|
1016 | if (idle > max_idle) |
|
|
1017 | { |
|
|
1018 | --idle; |
|
|
1019 | UNLOCK (reqlock); |
|
|
1020 | LOCK (wrklock); |
|
|
1021 | --started; |
|
|
1022 | UNLOCK (wrklock); |
|
|
1023 | goto quit; |
|
|
1024 | } |
|
|
1025 | |
|
|
1026 | /* we are allowed to idle, so do so without any timeout */ |
1022 | pthread_cond_wait (&reqwait, &reqlock); |
1027 | pthread_cond_wait (&reqwait, &reqlock); |
|
|
1028 | ts.tv_sec = time (0) + IDLE_TIMEOUT; |
|
|
1029 | } |
|
|
1030 | |
|
|
1031 | --idle; |
1023 | } |
1032 | } |
1024 | |
1033 | |
1025 | --nready; |
1034 | --nready; |
1026 | |
1035 | |
1027 | UNLOCK (reqlock); |
1036 | UNLOCK (reqlock); |
… | |
… | |
1067 | case REQ_GROUP: |
1076 | case REQ_GROUP: |
1068 | case REQ_NOP: |
1077 | case REQ_NOP: |
1069 | break; |
1078 | break; |
1070 | |
1079 | |
1071 | case REQ_QUIT: |
1080 | case REQ_QUIT: |
1072 | LOCK (wrklock); |
1081 | goto quit; |
1073 | worker_free (self); |
|
|
1074 | --started; |
|
|
1075 | UNLOCK (wrklock); |
|
|
1076 | return 0; |
|
|
1077 | |
1082 | |
1078 | default: |
1083 | default: |
1079 | req->result = ENOSYS; |
1084 | req->result = ENOSYS; |
1080 | break; |
1085 | break; |
1081 | } |
1086 | } |
… | |
… | |
1093 | self->req = 0; |
1098 | self->req = 0; |
1094 | worker_clear (self); |
1099 | worker_clear (self); |
1095 | |
1100 | |
1096 | UNLOCK (reslock); |
1101 | UNLOCK (reslock); |
1097 | } |
1102 | } |
|
|
1103 | |
|
|
1104 | quit: |
|
|
1105 | LOCK (wrklock); |
|
|
1106 | worker_free (self); |
|
|
1107 | UNLOCK (wrklock); |
|
|
1108 | |
|
|
1109 | return 0; |
1098 | } |
1110 | } |
1099 | |
1111 | |
1100 | /*****************************************************************************/ |
1112 | /*****************************************************************************/ |
1101 | |
1113 | |
1102 | static void atfork_prepare (void) |
1114 | static void atfork_prepare (void) |
… | |
… | |
1144 | |
1156 | |
1145 | worker_clear (wrk); |
1157 | worker_clear (wrk); |
1146 | worker_free (wrk); |
1158 | worker_free (wrk); |
1147 | } |
1159 | } |
1148 | |
1160 | |
1149 | started = 0; |
1161 | started = 0; |
|
|
1162 | idle = 0; |
1150 | nreqs = 0; |
1163 | nreqs = 0; |
|
|
1164 | nready = 0; |
|
|
1165 | npending = 0; |
1151 | |
1166 | |
1152 | close (respipe [0]); |
1167 | close (respipe [0]); |
1153 | close (respipe [1]); |
1168 | close (respipe [1]); |
1154 | create_pipe (); |
1169 | create_pipe (); |
1155 | |
1170 | |
… | |
… | |
1192 | newCONSTSUB (stash, "O_TRUNC", newSViv (O_TRUNC)); |
1207 | newCONSTSUB (stash, "O_TRUNC", newSViv (O_TRUNC)); |
1193 | newCONSTSUB (stash, "S_IFIFO", newSViv (S_IFIFO)); |
1208 | newCONSTSUB (stash, "S_IFIFO", newSViv (S_IFIFO)); |
1194 | |
1209 | |
1195 | create_pipe (); |
1210 | create_pipe (); |
1196 | pthread_atfork (atfork_prepare, atfork_parent, atfork_child); |
1211 | pthread_atfork (atfork_prepare, atfork_parent, atfork_child); |
1197 | |
|
|
1198 | start_thread (); |
|
|
1199 | } |
1212 | } |
1200 | |
1213 | |
1201 | void |
1214 | void |
1202 | min_parallel (int nthreads) |
1215 | min_parallel (int nthreads) |
1203 | PROTOTYPE: $ |
1216 | PROTOTYPE: $ |