… | |
… | |
43 | # else |
43 | # else |
44 | # error sendfile support requested but not available |
44 | # error sendfile support requested but not available |
45 | # endif |
45 | # endif |
46 | #endif |
46 | #endif |
47 | |
47 | |
|
|
48 | /* number of seconds after which idle threads exit */ |
|
|
49 | #define IDLE_TIMEOUT 10 |
|
|
50 | |
48 | /* used for struct dirent, AIX doesn't provide it */ |
51 | /* used for struct dirent, AIX doesn't provide it */ |
49 | #ifndef NAME_MAX |
52 | #ifndef NAME_MAX |
50 | # define NAME_MAX 4096 |
53 | # define NAME_MAX 4096 |
51 | #endif |
54 | #endif |
52 | |
55 | |
… | |
… | |
144 | NUM_PRI = PRI_MAX + PRI_BIAS + 1, |
147 | NUM_PRI = PRI_MAX + PRI_BIAS + 1, |
145 | }; |
148 | }; |
146 | |
149 | |
147 | static int next_pri = DEFAULT_PRI + PRI_BIAS; |
150 | static int next_pri = DEFAULT_PRI + PRI_BIAS; |
148 | |
151 | |
149 | static unsigned int started, wanted; |
152 | static unsigned int started, idle, wanted; |
150 | |
153 | |
151 | #if __linux && defined (PTHREAD_ADAPTIVE_MUTEX_INITIALIZER_NP) |
154 | #if __linux && defined (PTHREAD_ADAPTIVE_MUTEX_INITIALIZER_NP) |
152 | # define AIO_MUTEX_INIT PTHREAD_ADAPTIVE_MUTEX_INITIALIZER_NP |
155 | # define AIO_MUTEX_INIT PTHREAD_ADAPTIVE_MUTEX_INITIALIZER_NP |
153 | #else |
156 | #else |
154 | # define AIO_MUTEX_INIT PTHREAD_MUTEX_INITIALIZER |
157 | # define AIO_MUTEX_INIT PTHREAD_MUTEX_INITIALIZER |
… | |
… | |
196 | |
199 | |
197 | free (wrk); |
200 | free (wrk); |
198 | } |
201 | } |
199 | |
202 | |
200 | static volatile unsigned int nreqs, nready, npending; |
203 | static volatile unsigned int nreqs, nready, npending; |
|
|
204 | static volatile unsigned int max_idle = 4; |
201 | static volatile unsigned int max_outstanding = 0xffffffff; |
205 | static volatile unsigned int max_outstanding = 0xffffffff; |
202 | static int respipe [2]; |
206 | static int respipe [2]; |
203 | |
207 | |
204 | static pthread_mutex_t reslock = AIO_MUTEX_INIT; |
208 | static pthread_mutex_t reslock = AIO_MUTEX_INIT; |
205 | static pthread_mutex_t reqlock = AIO_MUTEX_INIT; |
209 | static pthread_mutex_t reqlock = AIO_MUTEX_INIT; |
… | |
… | |
556 | UNLOCK (wrklock); |
560 | UNLOCK (wrklock); |
557 | } |
561 | } |
558 | |
562 | |
559 | static void maybe_start_thread () |
563 | static void maybe_start_thread () |
560 | { |
564 | { |
561 | #if 0 |
|
|
562 | static struct timeval last; |
|
|
563 | struct timeval diff, now; |
|
|
564 | #endif |
|
|
565 | |
|
|
566 | if (started >= wanted) |
565 | if (started >= wanted) |
567 | return; |
566 | return; |
568 | |
567 | |
|
|
568 | /* todo: maybe use idle here, but might be less exact */ |
569 | if (nready <= nreqs - get_nready () - get_npending ()) |
569 | if ((int)nready <= (int)started - (int)(nreqs - get_nready () - get_npending ())) |
570 | return; |
570 | return; |
571 | |
|
|
572 | #if 0 |
|
|
573 | gettimeofday (&now, 0); |
|
|
574 | |
|
|
575 | diff.tv_sec = now.tv_sec - last.tv_sec; |
|
|
576 | diff.tv_usec = now.tv_usec - last.tv_usec; |
|
|
577 | |
|
|
578 | if (diff.tv_usec < 0) |
|
|
579 | { |
|
|
580 | --diff.tv_sec; |
|
|
581 | diff.tv_usec += 1000000; |
|
|
582 | } |
|
|
583 | |
|
|
584 | if (!diff.tv_sec && diff.tv_usec < 10000) |
|
|
585 | return; |
|
|
586 | |
|
|
587 | last = now; |
|
|
588 | #endif |
|
|
589 | |
571 | |
590 | start_thread (); |
572 | start_thread (); |
591 | } |
573 | } |
592 | |
574 | |
593 | static void req_send (aio_req req) |
575 | static void req_send (aio_req req) |
… | |
… | |
610 | Newz (0, req, 1, aio_cb); |
592 | Newz (0, req, 1, aio_cb); |
611 | |
593 | |
612 | req->type = REQ_QUIT; |
594 | req->type = REQ_QUIT; |
613 | req->pri = PRI_MAX + PRI_BIAS; |
595 | req->pri = PRI_MAX + PRI_BIAS; |
614 | |
596 | |
615 | req_send (req); |
597 | LOCK (reqlock); |
|
|
598 | reqq_push (&req_queue, req); |
|
|
599 | pthread_cond_signal (&reqwait); |
|
|
600 | UNLOCK (reqlock); |
616 | |
601 | |
617 | LOCK (wrklock); |
602 | LOCK (wrklock); |
618 | --started; |
603 | --started; |
619 | UNLOCK (wrklock); |
604 | UNLOCK (wrklock); |
620 | } |
605 | } |
… | |
… | |
1001 | /*****************************************************************************/ |
986 | /*****************************************************************************/ |
1002 | |
987 | |
1003 | static void *aio_proc (void *thr_arg) |
988 | static void *aio_proc (void *thr_arg) |
1004 | { |
989 | { |
1005 | aio_req req; |
990 | aio_req req; |
|
|
991 | struct timespec ts; |
1006 | worker *self = (worker *)thr_arg; |
992 | worker *self = (worker *)thr_arg; |
1007 | |
993 | |
|
|
994 | /* try to distribute timeouts somewhat evenly */ |
|
|
995 | ts.tv_nsec = (((unsigned long)self + (unsigned long)ts.tv_sec) & 1023UL) |
|
|
996 | * (1000000000UL / 1024UL); |
|
|
997 | |
1008 | for (;;) |
998 | for (;;) |
1009 | { |
999 | { |
|
|
1000 | ts.tv_sec = time (0) + IDLE_TIMEOUT; |
|
|
1001 | |
1010 | LOCK (reqlock); |
1002 | LOCK (reqlock); |
1011 | |
1003 | |
1012 | for (;;) |
1004 | for (;;) |
1013 | { |
1005 | { |
1014 | self->req = req = reqq_shift (&req_queue); |
1006 | self->req = req = reqq_shift (&req_queue); |
1015 | |
1007 | |
1016 | if (req) |
1008 | if (req) |
1017 | break; |
1009 | break; |
1018 | |
1010 | |
|
|
1011 | ++idle; |
|
|
1012 | |
|
|
1013 | if (pthread_cond_timedwait (&reqwait, &reqlock, &ts) |
|
|
1014 | == ETIMEDOUT) |
|
|
1015 | { |
|
|
1016 | if (idle > max_idle) |
|
|
1017 | { |
|
|
1018 | --idle; |
|
|
1019 | UNLOCK (reqlock); |
|
|
1020 | LOCK (wrklock); |
|
|
1021 | --started; |
|
|
1022 | UNLOCK (wrklock); |
|
|
1023 | goto quit; |
|
|
1024 | } |
|
|
1025 | |
|
|
1026 | /* we are allowed to idle, so do so without any timeout */ |
1019 | pthread_cond_wait (&reqwait, &reqlock); |
1027 | pthread_cond_wait (&reqwait, &reqlock); |
|
|
1028 | ts.tv_sec = time (0) + IDLE_TIMEOUT; |
|
|
1029 | } |
|
|
1030 | |
|
|
1031 | --idle; |
1020 | } |
1032 | } |
1021 | |
1033 | |
1022 | --nready; |
1034 | --nready; |
1023 | |
1035 | |
1024 | UNLOCK (reqlock); |
1036 | UNLOCK (reqlock); |
… | |
… | |
1064 | case REQ_GROUP: |
1076 | case REQ_GROUP: |
1065 | case REQ_NOP: |
1077 | case REQ_NOP: |
1066 | break; |
1078 | break; |
1067 | |
1079 | |
1068 | case REQ_QUIT: |
1080 | case REQ_QUIT: |
1069 | LOCK (wrklock); |
1081 | goto quit; |
1070 | worker_free (self); |
|
|
1071 | --started; |
|
|
1072 | UNLOCK (wrklock); |
|
|
1073 | return 0; |
|
|
1074 | |
1082 | |
1075 | default: |
1083 | default: |
1076 | req->result = ENOSYS; |
1084 | req->result = ENOSYS; |
1077 | break; |
1085 | break; |
1078 | } |
1086 | } |
… | |
… | |
1090 | self->req = 0; |
1098 | self->req = 0; |
1091 | worker_clear (self); |
1099 | worker_clear (self); |
1092 | |
1100 | |
1093 | UNLOCK (reslock); |
1101 | UNLOCK (reslock); |
1094 | } |
1102 | } |
|
|
1103 | |
|
|
1104 | quit: |
|
|
1105 | LOCK (wrklock); |
|
|
1106 | worker_free (self); |
|
|
1107 | UNLOCK (wrklock); |
|
|
1108 | |
|
|
1109 | return 0; |
1095 | } |
1110 | } |
1096 | |
1111 | |
1097 | /*****************************************************************************/ |
1112 | /*****************************************************************************/ |
1098 | |
1113 | |
1099 | static void atfork_prepare (void) |
1114 | static void atfork_prepare (void) |
… | |
… | |
1141 | |
1156 | |
1142 | worker_clear (wrk); |
1157 | worker_clear (wrk); |
1143 | worker_free (wrk); |
1158 | worker_free (wrk); |
1144 | } |
1159 | } |
1145 | |
1160 | |
1146 | started = 0; |
1161 | started = 0; |
|
|
1162 | idle = 0; |
1147 | nreqs = 0; |
1163 | nreqs = 0; |
|
|
1164 | nready = 0; |
|
|
1165 | npending = 0; |
1148 | |
1166 | |
1149 | close (respipe [0]); |
1167 | close (respipe [0]); |
1150 | close (respipe [1]); |
1168 | close (respipe [1]); |
1151 | create_pipe (); |
1169 | create_pipe (); |
1152 | |
1170 | |
… | |
… | |
1179 | PROTOTYPES: ENABLE |
1197 | PROTOTYPES: ENABLE |
1180 | |
1198 | |
1181 | BOOT: |
1199 | BOOT: |
1182 | { |
1200 | { |
1183 | HV *stash = gv_stashpv ("IO::AIO", 1); |
1201 | HV *stash = gv_stashpv ("IO::AIO", 1); |
|
|
1202 | |
1184 | newCONSTSUB (stash, "EXDEV", newSViv (EXDEV)); |
1203 | newCONSTSUB (stash, "EXDEV", newSViv (EXDEV)); |
1185 | newCONSTSUB (stash, "O_RDONLY", newSViv (O_RDONLY)); |
1204 | newCONSTSUB (stash, "O_RDONLY", newSViv (O_RDONLY)); |
1186 | newCONSTSUB (stash, "O_WRONLY", newSViv (O_WRONLY)); |
1205 | newCONSTSUB (stash, "O_WRONLY", newSViv (O_WRONLY)); |
1187 | newCONSTSUB (stash, "O_CREAT", newSViv (O_CREAT)); |
1206 | newCONSTSUB (stash, "O_CREAT", newSViv (O_CREAT)); |
1188 | newCONSTSUB (stash, "O_TRUNC", newSViv (O_TRUNC)); |
1207 | newCONSTSUB (stash, "O_TRUNC", newSViv (O_TRUNC)); |
|
|
1208 | newCONSTSUB (stash, "S_IFIFO", newSViv (S_IFIFO)); |
1189 | |
1209 | |
1190 | create_pipe (); |
1210 | create_pipe (); |
1191 | pthread_atfork (atfork_prepare, atfork_parent, atfork_child); |
1211 | pthread_atfork (atfork_prepare, atfork_parent, atfork_child); |
1192 | |
|
|
1193 | start_thread (); |
|
|
1194 | } |
1212 | } |
1195 | |
1213 | |
1196 | void |
1214 | void |
1197 | min_parallel (int nthreads) |
1215 | min_parallel (int nthreads) |
1198 | PROTOTYPE: $ |
1216 | PROTOTYPE: $ |