ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/IO-AIO/AIO.xs
(Generate patch)

Comparing IO-AIO/AIO.xs (file contents):
Revision 1.71 by root, Tue Oct 24 16:35:04 2006 UTC vs.
Revision 1.74 by root, Wed Oct 25 17:57:30 2006 UTC

128 128
129static int next_pri = DEFAULT_PRI + PRI_BIAS; 129static int next_pri = DEFAULT_PRI + PRI_BIAS;
130 130
131static int started, wanted; 131static int started, wanted;
132static volatile int nreqs; 132static volatile int nreqs;
133static int max_outstanding = 1<<30;
134static int respipe [2]; 133static int respipe [2];
135 134
136#if __linux && defined (PTHREAD_ADAPTIVE_MUTEX_INITIALIZER_NP) 135#if __linux && defined (PTHREAD_ADAPTIVE_MUTEX_INITIALIZER_NP)
137# define AIO_MUTEX_INIT PTHREAD_ADAPTIVE_MUTEX_INITIALIZER_NP 136# define AIO_MUTEX_INIT PTHREAD_ADAPTIVE_MUTEX_INITIALIZER_NP
138#else 137#else
238 } 237 }
239 238
240 abort (); 239 abort ();
241} 240}
242 241
242static int poll_cb ();
243static void req_invoke (aio_req req); 243static void req_invoke (aio_req req);
244static void req_free (aio_req req); 244static void req_free (aio_req req);
245static void req_cancel (aio_req req);
245 246
246/* must be called at most once */ 247/* must be called at most once */
247static SV *req_sv (aio_req req, const char *klass) 248static SV *req_sv (aio_req req, const char *klass)
248{ 249{
249 if (!req->self) 250 if (!req->self)
468 free (req->data2ptr); 469 free (req->data2ptr);
469 470
470 Safefree (req); 471 Safefree (req);
471} 472}
472 473
474static void req_cancel_subs (aio_req grp)
475{
476 aio_req sub;
477
478 if (grp->type != REQ_GROUP)
479 return;
480
481 SvREFCNT_dec (grp->fh2);
482 grp->fh2 = 0;
483
484 for (sub = grp->grp_first; sub; sub = sub->grp_next)
485 req_cancel (sub);
486}
487
473static void req_cancel (aio_req req) 488static void req_cancel (aio_req req)
474{ 489{
475 req->flags |= FLAG_CANCELLED; 490 req->flags |= FLAG_CANCELLED;
476 491
477 if (req->type == REQ_GROUP) 492 req_cancel_subs (req);
478 {
479 aio_req sub;
480
481 for (sub = req->grp_first; sub; sub = sub->grp_next)
482 req_cancel (sub);
483 }
484} 493}
485 494
486static int poll_cb () 495static int poll_cb ()
487{ 496{
488 dSP; 497 dSP;
548 557
549static void *aio_proc(void *arg); 558static void *aio_proc(void *arg);
550 559
551static void start_thread (void) 560static void start_thread (void)
552{ 561{
562 sigset_t fullsigset, oldsigset;
563 pthread_attr_t attr;
564
553 worker *wrk = calloc (1, sizeof (worker)); 565 worker *wrk = calloc (1, sizeof (worker));
554 566
555 if (!wrk) 567 if (!wrk)
556 croak ("unable to allocate worker thread data"); 568 croak ("unable to allocate worker thread data");
557
558 sigset_t fullsigset, oldsigset;
559 pthread_attr_t attr;
560 569
561 pthread_attr_init (&attr); 570 pthread_attr_init (&attr);
562 pthread_attr_setstacksize (&attr, STACKSIZE); 571 pthread_attr_setstacksize (&attr, STACKSIZE);
563 pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED); 572 pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
564 573
591 600
592 LOCK (reqlock); 601 LOCK (reqlock);
593 reqq_push (&req_queue, req); 602 reqq_push (&req_queue, req);
594 pthread_cond_signal (&reqwait); 603 pthread_cond_signal (&reqwait);
595 UNLOCK (reqlock); 604 UNLOCK (reqlock);
596
597 if (nreqs > max_outstanding)
598 for (;;)
599 {
600 poll_cb ();
601
602 if (nreqs <= max_outstanding)
603 break;
604
605 poll_wait ();
606 }
607} 605}
608 606
609static void end_thread (void) 607static void end_thread (void)
610{ 608{
611 aio_req req; 609 aio_req req;
1107void 1105void
1108max_parallel (nthreads) 1106max_parallel (nthreads)
1109 int nthreads 1107 int nthreads
1110 PROTOTYPE: $ 1108 PROTOTYPE: $
1111 1109
1112int
1113max_outstanding (nreqs)
1114 int nreqs
1115 PROTOTYPE: $
1116 CODE:
1117 RETVAL = max_outstanding;
1118 max_outstanding = nreqs;
1119
1120void 1110void
1121aio_open (pathname,flags,mode,callback=&PL_sv_undef) 1111aio_open (pathname,flags,mode,callback=&PL_sv_undef)
1122 SV * pathname 1112 SV * pathname
1123 int flags 1113 int flags
1124 int mode 1114 int mode
1495 } 1485 }
1496 } 1486 }
1497} 1487}
1498 1488
1499void 1489void
1490cancel_subs (aio_req_ornot req)
1491 CODE:
1492 req_cancel_subs (req);
1493
1494void
1500result (aio_req grp, ...) 1495result (aio_req grp, ...)
1501 CODE: 1496 CODE:
1502{ 1497{
1503 int i; 1498 int i;
1504 AV *av = newAV (); 1499 AV *av = newAV ();

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines