ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/IO-AIO/AIO.xs
(Generate patch)

Comparing IO-AIO/AIO.xs (file contents):
Revision 1.78 by root, Thu Oct 26 14:35:34 2006 UTC vs.
Revision 1.79 by root, Thu Oct 26 16:28:33 2006 UTC

61# define STACKSIZE PTHREAD_STACK_MIN 61# define STACKSIZE PTHREAD_STACK_MIN
62#else 62#else
63# define STACKSIZE 16384 63# define STACKSIZE 16384
64#endif 64#endif
65 65
66/* wether word reads are potentially non-atomic.
67 * this is conservatice, likely most arches this runs
68 * on have atomic word read/writes.
69 */
70#ifndef WORDREAD_UNSAFE
71# if __i386 || __x86_64
72# define WORDREAD_UNSAFE 0
73# else
74# define WORDREAD_UNSAFE 1
75# endif
76#endif
77
66/* buffer size for various temporary buffers */ 78/* buffer size for various temporary buffers */
67#define AIO_BUFSIZE 65536 79#define AIO_BUFSIZE 65536
68 80
69#define dBUF \ 81#define dBUF \
70 char *aio_buf; \ 82 char *aio_buf; \
133}; 145};
134 146
135static int next_pri = DEFAULT_PRI + PRI_BIAS; 147static int next_pri = DEFAULT_PRI + PRI_BIAS;
136 148
137static unsigned int started, wanted; 149static unsigned int started, wanted;
138static volatile unsigned int nreqs; 150static volatile unsigned int nreqs, nready, npending;
139static volatile unsigned int max_outstanding = 0xffffffff; 151static volatile unsigned int max_outstanding = 0xffffffff;
140static int respipe [2]; 152static int respipe [2];
141 153
142#if __linux && defined (PTHREAD_ADAPTIVE_MUTEX_INITIALIZER_NP) 154#if __linux && defined (PTHREAD_ADAPTIVE_MUTEX_INITIALIZER_NP)
143# define AIO_MUTEX_INIT PTHREAD_ADAPTIVE_MUTEX_INITIALIZER_NP 155# define AIO_MUTEX_INIT PTHREAD_ADAPTIVE_MUTEX_INITIALIZER_NP
326 fd_set rfd; 338 fd_set rfd;
327 339
328 while (nreqs) 340 while (nreqs)
329 { 341 {
330 int size; 342 int size;
331#if !(__i386 || __x86_64) /* safe without sempahore on these archs */ 343 if (WORDREAD_UNSAFE) LOCK (reslock);
332 LOCK (reslock);
333#endif
334 size = res_queue.size; 344 size = res_queue.size;
335#if !(__i386 || __x86_64) /* safe without sempahore on these archs */ 345 if (WORDREAD_UNSAFE) UNLOCK (reslock);
336 UNLOCK (reslock);
337#endif
338 346
339 if (size) 347 if (size)
340 return; 348 return;
341 349
342 FD_ZERO(&rfd); 350 FD_ZERO(&rfd);
350{ 358{
351 dSP; 359 dSP;
352 360
353 if (!(req->flags & FLAG_CANCELLED) && SvOK (req->callback)) 361 if (!(req->flags & FLAG_CANCELLED) && SvOK (req->callback))
354 { 362 {
355 errno = req->errorno;
356
357 ENTER; 363 ENTER;
358 SAVETMPS; 364 SAVETMPS;
359 PUSHMARK (SP); 365 PUSHMARK (SP);
360 EXTEND (SP, 1); 366 EXTEND (SP, 1);
361 367
426 default: 432 default:
427 PUSHs (sv_2mortal (newSViv (req->result))); 433 PUSHs (sv_2mortal (newSViv (req->result)));
428 break; 434 break;
429 } 435 }
430 436
437 errno = req->errorno;
431 438
432 PUTBACK; 439 PUTBACK;
433 call_sv (req->callback, G_VOID | G_EVAL); 440 call_sv (req->callback, G_VOID | G_EVAL);
434 SPAGAIN; 441 SPAGAIN;
435 442
513 LOCK (reslock); 520 LOCK (reslock);
514 req = reqq_shift (&res_queue); 521 req = reqq_shift (&res_queue);
515 522
516 if (req) 523 if (req)
517 { 524 {
525 --npending;
526
518 if (!res_queue.size) 527 if (!res_queue.size)
519 { 528 {
520 /* read any signals sent by the worker threads */ 529 /* read any signals sent by the worker threads */
521 char buf [32]; 530 char buf [32];
522 while (read (respipe [0], buf, 32) == 32) 531 while (read (respipe [0], buf, 32) == 32)
614 start_thread (); 623 start_thread ();
615 624
616 ++nreqs; 625 ++nreqs;
617 626
618 LOCK (reqlock); 627 LOCK (reqlock);
628 ++nready;
619 reqq_push (&req_queue, req); 629 reqq_push (&req_queue, req);
620 pthread_cond_signal (&reqwait); 630 pthread_cond_signal (&reqwait);
621 UNLOCK (reqlock); 631 UNLOCK (reqlock);
622} 632}
623 633
945 break; 955 break;
946 956
947 pthread_cond_wait (&reqwait, &reqlock); 957 pthread_cond_wait (&reqwait, &reqlock);
948 } 958 }
949 959
960 --nready;
961
950 UNLOCK (reqlock); 962 UNLOCK (reqlock);
951 963
952 errno = 0; /* strictly unnecessary */ 964 errno = 0; /* strictly unnecessary */
953 type = req->type; /* remember type for QUIT check */ 965 type = req->type; /* remember type for QUIT check */
954 966
998 } 1010 }
999 1011
1000 req->errorno = errno; 1012 req->errorno = errno;
1001 1013
1002 LOCK (reslock); 1014 LOCK (reslock);
1015
1016 ++npending;
1003 1017
1004 if (!reqq_push (&res_queue, req)) 1018 if (!reqq_push (&res_queue, req))
1005 /* write a dummy byte to the pipe so fh becomes ready */ 1019 /* write a dummy byte to the pipe so fh becomes ready */
1006 write (respipe [1], &respipe, 1); 1020 write (respipe [1], &respipe, 1);
1007 1021
1392 req->type = REQ_NOP; 1406 req->type = REQ_NOP;
1393 1407
1394 REQ_SEND; 1408 REQ_SEND;
1395} 1409}
1396 1410
1397void 1411int
1398aioreq_pri (int pri = DEFAULT_PRI) 1412aioreq_pri (int pri = 0)
1399 CODE: 1413 PROTOTYPE: ;$
1414 CODE:
1415 RETVAL = next_pri - PRI_BIAS;
1416 if (items > 0)
1417 {
1400 if (pri < PRI_MIN) pri = PRI_MIN; 1418 if (pri < PRI_MIN) pri = PRI_MIN;
1401 if (pri > PRI_MAX) pri = PRI_MAX; 1419 if (pri > PRI_MAX) pri = PRI_MAX;
1402 next_pri = pri + PRI_BIAS; 1420 next_pri = pri + PRI_BIAS;
1421 }
1422 OUTPUT:
1423 RETVAL
1403 1424
1404void 1425void
1405aioreq_nice (int nice = 0) 1426aioreq_nice (int nice = 0)
1406 CODE: 1427 CODE:
1407 nice = next_pri - nice; 1428 nice = next_pri - nice;
1408 if (nice < PRI_MIN) nice = PRI_MIN; 1429 if (nice < PRI_MIN) nice = PRI_MIN;
1409 if (nice > PRI_MAX) nice = PRI_MAX; 1430 if (nice > PRI_MAX) nice = PRI_MAX;
1410 next_pri = nice + PRI_BIAS; 1431 next_pri = nice + PRI_BIAS;
1411 1432
1412void 1433void
1413flush () 1434flush ()
1414 PROTOTYPE: 1435 PROTOTYPE:
1415 CODE: 1436 CODE:
1466 CODE: 1487 CODE:
1467 RETVAL = nreqs; 1488 RETVAL = nreqs;
1468 OUTPUT: 1489 OUTPUT:
1469 RETVAL 1490 RETVAL
1470 1491
1492int
1493nready()
1494 PROTOTYPE:
1495 CODE:
1496 if (WORDREAD_UNSAFE) LOCK (reqlock);
1497 RETVAL = nready;
1498 if (WORDREAD_UNSAFE) UNLOCK (reqlock);
1499 OUTPUT:
1500 RETVAL
1501
1502int
1503npending()
1504 PROTOTYPE:
1505 CODE:
1506 if (WORDREAD_UNSAFE) LOCK (reslock);
1507 RETVAL = npending;
1508 if (WORDREAD_UNSAFE) UNLOCK (reslock);
1509 OUTPUT:
1510 RETVAL
1511
1471PROTOTYPES: DISABLE 1512PROTOTYPES: DISABLE
1472 1513
1473MODULE = IO::AIO PACKAGE = IO::AIO::REQ 1514MODULE = IO::AIO PACKAGE = IO::AIO::REQ
1474 1515
1475void 1516void
1526void 1567void
1527result (aio_req grp, ...) 1568result (aio_req grp, ...)
1528 CODE: 1569 CODE:
1529{ 1570{
1530 int i; 1571 int i;
1572 AV *av;
1573
1574 grp->errorno = errno;
1575
1531 AV *av = newAV (); 1576 av = newAV ();
1532 1577
1533 for (i = 1; i < items; ++i ) 1578 for (i = 1; i < items; ++i )
1534 av_push (av, newSVsv (ST (i))); 1579 av_push (av, newSVsv (ST (i)));
1535 1580
1536 SvREFCNT_dec (grp->data); 1581 SvREFCNT_dec (grp->data);
1537 grp->data = (SV *)av; 1582 grp->data = (SV *)av;
1538} 1583}
1584
1585void
1586errno (aio_req grp, int errorno = errno)
1587 CODE:
1588 grp->errorno = errorno;
1539 1589
1540void 1590void
1541limit (aio_req grp, int limit) 1591limit (aio_req grp, int limit)
1542 CODE: 1592 CODE:
1543 grp->fd2 = limit; 1593 grp->fd2 = limit;

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines