ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/libeio/eio.c
(Generate patch)

Comparing libeio/eio.c (file contents):
Revision 1.12 by root, Tue May 13 18:54:52 2008 UTC vs.
Revision 1.13 by root, Tue May 13 19:34:11 2008 UTC

70/* buffer size for various temporary buffers */ 70/* buffer size for various temporary buffers */
71#define EIO_BUFSIZE 65536 71#define EIO_BUFSIZE 65536
72 72
73#define dBUF \ 73#define dBUF \
74 char *eio_buf; \ 74 char *eio_buf; \
75 X_LOCK (wrklock); \ 75 X_LOCK (etplock); \
76 self->dbuf = eio_buf = malloc (EIO_BUFSIZE); \ 76 self->dbuf = eio_buf = malloc (EIO_BUFSIZE); \
77 X_UNLOCK (wrklock); \ 77 X_UNLOCK (etplock); \
78 errno = ENOMEM; \ 78 errno = ENOMEM; \
79 if (!eio_buf) \ 79 if (!eio_buf) \
80 return -1; 80 return -1;
81 81
82#define EIO_TICKS ((1000000 + 1023) >> 10) 82#define EIO_TICKS ((1000000 + 1023) >> 10)
83 83
84static void (*want_poll_cb) (void); 84/*****************************************************************************/
85static void (*done_poll_cb) (void);
86
87static unsigned int max_poll_time = 0;
88static unsigned int max_poll_reqs = 0;
89 85
90/* calculcate time difference in ~1/EIO_TICKS of a second */ 86/* calculcate time difference in ~1/EIO_TICKS of a second */
91static int tvdiff (struct timeval *tv1, struct timeval *tv2) 87static int tvdiff (struct timeval *tv1, struct timeval *tv2)
92{ 88{
93 return (tv2->tv_sec - tv1->tv_sec ) * EIO_TICKS 89 return (tv2->tv_sec - tv1->tv_sec ) * EIO_TICKS
94 + ((tv2->tv_usec - tv1->tv_usec) >> 10); 90 + ((tv2->tv_usec - tv1->tv_usec) >> 10);
95} 91}
96 92
97static unsigned int started, idle, wanted = 4; 93static unsigned int started, idle, wanted = 4;
98 94
99/* worker threads management */ 95typedef struct etp_pool
96{
97 void (*want_poll_cb) (void);
98 void (*done_poll_cb) (void);
99
100 unsigned int max_poll_time;
101 unsigned int max_poll_reqs;
102} etp_pool;
103
104static volatile unsigned int nreqs, nready, npending;
105static volatile unsigned int max_idle = 4;
106
107static mutex_t etplock = X_MUTEX_INIT;
100static mutex_t wrklock = X_MUTEX_INIT; 108static mutex_t reslock = X_MUTEX_INIT;
109static mutex_t reqlock = X_MUTEX_INIT;
110static cond_t reqwait = X_COND_INIT;
101 111
102typedef struct worker 112typedef struct worker
103{ 113{
104 /* locked by wrklock */ 114 /* locked by etplock */
105 struct worker *prev, *next; 115 struct worker *prev, *next;
106 116
107 thread_t tid; 117 thread_t tid;
108 118
109 /* locked by reslock, reqlock or wrklock */ 119 /* locked by reslock, reqlock or etplock */
110 eio_req *req; /* currently processed request */ 120 eio_req *req; /* currently processed request */
111 void *dbuf; 121 void *dbuf;
112 DIR *dirp; 122 DIR *dirp;
113} worker; 123} worker;
114 124
115static worker wrk_first = { &wrk_first, &wrk_first, 0 }; 125static worker wrk_first = { &wrk_first, &wrk_first, 0 }; /* NOT etp */
126
127/* worker threads management */
116 128
117static void worker_clear (worker *wrk) 129static void worker_clear (worker *wrk)
118{ 130{
119 if (wrk->dirp) 131 if (wrk->dirp)
120 { 132 {
134 wrk->next->prev = wrk->prev; 146 wrk->next->prev = wrk->prev;
135 wrk->prev->next = wrk->next; 147 wrk->prev->next = wrk->next;
136 148
137 free (wrk); 149 free (wrk);
138} 150}
139
140static volatile unsigned int nreqs, nready, npending;
141static volatile unsigned int max_idle = 4;
142
143static mutex_t reslock = X_MUTEX_INIT;
144static mutex_t reqlock = X_MUTEX_INIT;
145static cond_t reqwait = X_COND_INIT;
146 151
147unsigned int eio_nreqs (void) 152unsigned int eio_nreqs (void)
148{ 153{
149 return nreqs; 154 return nreqs;
150} 155}
234 } 239 }
235 240
236 abort (); 241 abort ();
237} 242}
238 243
244static void etp_atfork_prepare (void)
245{
246 X_LOCK (etplock);
247 X_LOCK (reqlock);
248 X_LOCK (reslock);
249#if !HAVE_PREADWRITE
250 X_LOCK (preadwritelock);
251#endif
252#if !HAVE_READDIR_R
253 X_LOCK (readdirlock);
254#endif
255}
256
257static void etp_atfork_parent (void)
258{
259#if !HAVE_READDIR_R
260 X_UNLOCK (readdirlock);
261#endif
262#if !HAVE_PREADWRITE
263 X_UNLOCK (preadwritelock);
264#endif
265 X_UNLOCK (reslock);
266 X_UNLOCK (reqlock);
267 X_UNLOCK (etplock);
268}
269
270static void etp_atfork_child (void)
271{
272 eio_req *prv;
273
274 while (prv = reqq_shift (&req_queue))
275 eio_destroy (prv);
276
277 while (prv = reqq_shift (&res_queue))
278 eio_destroy (prv);
279
280 while (wrk_first.next != &wrk_first)
281 {
282 worker *wrk = wrk_first.next;
283
284 if (wrk->req)
285 eio_destroy (wrk->req);
286
287 worker_clear (wrk);
288 worker_free (wrk);
289 }
290
291 started = 0;
292 idle = 0;
293 nreqs = 0;
294 nready = 0;
295 npending = 0;
296
297 etp_atfork_parent ();
298}
299
300static void
301etp_once_init (void)
302{
303 X_THREAD_ATFORK (etp_atfork_prepare, etp_atfork_parent, etp_atfork_child);
304}
305
306static int
307etp_init (etp_pool *etp, void (*want_poll)(void), void (*done_poll)(void))
308{
309 static pthread_once_t doinit = PTHREAD_ONCE_INIT;
310
311 pthread_once (&doinit, etp_once_init);
312
313 memset (etp, 0, sizeof *etp);
314
315 etp->want_poll_cb = want_poll;
316 etp->done_poll_cb = done_poll;
317}
318
319static etp_pool etp;
320
321/*****************************************************************************/
322
239static void grp_try_feed (eio_req *grp) 323static void grp_try_feed (eio_req *grp)
240{ 324{
241 while (grp->size < grp->int2 && !EIO_CANCELLED (grp)) 325 while (grp->size < grp->int2 && !EIO_CANCELLED (grp))
242 { 326 {
243 int old_len = grp->size; 327 int old_len = grp->size;
310 eio_cancel (grp); 394 eio_cancel (grp);
311} 395}
312 396
313void eio_cancel (eio_req *req) 397void eio_cancel (eio_req *req)
314{ 398{
315 X_LOCK (wrklock); 399 X_LOCK (etplock);
316 req->flags |= EIO_FLAG_CANCELLED; 400 req->flags |= EIO_FLAG_CANCELLED;
317 X_UNLOCK (wrklock); 401 X_UNLOCK (etplock);
318 402
319 eio_grp_cancel (req); 403 eio_grp_cancel (req);
320} 404}
321 405
322X_THREAD_PROC (eio_proc); 406X_THREAD_PROC (eio_proc);
326 worker *wrk = calloc (1, sizeof (worker)); 410 worker *wrk = calloc (1, sizeof (worker));
327 411
328 /*TODO*/ 412 /*TODO*/
329 assert (("unable to allocate worker thread data", wrk)); 413 assert (("unable to allocate worker thread data", wrk));
330 414
331 X_LOCK (wrklock); 415 X_LOCK (etplock);
332 416
333 if (thread_create (&wrk->tid, eio_proc, (void *)wrk)) 417 if (thread_create (&wrk->tid, eio_proc, (void *)wrk))
334 { 418 {
335 wrk->prev = &wrk_first; 419 wrk->prev = &wrk_first;
336 wrk->next = wrk_first.next; 420 wrk->next = wrk_first.next;
339 ++started; 423 ++started;
340 } 424 }
341 else 425 else
342 free (wrk); 426 free (wrk);
343 427
344 X_UNLOCK (wrklock); 428 X_UNLOCK (etplock);
345} 429}
346 430
347static void maybe_start_thread (void) 431static void maybe_start_thread (void)
348{ 432{
349 if (eio_nthreads () >= wanted) 433 if (eio_nthreads () >= wanted)
384 X_LOCK (reqlock); 468 X_LOCK (reqlock);
385 reqq_push (&req_queue, req); 469 reqq_push (&req_queue, req);
386 X_COND_SIGNAL (reqwait); 470 X_COND_SIGNAL (reqwait);
387 X_UNLOCK (reqlock); 471 X_UNLOCK (reqlock);
388 472
389 X_LOCK (wrklock); 473 X_LOCK (etplock);
390 --started; 474 --started;
391 X_UNLOCK (wrklock); 475 X_UNLOCK (etplock);
392} 476}
393 477
394void eio_set_max_poll_time (double nseconds) 478void eio_set_max_poll_time (double nseconds)
395{ 479{
396 if (WORDACCESS_UNSAFE) X_LOCK (reqlock); 480 if (WORDACCESS_UNSAFE) X_LOCK (reqlock);
397 max_poll_time = nseconds; 481 etp.max_poll_time = nseconds;
398 if (WORDACCESS_UNSAFE) X_UNLOCK (reqlock); 482 if (WORDACCESS_UNSAFE) X_UNLOCK (reqlock);
399} 483}
400 484
401void eio_set_max_poll_reqs (unsigned int maxreqs) 485void eio_set_max_poll_reqs (unsigned int maxreqs)
402{ 486{
403 if (WORDACCESS_UNSAFE) X_LOCK (reqlock); 487 if (WORDACCESS_UNSAFE) X_LOCK (reqlock);
404 max_poll_reqs = maxreqs; 488 etp.max_poll_reqs = maxreqs;
405 if (WORDACCESS_UNSAFE) X_UNLOCK (reqlock); 489 if (WORDACCESS_UNSAFE) X_UNLOCK (reqlock);
406} 490}
407 491
408void eio_set_max_idle (unsigned int nthreads) 492void eio_set_max_idle (unsigned int nthreads)
409{ 493{
427 end_thread (); 511 end_thread ();
428} 512}
429 513
430int eio_poll (void) 514int eio_poll (void)
431{ 515{
432 int maxreqs = max_poll_reqs; 516 int maxreqs = etp.max_poll_reqs;
433 struct timeval tv_start, tv_now; 517 struct timeval tv_start, tv_now;
434 eio_req *req; 518 eio_req *req;
435 519
436 if (max_poll_time) 520 if (etp.max_poll_time)
437 gettimeofday (&tv_start, 0); 521 gettimeofday (&tv_start, 0);
438 522
439 for (;;) 523 for (;;)
440 { 524 {
441 maybe_start_thread (); 525 maybe_start_thread ();
445 529
446 if (req) 530 if (req)
447 { 531 {
448 --npending; 532 --npending;
449 533
450 if (!res_queue.size && done_poll_cb) 534 if (!res_queue.size && etp.done_poll_cb)
451 done_poll_cb (); 535 etp.done_poll_cb ();
452 } 536 }
453 537
454 X_UNLOCK (reslock); 538 X_UNLOCK (reslock);
455 539
456 if (!req) 540 if (!req)
471 } 555 }
472 556
473 if (maxreqs && !--maxreqs) 557 if (maxreqs && !--maxreqs)
474 break; 558 break;
475 559
476 if (max_poll_time) 560 if (etp.max_poll_time)
477 { 561 {
478 gettimeofday (&tv_now, 0); 562 gettimeofday (&tv_now, 0);
479 563
480 if (tvdiff (&tv_start, &tv_now) >= max_poll_time) 564 if (tvdiff (&tv_start, &tv_now) >= etp.max_poll_time)
481 break; 565 break;
482 } 566 }
483 } 567 }
484 568
485 errno = EAGAIN; 569 errno = EAGAIN;
729 char *name, *names; 813 char *name, *names;
730 int memlen = 4096; 814 int memlen = 4096;
731 int memofs = 0; 815 int memofs = 0;
732 int res = 0; 816 int res = 0;
733 817
734 X_LOCK (wrklock); 818 X_LOCK (etplock);
735 self->dirp = dirp = opendir (req->ptr1); 819 self->dirp = dirp = opendir (req->ptr1);
736 self->dbuf = u = malloc (sizeof (*u)); 820 self->dbuf = u = malloc (sizeof (*u));
737 req->flags |= EIO_FLAG_PTR2_FREE; 821 req->flags |= EIO_FLAG_PTR2_FREE;
738 req->ptr2 = names = malloc (memlen); 822 req->ptr2 = names = malloc (memlen);
739 X_UNLOCK (wrklock); 823 X_UNLOCK (etplock);
740 824
741 if (dirp && u && names) 825 if (dirp && u && names)
742 for (;;) 826 for (;;)
743 { 827 {
744 errno = 0; 828 errno = 0;
756 res++; 840 res++;
757 841
758 while (memofs + len > memlen) 842 while (memofs + len > memlen)
759 { 843 {
760 memlen *= 2; 844 memlen *= 2;
761 X_LOCK (wrklock); 845 X_LOCK (etplock);
762 req->ptr2 = names = realloc (names, memlen); 846 req->ptr2 = names = realloc (names, memlen);
763 X_UNLOCK (wrklock); 847 X_UNLOCK (etplock);
764 848
765 if (!names) 849 if (!names)
766 break; 850 break;
767 } 851 }
768 852
780/*****************************************************************************/ 864/*****************************************************************************/
781 865
782#define ALLOC(len) \ 866#define ALLOC(len) \
783 if (!req->ptr2) \ 867 if (!req->ptr2) \
784 { \ 868 { \
785 X_LOCK (wrklock); \ 869 X_LOCK (etplock); \
786 req->flags |= EIO_FLAG_PTR2_FREE; \ 870 req->flags |= EIO_FLAG_PTR2_FREE; \
787 X_UNLOCK (wrklock); \ 871 X_UNLOCK (etplock); \
788 req->ptr2 = malloc (len); \ 872 req->ptr2 = malloc (len); \
789 if (!req->ptr2) \ 873 if (!req->ptr2) \
790 { \ 874 { \
791 errno = ENOMEM; \ 875 errno = ENOMEM; \
792 req->result = -1; \ 876 req->result = -1; \
822 { 906 {
823 if (idle > max_idle) 907 if (idle > max_idle)
824 { 908 {
825 --idle; 909 --idle;
826 X_UNLOCK (reqlock); 910 X_UNLOCK (reqlock);
827 X_LOCK (wrklock); 911 X_LOCK (etplock);
828 --started; 912 --started;
829 X_UNLOCK (wrklock); 913 X_UNLOCK (etplock);
830 goto quit; 914 goto quit;
831 } 915 }
832 916
833 /* we are allowed to idle, so do so without any timeout */ 917 /* we are allowed to idle, so do so without any timeout */
834 X_COND_WAIT (reqwait, reqlock); 918 X_COND_WAIT (reqwait, reqlock);
948 1032
949 X_LOCK (reslock); 1033 X_LOCK (reslock);
950 1034
951 ++npending; 1035 ++npending;
952 1036
953 if (!reqq_push (&res_queue, req) && want_poll_cb) 1037 if (!reqq_push (&res_queue, req) && etp.want_poll_cb)
954 want_poll_cb (); 1038 etp.want_poll_cb ();
955 1039
956 self->req = 0; 1040 self->req = 0;
957 worker_clear (self); 1041 worker_clear (self);
958 1042
959 X_UNLOCK (reslock); 1043 X_UNLOCK (reslock);
960 } 1044 }
961 1045
962quit: 1046quit:
963 X_LOCK (wrklock); 1047 X_LOCK (etplock);
964 worker_free (self); 1048 worker_free (self);
965 X_UNLOCK (wrklock); 1049 X_UNLOCK (etplock);
966 1050
967 return 0; 1051 return 0;
968} 1052}
969 1053
970/*****************************************************************************/ 1054/*****************************************************************************/
971 1055
972static void eio_atfork_prepare (void)
973{
974 X_LOCK (wrklock);
975 X_LOCK (reqlock);
976 X_LOCK (reslock);
977#if !HAVE_PREADWRITE
978 X_LOCK (preadwritelock);
979#endif
980#if !HAVE_READDIR_R
981 X_LOCK (readdirlock);
982#endif
983}
984
985static void eio_atfork_parent (void)
986{
987#if !HAVE_READDIR_R
988 X_UNLOCK (readdirlock);
989#endif
990#if !HAVE_PREADWRITE
991 X_UNLOCK (preadwritelock);
992#endif
993 X_UNLOCK (reslock);
994 X_UNLOCK (reqlock);
995 X_UNLOCK (wrklock);
996}
997
998static void eio_atfork_child (void)
999{
1000 eio_req *prv;
1001
1002 while (prv = reqq_shift (&req_queue))
1003 eio_destroy (prv);
1004
1005 while (prv = reqq_shift (&res_queue))
1006 eio_destroy (prv);
1007
1008 while (wrk_first.next != &wrk_first)
1009 {
1010 worker *wrk = wrk_first.next;
1011
1012 if (wrk->req)
1013 eio_destroy (wrk->req);
1014
1015 worker_clear (wrk);
1016 worker_free (wrk);
1017 }
1018
1019 started = 0;
1020 idle = 0;
1021 nreqs = 0;
1022 nready = 0;
1023 npending = 0;
1024
1025 eio_atfork_parent ();
1026}
1027
1028int eio_init (void (*want_poll)(void), void (*done_poll)(void)) 1056int eio_init (void (*want_poll)(void), void (*done_poll)(void))
1029{ 1057{
1030 want_poll_cb = want_poll; 1058 etp_init (&etp, want_poll, done_poll);
1031 done_poll_cb = done_poll;
1032
1033#ifdef _WIN32
1034 X_MUTEX_CHECK (wrklock);
1035 X_MUTEX_CHECK (reslock);
1036 X_MUTEX_CHECK (reqlock);
1037 X_MUTEX_CHECK (reqwait);
1038 X_MUTEX_CHECK (preadwritelock);
1039 X_MUTEX_CHECK (readdirlock);
1040
1041 X_COND_CHECK (reqwait);
1042#endif
1043
1044 X_THREAD_ATFORK (eio_atfork_prepare, eio_atfork_parent, eio_atfork_child);
1045} 1059}
1046 1060
1047static void eio_api_destroy (eio_req *req) 1061static void eio_api_destroy (eio_req *req)
1048{ 1062{
1049 free (req); 1063 free (req);

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines