… | |
… | |
132 | NUM_PRI = PRI_MAX + PRI_BIAS + 1, |
132 | NUM_PRI = PRI_MAX + PRI_BIAS + 1, |
133 | }; |
133 | }; |
134 | |
134 | |
135 | static int next_pri = DEFAULT_PRI + PRI_BIAS; |
135 | static int next_pri = DEFAULT_PRI + PRI_BIAS; |
136 | |
136 | |
137 | static int started, wanted; |
137 | static unsigned int started, wanted; |
138 | static volatile int nreqs; |
138 | static volatile unsigned int nreqs; |
|
|
139 | static volatile unsigned int max_outstanding = 0xffffffff; |
139 | static int respipe [2]; |
140 | static int respipe [2]; |
140 | |
141 | |
141 | #if __linux && defined (PTHREAD_ADAPTIVE_MUTEX_INITIALIZER_NP) |
142 | #if __linux && defined (PTHREAD_ADAPTIVE_MUTEX_INITIALIZER_NP) |
142 | # define AIO_MUTEX_INIT PTHREAD_ADAPTIVE_MUTEX_INITIALIZER_NP |
143 | # define AIO_MUTEX_INIT PTHREAD_ADAPTIVE_MUTEX_INITIALIZER_NP |
143 | #else |
144 | #else |
… | |
… | |
503 | dSP; |
504 | dSP; |
504 | int count = 0; |
505 | int count = 0; |
505 | int do_croak = 0; |
506 | int do_croak = 0; |
506 | aio_req req; |
507 | aio_req req; |
507 | |
508 | |
|
|
509 | for (;;) |
|
|
510 | { |
508 | while (max <= 0 || count < max) |
511 | while (max <= 0 || count < max) |
509 | { |
|
|
510 | LOCK (reslock); |
|
|
511 | req = reqq_shift (&res_queue); |
|
|
512 | |
|
|
513 | if (req) |
|
|
514 | { |
512 | { |
|
|
513 | LOCK (reslock); |
|
|
514 | req = reqq_shift (&res_queue); |
|
|
515 | |
515 | if (!res_queue.size) |
516 | if (req) |
516 | { |
517 | { |
|
|
518 | if (!res_queue.size) |
|
|
519 | { |
517 | /* read any signals sent by the worker threads */ |
520 | /* read any signals sent by the worker threads */ |
518 | char buf [32]; |
521 | char buf [32]; |
519 | while (read (respipe [0], buf, 32) == 32) |
522 | while (read (respipe [0], buf, 32) == 32) |
|
|
523 | ; |
520 | ; |
524 | } |
521 | } |
525 | } |
|
|
526 | |
|
|
527 | UNLOCK (reslock); |
|
|
528 | |
|
|
529 | if (!req) |
|
|
530 | break; |
|
|
531 | |
|
|
532 | --nreqs; |
|
|
533 | |
|
|
534 | if (req->type == REQ_QUIT) |
|
|
535 | --started; |
|
|
536 | else if (req->type == REQ_GROUP && req->length) |
|
|
537 | { |
|
|
538 | req->fd = 1; /* mark request as delayed */ |
|
|
539 | continue; |
|
|
540 | } |
|
|
541 | else |
|
|
542 | { |
|
|
543 | if (req->type == REQ_READ) |
|
|
544 | SvCUR_set (req->data, req->dataoffset + (req->result > 0 ? req->result : 0)); |
|
|
545 | |
|
|
546 | if (req->data2ptr && (req->type == REQ_READ || req->type == REQ_WRITE)) |
|
|
547 | SvREADONLY_off (req->data); |
|
|
548 | |
|
|
549 | if (req->statdata) |
|
|
550 | { |
|
|
551 | PL_laststype = req->type == REQ_LSTAT ? OP_LSTAT : OP_STAT; |
|
|
552 | PL_laststatval = req->result; |
|
|
553 | PL_statcache = *(req->statdata); |
|
|
554 | } |
|
|
555 | |
|
|
556 | req_invoke (req); |
|
|
557 | |
|
|
558 | count++; |
|
|
559 | } |
|
|
560 | |
|
|
561 | req_free (req); |
522 | } |
562 | } |
523 | |
563 | |
524 | UNLOCK (reslock); |
564 | if (nreqs <= max_outstanding) |
525 | |
|
|
526 | if (!req) |
|
|
527 | break; |
565 | break; |
528 | |
566 | |
529 | --nreqs; |
567 | poll_wait (); |
530 | |
568 | |
531 | if (req->type == REQ_QUIT) |
569 | max = 0; |
532 | started--; |
|
|
533 | else if (req->type == REQ_GROUP && req->length) |
|
|
534 | { |
|
|
535 | req->fd = 1; /* mark request as delayed */ |
|
|
536 | continue; |
|
|
537 | } |
|
|
538 | else |
|
|
539 | { |
|
|
540 | if (req->type == REQ_READ) |
|
|
541 | SvCUR_set (req->data, req->dataoffset + (req->result > 0 ? req->result : 0)); |
|
|
542 | |
|
|
543 | if (req->data2ptr && (req->type == REQ_READ || req->type == REQ_WRITE)) |
|
|
544 | SvREADONLY_off (req->data); |
|
|
545 | |
|
|
546 | if (req->statdata) |
|
|
547 | { |
|
|
548 | PL_laststype = req->type == REQ_LSTAT ? OP_LSTAT : OP_STAT; |
|
|
549 | PL_laststatval = req->result; |
|
|
550 | PL_statcache = *(req->statdata); |
|
|
551 | } |
|
|
552 | |
|
|
553 | req_invoke (req); |
|
|
554 | |
|
|
555 | count++; |
|
|
556 | } |
|
|
557 | |
|
|
558 | req_free (req); |
|
|
559 | } |
570 | } |
560 | |
571 | |
561 | return count; |
572 | return count; |
562 | } |
573 | } |
563 | |
574 | |
… | |
… | |
586 | { |
597 | { |
587 | wrk->prev = &wrk_first; |
598 | wrk->prev = &wrk_first; |
588 | wrk->next = wrk_first.next; |
599 | wrk->next = wrk_first.next; |
589 | wrk_first.next->prev = wrk; |
600 | wrk_first.next->prev = wrk; |
590 | wrk_first.next = wrk; |
601 | wrk_first.next = wrk; |
591 | started++; |
602 | ++started; |
592 | } |
603 | } |
593 | else |
604 | else |
594 | free (wrk); |
605 | free (wrk); |
595 | |
606 | |
596 | sigprocmask (SIG_SETMASK, &oldsigset, 0); |
607 | sigprocmask (SIG_SETMASK, &oldsigset, 0); |
… | |
… | |
1102 | create_pipe (); |
1113 | create_pipe (); |
1103 | pthread_atfork (atfork_prepare, atfork_parent, atfork_child); |
1114 | pthread_atfork (atfork_prepare, atfork_parent, atfork_child); |
1104 | } |
1115 | } |
1105 | |
1116 | |
1106 | void |
1117 | void |
1107 | min_parallel (nthreads) |
1118 | min_parallel (int nthreads) |
1108 | int nthreads |
|
|
1109 | PROTOTYPE: $ |
1119 | PROTOTYPE: $ |
1110 | |
1120 | |
1111 | void |
1121 | void |
1112 | max_parallel (nthreads) |
1122 | max_parallel (int nthreads) |
1113 | int nthreads |
|
|
1114 | PROTOTYPE: $ |
1123 | PROTOTYPE: $ |
|
|
1124 | |
|
|
1125 | int |
|
|
1126 | max_outstanding (int maxreqs) |
|
|
1127 | PROTOTYPE: $ |
|
|
1128 | CODE: |
|
|
1129 | RETVAL = max_outstanding; |
|
|
1130 | max_outstanding = maxreqs; |
|
|
1131 | OUTPUT: |
|
|
1132 | RETVAL |
1115 | |
1133 | |
1116 | void |
1134 | void |
1117 | aio_open (pathname,flags,mode,callback=&PL_sv_undef) |
1135 | aio_open (pathname,flags,mode,callback=&PL_sv_undef) |
1118 | SV * pathname |
1136 | SV * pathname |
1119 | int flags |
1137 | int flags |