ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/libeio/eio.c
(Generate patch)

Comparing libeio/eio.c (file contents):
Revision 1.126 by root, Fri Dec 28 07:33:41 2012 UTC vs.
Revision 1.129 by root, Sun Apr 14 09:43:19 2013 UTC

1/* 1/*
2 * libeio implementation 2 * libeio implementation
3 * 3 *
4 * Copyright (c) 2007,2008,2009,2010,2011,2012 Marc Alexander Lehmann <libeio@schmorp.de> 4 * Copyright (c) 2007,2008,2009,2010,2011,2012,2013 Marc Alexander Lehmann <libeio@schmorp.de>
5 * All rights reserved. 5 * All rights reserved.
6 * 6 *
7 * Redistribution and use in source and binary forms, with or without modifica- 7 * Redistribution and use in source and binary forms, with or without modifica-
8 * tion, are permitted provided that the following conditions are met: 8 * tion, are permitted provided that the following conditions are met:
9 * 9 *
213 #endif 213 #endif
214 214
215 return EIO_ERRNO (ENOENT, -1); 215 return EIO_ERRNO (ENOENT, -1);
216 } 216 }
217 217
218 /* POSIX API only */ 218 /* POSIX API only, causing trouble for win32 apps */
219 #define CreateHardLink(neu,old,flags) 0 219 #define CreateHardLink(neu,old,flags) 0 /* not really creating hardlink, still using relative paths? */
220 #define CreateSymbolicLink(neu,old,flags) 0 220 #define CreateSymbolicLink(neu,old,flags) 0 /* vista+ only */
221 221
222 struct statvfs 222 struct statvfs
223 { 223 {
224 int dummy; 224 int dummy;
225 }; 225 };
231 231
232#else 232#else
233 233
234 #include <sys/time.h> 234 #include <sys/time.h>
235 #include <sys/select.h> 235 #include <sys/select.h>
236 #include <sys/statvfs.h>
237 #include <unistd.h> 236 #include <unistd.h>
238 #include <signal.h> 237 #include <signal.h>
239 #include <dirent.h> 238 #include <dirent.h>
239
240 #ifdef ANDROID
241 #include <sys/vfs.h>
242 #define statvfs statfs
243 #define fstatvfs fstatfs
244 #include <asm/page.h> /* supposedly limits.h does #define PAGESIZE PAGESIZE */
245 #else
246 #include <sys/statvfs.h>
247 #endif
240 248
241 #if _POSIX_MEMLOCK || _POSIX_MEMLOCK_RANGE || _POSIX_MAPPED_FILES 249 #if _POSIX_MEMLOCK || _POSIX_MEMLOCK_RANGE || _POSIX_MAPPED_FILES
242 #include <sys/mman.h> 250 #include <sys/mman.h>
243 #endif 251 #endif
244 252
325 return -1 333 return -1
326 334
327#define FUBd \ 335#define FUBd \
328 free (eio_buf) 336 free (eio_buf)
329 337
/* granularity of the etp_poll time limit: ~977 "ticks" per second, chosen so
   a microsecond difference can be converted with a cheap >>10 instead of a
   division (see tvdiff) */
#define EIO_TICKS ((1000000 + 1023) >> 10)

332/*****************************************************************************/ 338/*****************************************************************************/
333 339
334struct tmpbuf 340struct tmpbuf
335{ 341{
336 void *ptr; 342 void *ptr;
374/*****************************************************************************/ 380/*****************************************************************************/
375 381
376#define ETP_PRI_MIN EIO_PRI_MIN 382#define ETP_PRI_MIN EIO_PRI_MIN
377#define ETP_PRI_MAX EIO_PRI_MAX 383#define ETP_PRI_MAX EIO_PRI_MAX
378 384
385#define ETP_TYPE_QUIT -1
386#define ETP_TYPE_GROUP EIO_GROUP
387
379struct etp_worker; 388struct etp_worker;
380 389
381#define ETP_REQ eio_req 390#define ETP_REQ eio_req
382#define ETP_DESTROY(req) eio_destroy (req) 391#define ETP_DESTROY(req) eio_destroy (req)
383static int eio_finish (eio_req *req); 392static int eio_finish (eio_req *req);
384#define ETP_FINISH(req) eio_finish (req) 393#define ETP_FINISH(req) eio_finish (req)
385static void eio_execute (struct etp_worker *self, eio_req *req); 394static void eio_execute (struct etp_worker *self, eio_req *req);
386#define ETP_EXECUTE(wrk,req) eio_execute (wrk,req) 395#define ETP_EXECUTE(wrk,req) eio_execute (wrk, req)
387 396
388/*****************************************************************************/ 397#include "etp.c"
389
390#define ETP_NUM_PRI (ETP_PRI_MAX - ETP_PRI_MIN + 1)
391
392/* calculate time difference in ~1/EIO_TICKS of a second */
393ecb_inline int
394tvdiff (struct timeval *tv1, struct timeval *tv2)
395{
396 return (tv2->tv_sec - tv1->tv_sec ) * EIO_TICKS
397 + ((tv2->tv_usec - tv1->tv_usec) >> 10);
398}
399
400static unsigned int started, idle, wanted = 4;
401
402static void (*want_poll_cb) (void);
403static void (*done_poll_cb) (void);
404
405static unsigned int max_poll_time; /* reslock */
406static unsigned int max_poll_reqs; /* reslock */
407
408static unsigned int nreqs; /* reqlock */
409static unsigned int nready; /* reqlock */
410static unsigned int npending; /* reqlock */
411static unsigned int max_idle = 4; /* maximum number of threads that can idle indefinitely */
412static unsigned int idle_timeout = 10; /* number of seconds after which an idle threads exit */
413
414static xmutex_t wrklock;
415static xmutex_t reslock;
416static xmutex_t reqlock;
417static xcond_t reqwait;
418
419typedef struct etp_worker
420{
421 struct tmpbuf tmpbuf;
422
423 /* locked by wrklock */
424 struct etp_worker *prev, *next;
425
426 xthread_t tid;
427
428#ifdef ETP_WORKER_COMMON
429 ETP_WORKER_COMMON
430#endif
431} etp_worker;
432
433static etp_worker wrk_first; /* NOT etp */
434
435#define ETP_WORKER_LOCK(wrk) X_LOCK (wrklock)
436#define ETP_WORKER_UNLOCK(wrk) X_UNLOCK (wrklock)
437
438/* worker threads management */
439
440static void
441etp_worker_clear (etp_worker *wrk)
442{
443}
444
445static void ecb_cold
446etp_worker_free (etp_worker *wrk)
447{
448 free (wrk->tmpbuf.ptr);
449
450 wrk->next->prev = wrk->prev;
451 wrk->prev->next = wrk->next;
452
453 free (wrk);
454}
455
456static unsigned int
457etp_nreqs (void)
458{
459 int retval;
460 if (WORDACCESS_UNSAFE) X_LOCK (reqlock);
461 retval = nreqs;
462 if (WORDACCESS_UNSAFE) X_UNLOCK (reqlock);
463 return retval;
464}
465
466static unsigned int
467etp_nready (void)
468{
469 unsigned int retval;
470
471 if (WORDACCESS_UNSAFE) X_LOCK (reqlock);
472 retval = nready;
473 if (WORDACCESS_UNSAFE) X_UNLOCK (reqlock);
474
475 return retval;
476}
477
478static unsigned int
479etp_npending (void)
480{
481 unsigned int retval;
482
483 if (WORDACCESS_UNSAFE) X_LOCK (reqlock);
484 retval = npending;
485 if (WORDACCESS_UNSAFE) X_UNLOCK (reqlock);
486
487 return retval;
488}
489
490static unsigned int
491etp_nthreads (void)
492{
493 unsigned int retval;
494
495 if (WORDACCESS_UNSAFE) X_LOCK (reqlock);
496 retval = started;
497 if (WORDACCESS_UNSAFE) X_UNLOCK (reqlock);
498
499 return retval;
500}
501
502/*
503 * a somewhat faster data structure might be nice, but
504 * with 8 priorities this actually needs <20 insns
505 * per shift, the most expensive operation.
506 */
507typedef struct {
508 ETP_REQ *qs[ETP_NUM_PRI], *qe[ETP_NUM_PRI]; /* qstart, qend */
509 int size;
510} etp_reqq;
511
512static etp_reqq req_queue;
513static etp_reqq res_queue;
514
515static void ecb_noinline ecb_cold
516reqq_init (etp_reqq *q)
517{
518 int pri;
519
520 for (pri = 0; pri < ETP_NUM_PRI; ++pri)
521 q->qs[pri] = q->qe[pri] = 0;
522
523 q->size = 0;
524}
525
526static int ecb_noinline
527reqq_push (etp_reqq *q, ETP_REQ *req)
528{
529 int pri = req->pri;
530 req->next = 0;
531
532 if (q->qe[pri])
533 {
534 q->qe[pri]->next = req;
535 q->qe[pri] = req;
536 }
537 else
538 q->qe[pri] = q->qs[pri] = req;
539
540 return q->size++;
541}
542
543static ETP_REQ * ecb_noinline
544reqq_shift (etp_reqq *q)
545{
546 int pri;
547
548 if (!q->size)
549 return 0;
550
551 --q->size;
552
553 for (pri = ETP_NUM_PRI; pri--; )
554 {
555 eio_req *req = q->qs[pri];
556
557 if (req)
558 {
559 if (!(q->qs[pri] = (eio_req *)req->next))
560 q->qe[pri] = 0;
561
562 return req;
563 }
564 }
565
566 abort ();
567}
568
569static int ecb_cold
570etp_init (void (*want_poll)(void), void (*done_poll)(void))
571{
572 X_MUTEX_CREATE (wrklock);
573 X_MUTEX_CREATE (reslock);
574 X_MUTEX_CREATE (reqlock);
575 X_COND_CREATE (reqwait);
576
577 reqq_init (&req_queue);
578 reqq_init (&res_queue);
579
580 wrk_first.next =
581 wrk_first.prev = &wrk_first;
582
583 started = 0;
584 idle = 0;
585 nreqs = 0;
586 nready = 0;
587 npending = 0;
588
589 want_poll_cb = want_poll;
590 done_poll_cb = done_poll;
591
592 return 0;
593}
594
595X_THREAD_PROC (etp_proc);
596
597static void ecb_cold
598etp_start_thread (void)
599{
600 etp_worker *wrk = calloc (1, sizeof (etp_worker));
601
602 /*TODO*/
603 assert (("unable to allocate worker thread data", wrk));
604
605 X_LOCK (wrklock);
606
607 if (xthread_create (&wrk->tid, etp_proc, (void *)wrk))
608 {
609 wrk->prev = &wrk_first;
610 wrk->next = wrk_first.next;
611 wrk_first.next->prev = wrk;
612 wrk_first.next = wrk;
613 ++started;
614 }
615 else
616 free (wrk);
617
618 X_UNLOCK (wrklock);
619}
620
621static void
622etp_maybe_start_thread (void)
623{
624 if (ecb_expect_true (etp_nthreads () >= wanted))
625 return;
626
627 /* todo: maybe use idle here, but might be less exact */
628 if (ecb_expect_true (0 <= (int)etp_nthreads () + (int)etp_npending () - (int)etp_nreqs ()))
629 return;
630
631 etp_start_thread ();
632}
633
634static void ecb_cold
635etp_end_thread (void)
636{
637 eio_req *req = calloc (1, sizeof (eio_req)); /* will be freed by worker */
638
639 req->type = -1;
640 req->pri = ETP_PRI_MAX - ETP_PRI_MIN;
641
642 X_LOCK (reqlock);
643 reqq_push (&req_queue, req);
644 X_COND_SIGNAL (reqwait);
645 X_UNLOCK (reqlock);
646
647 X_LOCK (wrklock);
648 --started;
649 X_UNLOCK (wrklock);
650}
651
652static int
653etp_poll (void)
654{
655 unsigned int maxreqs;
656 unsigned int maxtime;
657 struct timeval tv_start, tv_now;
658
659 X_LOCK (reslock);
660 maxreqs = max_poll_reqs;
661 maxtime = max_poll_time;
662 X_UNLOCK (reslock);
663
664 if (maxtime)
665 gettimeofday (&tv_start, 0);
666
667 for (;;)
668 {
669 ETP_REQ *req;
670
671 etp_maybe_start_thread ();
672
673 X_LOCK (reslock);
674 req = reqq_shift (&res_queue);
675
676 if (req)
677 {
678 --npending;
679
680 if (!res_queue.size && done_poll_cb)
681 done_poll_cb ();
682 }
683
684 X_UNLOCK (reslock);
685
686 if (!req)
687 return 0;
688
689 X_LOCK (reqlock);
690 --nreqs;
691 X_UNLOCK (reqlock);
692
693 if (ecb_expect_false (req->type == EIO_GROUP && req->size))
694 {
695 req->int1 = 1; /* mark request as delayed */
696 continue;
697 }
698 else
699 {
700 int res = ETP_FINISH (req);
701 if (ecb_expect_false (res))
702 return res;
703 }
704
705 if (ecb_expect_false (maxreqs && !--maxreqs))
706 break;
707
708 if (maxtime)
709 {
710 gettimeofday (&tv_now, 0);
711
712 if (tvdiff (&tv_start, &tv_now) >= maxtime)
713 break;
714 }
715 }
716
717 errno = EAGAIN;
718 return -1;
719}
720
721static void
722etp_cancel (ETP_REQ *req)
723{
724 req->cancelled = 1;
725
726 eio_grp_cancel (req);
727}
728
729static void
730etp_submit (ETP_REQ *req)
731{
732 req->pri -= ETP_PRI_MIN;
733
734 if (ecb_expect_false (req->pri < ETP_PRI_MIN - ETP_PRI_MIN)) req->pri = ETP_PRI_MIN - ETP_PRI_MIN;
735 if (ecb_expect_false (req->pri > ETP_PRI_MAX - ETP_PRI_MIN)) req->pri = ETP_PRI_MAX - ETP_PRI_MIN;
736
737 if (ecb_expect_false (req->type == EIO_GROUP))
738 {
739 /* I hope this is worth it :/ */
740 X_LOCK (reqlock);
741 ++nreqs;
742 X_UNLOCK (reqlock);
743
744 X_LOCK (reslock);
745
746 ++npending;
747
748 if (!reqq_push (&res_queue, req) && want_poll_cb)
749 want_poll_cb ();
750
751 X_UNLOCK (reslock);
752 }
753 else
754 {
755 X_LOCK (reqlock);
756 ++nreqs;
757 ++nready;
758 reqq_push (&req_queue, req);
759 X_COND_SIGNAL (reqwait);
760 X_UNLOCK (reqlock);
761
762 etp_maybe_start_thread ();
763 }
764}
765
766static void ecb_cold
767etp_set_max_poll_time (double nseconds)
768{
769 if (WORDACCESS_UNSAFE) X_LOCK (reslock);
770 max_poll_time = nseconds * EIO_TICKS;
771 if (WORDACCESS_UNSAFE) X_UNLOCK (reslock);
772}
773
774static void ecb_cold
775etp_set_max_poll_reqs (unsigned int maxreqs)
776{
777 if (WORDACCESS_UNSAFE) X_LOCK (reslock);
778 max_poll_reqs = maxreqs;
779 if (WORDACCESS_UNSAFE) X_UNLOCK (reslock);
780}
781
782static void ecb_cold
783etp_set_max_idle (unsigned int nthreads)
784{
785 if (WORDACCESS_UNSAFE) X_LOCK (reqlock);
786 max_idle = nthreads;
787 if (WORDACCESS_UNSAFE) X_UNLOCK (reqlock);
788}
789
790static void ecb_cold
791etp_set_idle_timeout (unsigned int seconds)
792{
793 if (WORDACCESS_UNSAFE) X_LOCK (reqlock);
794 idle_timeout = seconds;
795 if (WORDACCESS_UNSAFE) X_UNLOCK (reqlock);
796}
797
798static void ecb_cold
799etp_set_min_parallel (unsigned int nthreads)
800{
801 if (wanted < nthreads)
802 wanted = nthreads;
803}
804
805static void ecb_cold
806etp_set_max_parallel (unsigned int nthreads)
807{
808 if (wanted > nthreads)
809 wanted = nthreads;
810
811 while (started > wanted)
812 etp_end_thread ();
813}
814 398
815/*****************************************************************************/ 399/*****************************************************************************/
816 400
817static void 401static void
818grp_try_feed (eio_req *grp) 402grp_try_feed (eio_req *grp)
885} 469}
886 470
887void 471void
888eio_grp_cancel (eio_req *grp) 472eio_grp_cancel (eio_req *grp)
889{ 473{
890 for (grp = grp->grp_first; grp; grp = grp->grp_next)
891 eio_cancel (grp); 474 etp_grp_cancel (grp);
892} 475}
893 476
894void 477void
895eio_cancel (eio_req *req) 478eio_cancel (eio_req *req)
896{ 479{
2148 strcpy (name + (len <= namelen - 4 ? len : namelen - 4), "/eio"); 1731 strcpy (name + (len <= namelen - 4 ? len : namelen - 4), "/eio");
2149 prctl (PR_SET_NAME, (unsigned long)name, 0, 0, 0); 1732 prctl (PR_SET_NAME, (unsigned long)name, 0, 0, 0);
2150#endif 1733#endif
2151} 1734}
2152 1735
1736/* TODO: move somehow to etp.c */
2153X_THREAD_PROC (etp_proc) 1737X_THREAD_PROC (etp_proc)
2154{ 1738{
2155 ETP_REQ *req; 1739 ETP_REQ *req;
2156 struct timespec ts; 1740 struct timespec ts;
2157 etp_worker *self = (etp_worker *)thr_arg; 1741 etp_worker *self = (etp_worker *)thr_arg;
2203 1787
2204 --nready; 1788 --nready;
2205 1789
2206 X_UNLOCK (reqlock); 1790 X_UNLOCK (reqlock);
2207 1791
2208 if (req->type < 0) 1792 if (req->type == ETP_TYPE_QUIT)
2209 goto quit; 1793 goto quit;
2210 1794
2211 ETP_EXECUTE (self, req); 1795 ETP_EXECUTE (self, req);
2212 1796
2213 X_LOCK (reslock); 1797 X_LOCK (reslock);

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines