ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/IO-AIO/AIO.xs
(Generate patch)

Comparing IO-AIO/AIO.xs (file contents):
Revision 1.77 by root, Thu Oct 26 13:25:40 2006 UTC vs.
Revision 1.78 by root, Thu Oct 26 14:35:34 2006 UTC

132 NUM_PRI = PRI_MAX + PRI_BIAS + 1, 132 NUM_PRI = PRI_MAX + PRI_BIAS + 1,
133}; 133};
134 134
135static int next_pri = DEFAULT_PRI + PRI_BIAS; 135static int next_pri = DEFAULT_PRI + PRI_BIAS;
136 136
137static int started, wanted; 137static unsigned int started, wanted;
138static volatile int nreqs; 138static volatile unsigned int nreqs;
139static volatile unsigned int max_outstanding = 0xffffffff;
139static int respipe [2]; 140static int respipe [2];
140 141
141#if __linux && defined (PTHREAD_ADAPTIVE_MUTEX_INITIALIZER_NP) 142#if __linux && defined (PTHREAD_ADAPTIVE_MUTEX_INITIALIZER_NP)
142# define AIO_MUTEX_INIT PTHREAD_ADAPTIVE_MUTEX_INITIALIZER_NP 143# define AIO_MUTEX_INIT PTHREAD_ADAPTIVE_MUTEX_INITIALIZER_NP
143#else 144#else
503 dSP; 504 dSP;
504 int count = 0; 505 int count = 0;
505 int do_croak = 0; 506 int do_croak = 0;
506 aio_req req; 507 aio_req req;
507 508
509 for (;;)
510 {
508 while (max <= 0 || count < max) 511 while (max <= 0 || count < max)
509 {
510 LOCK (reslock);
511 req = reqq_shift (&res_queue);
512
513 if (req)
514 { 512 {
513 LOCK (reslock);
514 req = reqq_shift (&res_queue);
515
515 if (!res_queue.size) 516 if (req)
516 { 517 {
518 if (!res_queue.size)
519 {
517 /* read any signals sent by the worker threads */ 520 /* read any signals sent by the worker threads */
518 char buf [32]; 521 char buf [32];
519 while (read (respipe [0], buf, 32) == 32) 522 while (read (respipe [0], buf, 32) == 32)
523 ;
520 ; 524 }
521 } 525 }
526
527 UNLOCK (reslock);
528
529 if (!req)
530 break;
531
532 --nreqs;
533
534 if (req->type == REQ_QUIT)
535 --started;
536 else if (req->type == REQ_GROUP && req->length)
537 {
538 req->fd = 1; /* mark request as delayed */
539 continue;
540 }
541 else
542 {
543 if (req->type == REQ_READ)
544 SvCUR_set (req->data, req->dataoffset + (req->result > 0 ? req->result : 0));
545
546 if (req->data2ptr && (req->type == REQ_READ || req->type == REQ_WRITE))
547 SvREADONLY_off (req->data);
548
549 if (req->statdata)
550 {
551 PL_laststype = req->type == REQ_LSTAT ? OP_LSTAT : OP_STAT;
552 PL_laststatval = req->result;
553 PL_statcache = *(req->statdata);
554 }
555
556 req_invoke (req);
557
558 count++;
559 }
560
561 req_free (req);
522 } 562 }
523 563
524 UNLOCK (reslock); 564 if (nreqs <= max_outstanding)
525
526 if (!req)
527 break; 565 break;
528 566
529 --nreqs; 567 poll_wait ();
530 568
531 if (req->type == REQ_QUIT) 569 max = 0;
532 started--;
533 else if (req->type == REQ_GROUP && req->length)
534 {
535 req->fd = 1; /* mark request as delayed */
536 continue;
537 }
538 else
539 {
540 if (req->type == REQ_READ)
541 SvCUR_set (req->data, req->dataoffset + (req->result > 0 ? req->result : 0));
542
543 if (req->data2ptr && (req->type == REQ_READ || req->type == REQ_WRITE))
544 SvREADONLY_off (req->data);
545
546 if (req->statdata)
547 {
548 PL_laststype = req->type == REQ_LSTAT ? OP_LSTAT : OP_STAT;
549 PL_laststatval = req->result;
550 PL_statcache = *(req->statdata);
551 }
552
553 req_invoke (req);
554
555 count++;
556 }
557
558 req_free (req);
559 } 570 }
560 571
561 return count; 572 return count;
562} 573}
563 574
586 { 597 {
587 wrk->prev = &wrk_first; 598 wrk->prev = &wrk_first;
588 wrk->next = wrk_first.next; 599 wrk->next = wrk_first.next;
589 wrk_first.next->prev = wrk; 600 wrk_first.next->prev = wrk;
590 wrk_first.next = wrk; 601 wrk_first.next = wrk;
591 started++; 602 ++started;
592 } 603 }
593 else 604 else
594 free (wrk); 605 free (wrk);
595 606
596 sigprocmask (SIG_SETMASK, &oldsigset, 0); 607 sigprocmask (SIG_SETMASK, &oldsigset, 0);
1102 create_pipe (); 1113 create_pipe ();
1103 pthread_atfork (atfork_prepare, atfork_parent, atfork_child); 1114 pthread_atfork (atfork_prepare, atfork_parent, atfork_child);
1104} 1115}
1105 1116
1106void 1117void
1107min_parallel (nthreads) 1118min_parallel (int nthreads)
1108 int nthreads
1109 PROTOTYPE: $ 1119 PROTOTYPE: $
1110 1120
1111void 1121void
1112max_parallel (nthreads) 1122max_parallel (int nthreads)
1113 int nthreads
1114 PROTOTYPE: $ 1123 PROTOTYPE: $
1124
1125int
1126max_outstanding (int maxreqs)
1127 PROTOTYPE: $
1128 CODE:
1129 RETVAL = max_outstanding;
1130 max_outstanding = maxreqs;
1131 OUTPUT:
1132 RETVAL
1115 1133
1116void 1134void
1117aio_open (pathname,flags,mode,callback=&PL_sv_undef) 1135aio_open (pathname,flags,mode,callback=&PL_sv_undef)
1118 SV * pathname 1136 SV * pathname
1119 int flags 1137 int flags

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines