… | |
… | |
43 | # else |
43 | # else |
44 | # error sendfile support requested but not available |
44 | # error sendfile support requested but not available |
45 | # endif |
45 | # endif |
46 | #endif |
46 | #endif |
47 | |
47 | |
|
|
48 | /* number of seconds after which idle threads exit */ |
|
|
49 | #define IDLE_TIMEOUT 10 |
|
|
50 | |
48 | /* used for struct dirent, AIX doesn't provide it */ |
51 | /* used for struct dirent, AIX doesn't provide it */ |
49 | #ifndef NAME_MAX |
52 | #ifndef NAME_MAX |
50 | # define NAME_MAX 4096 |
53 | # define NAME_MAX 4096 |
51 | #endif |
54 | #endif |
52 | |
55 | |
… | |
… | |
92 | REQ_READ, REQ_WRITE, REQ_READAHEAD, |
95 | REQ_READ, REQ_WRITE, REQ_READAHEAD, |
93 | REQ_SENDFILE, |
96 | REQ_SENDFILE, |
94 | REQ_STAT, REQ_LSTAT, REQ_FSTAT, |
97 | REQ_STAT, REQ_LSTAT, REQ_FSTAT, |
95 | REQ_FSYNC, REQ_FDATASYNC, |
98 | REQ_FSYNC, REQ_FDATASYNC, |
96 | REQ_UNLINK, REQ_RMDIR, REQ_RENAME, |
99 | REQ_UNLINK, REQ_RMDIR, REQ_RENAME, |
97 | REQ_READDIR, |
100 | REQ_MKNOD, REQ_READDIR, |
98 | REQ_LINK, REQ_SYMLINK, |
101 | REQ_LINK, REQ_SYMLINK, |
99 | REQ_GROUP, REQ_NOP, |
102 | REQ_GROUP, REQ_NOP, |
100 | REQ_BUSY, |
103 | REQ_BUSY, |
101 | }; |
104 | }; |
102 | |
105 | |
… | |
… | |
144 | NUM_PRI = PRI_MAX + PRI_BIAS + 1, |
147 | NUM_PRI = PRI_MAX + PRI_BIAS + 1, |
145 | }; |
148 | }; |
146 | |
149 | |
147 | static int next_pri = DEFAULT_PRI + PRI_BIAS; |
150 | static int next_pri = DEFAULT_PRI + PRI_BIAS; |
148 | |
151 | |
149 | static unsigned int started, wanted; |
152 | static unsigned int started, idle, wanted; |
150 | |
153 | |
151 | #if __linux && defined (PTHREAD_ADAPTIVE_MUTEX_INITIALIZER_NP) |
154 | #if __linux && defined (PTHREAD_ADAPTIVE_MUTEX_INITIALIZER_NP) |
152 | # define AIO_MUTEX_INIT PTHREAD_ADAPTIVE_MUTEX_INITIALIZER_NP |
155 | # define AIO_MUTEX_INIT PTHREAD_ADAPTIVE_MUTEX_INITIALIZER_NP |
153 | #else |
156 | #else |
154 | # define AIO_MUTEX_INIT PTHREAD_MUTEX_INITIALIZER |
157 | # define AIO_MUTEX_INIT PTHREAD_MUTEX_INITIALIZER |
… | |
… | |
196 | |
199 | |
197 | free (wrk); |
200 | free (wrk); |
198 | } |
201 | } |
199 | |
202 | |
200 | static volatile unsigned int nreqs, nready, npending; |
203 | static volatile unsigned int nreqs, nready, npending; |
|
|
204 | static volatile unsigned int max_idle = 4; |
201 | static volatile unsigned int max_outstanding = 0xffffffff; |
205 | static volatile unsigned int max_outstanding = 0xffffffff; |
202 | static int respipe [2]; |
206 | static int respipe [2]; |
203 | |
207 | |
204 | static pthread_mutex_t reslock = AIO_MUTEX_INIT; |
208 | static pthread_mutex_t reslock = AIO_MUTEX_INIT; |
205 | static pthread_mutex_t reqlock = AIO_MUTEX_INIT; |
209 | static pthread_mutex_t reqlock = AIO_MUTEX_INIT; |
… | |
… | |
556 | UNLOCK (wrklock); |
560 | UNLOCK (wrklock); |
557 | } |
561 | } |
558 | |
562 | |
559 | static void maybe_start_thread () |
563 | static void maybe_start_thread () |
560 | { |
564 | { |
561 | #if 0 |
|
|
562 | static struct timeval last; |
|
|
563 | struct timeval diff, now; |
|
|
564 | #endif |
|
|
565 | |
|
|
566 | if (started >= wanted) |
565 | if (started >= wanted) |
567 | return; |
566 | return; |
568 | |
567 | |
|
|
568 | /* todo: maybe use idle here, but might be less exact */ |
569 | if (nready <= nreqs - get_nready () - get_npending ()) |
569 | if ((int)nready <= (int)started - (int)(nreqs - get_nready () - get_npending ())) |
570 | return; |
570 | return; |
571 | |
|
|
572 | #if 0 |
|
|
573 | gettimeofday (&now, 0); |
|
|
574 | |
|
|
575 | diff.tv_sec = now.tv_sec - last.tv_sec; |
|
|
576 | diff.tv_usec = now.tv_usec - last.tv_usec; |
|
|
577 | |
|
|
578 | if (diff.tv_usec < 0) |
|
|
579 | { |
|
|
580 | --diff.tv_sec; |
|
|
581 | diff.tv_usec += 1000000; |
|
|
582 | } |
|
|
583 | |
|
|
584 | if (!diff.tv_sec && diff.tv_usec < 10000) |
|
|
585 | return; |
|
|
586 | |
|
|
587 | last = now; |
|
|
588 | #endif |
|
|
589 | |
571 | |
590 | start_thread (); |
572 | start_thread (); |
591 | } |
573 | } |
592 | |
574 | |
593 | static void req_send (aio_req req) |
575 | static void req_send (aio_req req) |
… | |
… | |
610 | Newz (0, req, 1, aio_cb); |
592 | Newz (0, req, 1, aio_cb); |
611 | |
593 | |
612 | req->type = REQ_QUIT; |
594 | req->type = REQ_QUIT; |
613 | req->pri = PRI_MAX + PRI_BIAS; |
595 | req->pri = PRI_MAX + PRI_BIAS; |
614 | |
596 | |
615 | req_send (req); |
597 | LOCK (reqlock); |
|
|
598 | reqq_push (&req_queue, req); |
|
|
599 | pthread_cond_signal (&reqwait); |
|
|
600 | UNLOCK (reqlock); |
616 | |
601 | |
617 | LOCK (wrklock); |
602 | LOCK (wrklock); |
618 | --started; |
603 | --started; |
619 | UNLOCK (wrklock); |
604 | UNLOCK (wrklock); |
620 | } |
605 | } |
… | |
… | |
1001 | /*****************************************************************************/ |
986 | /*****************************************************************************/ |
1002 | |
987 | |
1003 | static void *aio_proc (void *thr_arg) |
988 | static void *aio_proc (void *thr_arg) |
1004 | { |
989 | { |
1005 | aio_req req; |
990 | aio_req req; |
|
|
991 | struct timespec ts; |
1006 | worker *self = (worker *)thr_arg; |
992 | worker *self = (worker *)thr_arg; |
1007 | |
993 | |
|
|
994 | /* try to distribute timeouts somewhat evenly */ |
|
|
995 | ts.tv_nsec = (((unsigned long)self + (unsigned long)ts.tv_sec) & 1023UL) |
|
|
996 | * (1000000000UL / 1024UL); |
|
|
997 | |
1008 | for (;;) |
998 | for (;;) |
1009 | { |
999 | { |
|
|
1000 | ts.tv_sec = time (0) + IDLE_TIMEOUT; |
|
|
1001 | |
1010 | LOCK (reqlock); |
1002 | LOCK (reqlock); |
1011 | |
1003 | |
1012 | for (;;) |
1004 | for (;;) |
1013 | { |
1005 | { |
1014 | self->req = req = reqq_shift (&req_queue); |
1006 | self->req = req = reqq_shift (&req_queue); |
1015 | |
1007 | |
1016 | if (req) |
1008 | if (req) |
1017 | break; |
1009 | break; |
1018 | |
1010 | |
|
|
1011 | ++idle; |
|
|
1012 | |
|
|
1013 | if (pthread_cond_timedwait (&reqwait, &reqlock, &ts) |
|
|
1014 | == ETIMEDOUT) |
|
|
1015 | { |
|
|
1016 | if (idle > max_idle) |
|
|
1017 | { |
|
|
1018 | --idle; |
|
|
1019 | UNLOCK (reqlock); |
|
|
1020 | LOCK (wrklock); |
|
|
1021 | --started; |
|
|
1022 | UNLOCK (wrklock); |
|
|
1023 | goto quit; |
|
|
1024 | } |
|
|
1025 | |
|
|
1026 | /* we are allowed to idle, so do so without any timeout */ |
1019 | pthread_cond_wait (&reqwait, &reqlock); |
1027 | pthread_cond_wait (&reqwait, &reqlock); |
|
|
1028 | ts.tv_sec = time (0) + IDLE_TIMEOUT; |
|
|
1029 | } |
|
|
1030 | |
|
|
1031 | --idle; |
1020 | } |
1032 | } |
1021 | |
1033 | |
1022 | --nready; |
1034 | --nready; |
1023 | |
1035 | |
1024 | UNLOCK (reqlock); |
1036 | UNLOCK (reqlock); |
… | |
… | |
1043 | case REQ_UNLINK: req->result = unlink (req->dataptr); break; |
1055 | case REQ_UNLINK: req->result = unlink (req->dataptr); break; |
1044 | case REQ_RMDIR: req->result = rmdir (req->dataptr); break; |
1056 | case REQ_RMDIR: req->result = rmdir (req->dataptr); break; |
1045 | case REQ_RENAME: req->result = rename (req->data2ptr, req->dataptr); break; |
1057 | case REQ_RENAME: req->result = rename (req->data2ptr, req->dataptr); break; |
1046 | case REQ_LINK: req->result = link (req->data2ptr, req->dataptr); break; |
1058 | case REQ_LINK: req->result = link (req->data2ptr, req->dataptr); break; |
1047 | case REQ_SYMLINK: req->result = symlink (req->data2ptr, req->dataptr); break; |
1059 | case REQ_SYMLINK: req->result = symlink (req->data2ptr, req->dataptr); break; |
|
|
1060 | case REQ_MKNOD: req->result = mknod (req->data2ptr, req->mode, (dev_t)req->offset); break; |
1048 | |
1061 | |
1049 | case REQ_FDATASYNC: req->result = fdatasync (req->fd); break; |
1062 | case REQ_FDATASYNC: req->result = fdatasync (req->fd); break; |
1050 | case REQ_FSYNC: req->result = fsync (req->fd); break; |
1063 | case REQ_FSYNC: req->result = fsync (req->fd); break; |
1051 | case REQ_READDIR: scandir_ (req, self); break; |
1064 | case REQ_READDIR: scandir_ (req, self); break; |
1052 | |
1065 | |
… | |
… | |
1063 | case REQ_GROUP: |
1076 | case REQ_GROUP: |
1064 | case REQ_NOP: |
1077 | case REQ_NOP: |
1065 | break; |
1078 | break; |
1066 | |
1079 | |
1067 | case REQ_QUIT: |
1080 | case REQ_QUIT: |
1068 | LOCK (wrklock); |
1081 | goto quit; |
1069 | worker_free (self); |
|
|
1070 | --started; |
|
|
1071 | UNLOCK (wrklock); |
|
|
1072 | return 0; |
|
|
1073 | |
1082 | |
1074 | default: |
1083 | default: |
1075 | req->result = ENOSYS; |
1084 | req->result = ENOSYS; |
1076 | break; |
1085 | break; |
1077 | } |
1086 | } |
… | |
… | |
1089 | self->req = 0; |
1098 | self->req = 0; |
1090 | worker_clear (self); |
1099 | worker_clear (self); |
1091 | |
1100 | |
1092 | UNLOCK (reslock); |
1101 | UNLOCK (reslock); |
1093 | } |
1102 | } |
|
|
1103 | |
|
|
1104 | quit: |
|
|
1105 | LOCK (wrklock); |
|
|
1106 | worker_free (self); |
|
|
1107 | UNLOCK (wrklock); |
|
|
1108 | |
|
|
1109 | return 0; |
1094 | } |
1110 | } |
1095 | |
1111 | |
1096 | /*****************************************************************************/ |
1112 | /*****************************************************************************/ |
1097 | |
1113 | |
1098 | static void atfork_prepare (void) |
1114 | static void atfork_prepare (void) |
… | |
… | |
1140 | |
1156 | |
1141 | worker_clear (wrk); |
1157 | worker_clear (wrk); |
1142 | worker_free (wrk); |
1158 | worker_free (wrk); |
1143 | } |
1159 | } |
1144 | |
1160 | |
1145 | started = 0; |
1161 | started = 0; |
|
|
1162 | idle = 0; |
1146 | nreqs = 0; |
1163 | nreqs = 0; |
|
|
1164 | nready = 0; |
|
|
1165 | npending = 0; |
1147 | |
1166 | |
1148 | close (respipe [0]); |
1167 | close (respipe [0]); |
1149 | close (respipe [1]); |
1168 | close (respipe [1]); |
1150 | create_pipe (); |
1169 | create_pipe (); |
1151 | |
1170 | |
… | |
… | |
1178 | PROTOTYPES: ENABLE |
1197 | PROTOTYPES: ENABLE |
1179 | |
1198 | |
1180 | BOOT: |
1199 | BOOT: |
1181 | { |
1200 | { |
1182 | HV *stash = gv_stashpv ("IO::AIO", 1); |
1201 | HV *stash = gv_stashpv ("IO::AIO", 1); |
|
|
1202 | |
1183 | newCONSTSUB (stash, "EXDEV", newSViv (EXDEV)); |
1203 | newCONSTSUB (stash, "EXDEV", newSViv (EXDEV)); |
1184 | newCONSTSUB (stash, "O_RDONLY", newSViv (O_RDONLY)); |
1204 | newCONSTSUB (stash, "O_RDONLY", newSViv (O_RDONLY)); |
1185 | newCONSTSUB (stash, "O_WRONLY", newSViv (O_WRONLY)); |
1205 | newCONSTSUB (stash, "O_WRONLY", newSViv (O_WRONLY)); |
|
|
1206 | newCONSTSUB (stash, "O_CREAT", newSViv (O_CREAT)); |
|
|
1207 | newCONSTSUB (stash, "O_TRUNC", newSViv (O_TRUNC)); |
|
|
1208 | newCONSTSUB (stash, "S_IFIFO", newSViv (S_IFIFO)); |
1186 | |
1209 | |
1187 | create_pipe (); |
1210 | create_pipe (); |
1188 | pthread_atfork (atfork_prepare, atfork_parent, atfork_child); |
1211 | pthread_atfork (atfork_prepare, atfork_parent, atfork_child); |
1189 | |
|
|
1190 | start_thread (); |
|
|
1191 | } |
1212 | } |
1192 | |
1213 | |
1193 | void |
1214 | void |
1194 | min_parallel (int nthreads) |
1215 | min_parallel (int nthreads) |
1195 | PROTOTYPE: $ |
1216 | PROTOTYPE: $ |
… | |
… | |
1423 | req->type = ix; |
1444 | req->type = ix; |
1424 | req->fh = newSVsv (oldpath); |
1445 | req->fh = newSVsv (oldpath); |
1425 | req->data2ptr = SvPVbyte_nolen (req->fh); |
1446 | req->data2ptr = SvPVbyte_nolen (req->fh); |
1426 | req->data = newSVsv (newpath); |
1447 | req->data = newSVsv (newpath); |
1427 | req->dataptr = SvPVbyte_nolen (req->data); |
1448 | req->dataptr = SvPVbyte_nolen (req->data); |
|
|
1449 | |
|
|
1450 | REQ_SEND; |
|
|
1451 | } |
|
|
1452 | |
|
|
1453 | void |
|
|
1454 | aio_mknod (pathname,mode,dev,callback=&PL_sv_undef) |
|
|
1455 | SV * pathname |
|
|
1456 | SV * callback |
|
|
1457 | UV mode |
|
|
1458 | UV dev |
|
|
1459 | PPCODE: |
|
|
1460 | { |
|
|
1461 | dREQ; |
|
|
1462 | |
|
|
1463 | req->type = REQ_MKNOD; |
|
|
1464 | req->data = newSVsv (pathname); |
|
|
1465 | req->dataptr = SvPVbyte_nolen (req->data); |
|
|
1466 | req->mode = (mode_t)mode; |
|
|
1467 | req->offset = dev; |
1428 | |
1468 | |
1429 | REQ_SEND; |
1469 | REQ_SEND; |
1430 | } |
1470 | } |
1431 | |
1471 | |
1432 | void |
1472 | void |