ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/libeio/eio.c
(Generate patch)

Comparing libeio/eio.c (file contents):
Revision 1.10 by root, Tue May 13 17:08:15 2008 UTC vs.
Revision 1.13 by root, Tue May 13 19:34:11 2008 UTC

70/* buffer size for various temporary buffers */ 70/* buffer size for various temporary buffers */
71#define EIO_BUFSIZE 65536 71#define EIO_BUFSIZE 65536
72 72
73#define dBUF \ 73#define dBUF \
74 char *eio_buf; \ 74 char *eio_buf; \
75 X_LOCK (wrklock); \ 75 X_LOCK (etplock); \
76 self->dbuf = eio_buf = malloc (EIO_BUFSIZE); \ 76 self->dbuf = eio_buf = malloc (EIO_BUFSIZE); \
77 X_UNLOCK (wrklock); \ 77 X_UNLOCK (etplock); \
78 errno = ENOMEM; \ 78 errno = ENOMEM; \
79 if (!eio_buf) \ 79 if (!eio_buf) \
80 return -1; 80 return -1;
81 81
82#define EIO_TICKS ((1000000 + 1023) >> 10) 82#define EIO_TICKS ((1000000 + 1023) >> 10)
83 83
84static void (*want_poll_cb) (void); 84/*****************************************************************************/
85static void (*done_poll_cb) (void);
86
87static unsigned int max_poll_time = 0;
88static unsigned int max_poll_reqs = 0;
89 85
90/* calculcate time difference in ~1/EIO_TICKS of a second */ 86/* calculcate time difference in ~1/EIO_TICKS of a second */
91static int tvdiff (struct timeval *tv1, struct timeval *tv2) 87static int tvdiff (struct timeval *tv1, struct timeval *tv2)
92{ 88{
93 return (tv2->tv_sec - tv1->tv_sec ) * EIO_TICKS 89 return (tv2->tv_sec - tv1->tv_sec ) * EIO_TICKS
94 + ((tv2->tv_usec - tv1->tv_usec) >> 10); 90 + ((tv2->tv_usec - tv1->tv_usec) >> 10);
95} 91}
96 92
97static unsigned int started, idle, wanted = 4; 93static unsigned int started, idle, wanted = 4;
98 94
99/* worker threads management */ 95typedef struct etp_pool
96{
97 void (*want_poll_cb) (void);
98 void (*done_poll_cb) (void);
99
100 unsigned int max_poll_time;
101 unsigned int max_poll_reqs;
102} etp_pool;
103
104static volatile unsigned int nreqs, nready, npending;
105static volatile unsigned int max_idle = 4;
106
107static mutex_t etplock = X_MUTEX_INIT;
100static mutex_t wrklock = X_MUTEX_INIT; 108static mutex_t reslock = X_MUTEX_INIT;
109static mutex_t reqlock = X_MUTEX_INIT;
110static cond_t reqwait = X_COND_INIT;
101 111
102typedef struct worker 112typedef struct worker
103{ 113{
104 /* locked by wrklock */ 114 /* locked by etplock */
105 struct worker *prev, *next; 115 struct worker *prev, *next;
106 116
107 thread_t tid; 117 thread_t tid;
108 118
109 /* locked by reslock, reqlock or wrklock */ 119 /* locked by reslock, reqlock or etplock */
110 eio_req *req; /* currently processed request */ 120 eio_req *req; /* currently processed request */
111 void *dbuf; 121 void *dbuf;
112 DIR *dirp; 122 DIR *dirp;
113} worker; 123} worker;
114 124
115static worker wrk_first = { &wrk_first, &wrk_first, 0 }; 125static worker wrk_first = { &wrk_first, &wrk_first, 0 }; /* NOT etp */
126
127/* worker threads management */
116 128
117static void worker_clear (worker *wrk) 129static void worker_clear (worker *wrk)
118{ 130{
119 if (wrk->dirp) 131 if (wrk->dirp)
120 { 132 {
134 wrk->next->prev = wrk->prev; 146 wrk->next->prev = wrk->prev;
135 wrk->prev->next = wrk->next; 147 wrk->prev->next = wrk->next;
136 148
137 free (wrk); 149 free (wrk);
138} 150}
139
140static volatile unsigned int nreqs, nready, npending;
141static volatile unsigned int max_idle = 4;
142
143static mutex_t reslock = X_MUTEX_INIT;
144static mutex_t reqlock = X_MUTEX_INIT;
145static cond_t reqwait = X_COND_INIT;
146 151
147unsigned int eio_nreqs (void) 152unsigned int eio_nreqs (void)
148{ 153{
149 return nreqs; 154 return nreqs;
150} 155}
234 } 239 }
235 240
236 abort (); 241 abort ();
237} 242}
238 243
244static void etp_atfork_prepare (void)
245{
246 X_LOCK (etplock);
247 X_LOCK (reqlock);
248 X_LOCK (reslock);
249#if !HAVE_PREADWRITE
250 X_LOCK (preadwritelock);
251#endif
252#if !HAVE_READDIR_R
253 X_LOCK (readdirlock);
254#endif
255}
256
257static void etp_atfork_parent (void)
258{
259#if !HAVE_READDIR_R
260 X_UNLOCK (readdirlock);
261#endif
262#if !HAVE_PREADWRITE
263 X_UNLOCK (preadwritelock);
264#endif
265 X_UNLOCK (reslock);
266 X_UNLOCK (reqlock);
267 X_UNLOCK (etplock);
268}
269
270static void etp_atfork_child (void)
271{
272 eio_req *prv;
273
274 while (prv = reqq_shift (&req_queue))
275 eio_destroy (prv);
276
277 while (prv = reqq_shift (&res_queue))
278 eio_destroy (prv);
279
280 while (wrk_first.next != &wrk_first)
281 {
282 worker *wrk = wrk_first.next;
283
284 if (wrk->req)
285 eio_destroy (wrk->req);
286
287 worker_clear (wrk);
288 worker_free (wrk);
289 }
290
291 started = 0;
292 idle = 0;
293 nreqs = 0;
294 nready = 0;
295 npending = 0;
296
297 etp_atfork_parent ();
298}
299
300static void
301etp_once_init (void)
302{
303 X_THREAD_ATFORK (etp_atfork_prepare, etp_atfork_parent, etp_atfork_child);
304}
305
306static int
307etp_init (etp_pool *etp, void (*want_poll)(void), void (*done_poll)(void))
308{
309 static pthread_once_t doinit = PTHREAD_ONCE_INIT;
310
311 pthread_once (&doinit, etp_once_init);
312
313 memset (etp, 0, sizeof *etp);
314
315 etp->want_poll_cb = want_poll;
316 etp->done_poll_cb = done_poll;
317}
318
319static etp_pool etp;
320
321/*****************************************************************************/
322
239static void grp_try_feed (eio_req *grp) 323static void grp_try_feed (eio_req *grp)
240{ 324{
241 while (grp->size < grp->int2 && !EIO_CANCELLED (grp)) 325 while (grp->size < grp->int2 && !EIO_CANCELLED (grp))
242 { 326 {
243 int old_len = grp->size; 327 int old_len = grp->size;
310 eio_cancel (grp); 394 eio_cancel (grp);
311} 395}
312 396
313void eio_cancel (eio_req *req) 397void eio_cancel (eio_req *req)
314{ 398{
315 X_LOCK (wrklock); 399 X_LOCK (etplock);
316 req->flags |= EIO_FLAG_CANCELLED; 400 req->flags |= EIO_FLAG_CANCELLED;
317 X_UNLOCK (wrklock); 401 X_UNLOCK (etplock);
318 402
319 eio_grp_cancel (req); 403 eio_grp_cancel (req);
320} 404}
321 405
322X_THREAD_PROC (eio_proc); 406X_THREAD_PROC (eio_proc);
326 worker *wrk = calloc (1, sizeof (worker)); 410 worker *wrk = calloc (1, sizeof (worker));
327 411
328 /*TODO*/ 412 /*TODO*/
329 assert (("unable to allocate worker thread data", wrk)); 413 assert (("unable to allocate worker thread data", wrk));
330 414
331 X_LOCK (wrklock); 415 X_LOCK (etplock);
332 416
333 if (thread_create (&wrk->tid, eio_proc, (void *)wrk)) 417 if (thread_create (&wrk->tid, eio_proc, (void *)wrk))
334 { 418 {
335 wrk->prev = &wrk_first; 419 wrk->prev = &wrk_first;
336 wrk->next = wrk_first.next; 420 wrk->next = wrk_first.next;
339 ++started; 423 ++started;
340 } 424 }
341 else 425 else
342 free (wrk); 426 free (wrk);
343 427
344 X_UNLOCK (wrklock); 428 X_UNLOCK (etplock);
345} 429}
346 430
347static void maybe_start_thread (void) 431static void maybe_start_thread (void)
348{ 432{
349 if (eio_nthreads () >= wanted) 433 if (eio_nthreads () >= wanted)
356 start_thread (); 440 start_thread ();
357} 441}
358 442
359void eio_submit (eio_req *req) 443void eio_submit (eio_req *req)
360{ 444{
445 req->pri += EIO_PRI_BIAS;
446
447 if (req->pri < EIO_PRI_MIN + EIO_PRI_BIAS) req->pri = EIO_PRI_MIN + EIO_PRI_BIAS;
448 if (req->pri > EIO_PRI_MAX + EIO_PRI_BIAS) req->pri = EIO_PRI_MAX + EIO_PRI_BIAS;
449
361 ++nreqs; 450 ++nreqs;
362 451
363 X_LOCK (reqlock); 452 X_LOCK (reqlock);
364 ++nready; 453 ++nready;
365 reqq_push (&req_queue, req); 454 reqq_push (&req_queue, req);
379 X_LOCK (reqlock); 468 X_LOCK (reqlock);
380 reqq_push (&req_queue, req); 469 reqq_push (&req_queue, req);
381 X_COND_SIGNAL (reqwait); 470 X_COND_SIGNAL (reqwait);
382 X_UNLOCK (reqlock); 471 X_UNLOCK (reqlock);
383 472
384 X_LOCK (wrklock); 473 X_LOCK (etplock);
385 --started; 474 --started;
386 X_UNLOCK (wrklock); 475 X_UNLOCK (etplock);
387} 476}
388 477
389void eio_set_max_poll_time (double nseconds) 478void eio_set_max_poll_time (double nseconds)
390{ 479{
391 if (WORDACCESS_UNSAFE) X_LOCK (reqlock); 480 if (WORDACCESS_UNSAFE) X_LOCK (reqlock);
392 max_poll_time = nseconds; 481 etp.max_poll_time = nseconds;
393 if (WORDACCESS_UNSAFE) X_UNLOCK (reqlock); 482 if (WORDACCESS_UNSAFE) X_UNLOCK (reqlock);
394} 483}
395 484
396void eio_set_max_poll_reqs (unsigned int maxreqs) 485void eio_set_max_poll_reqs (unsigned int maxreqs)
397{ 486{
398 if (WORDACCESS_UNSAFE) X_LOCK (reqlock); 487 if (WORDACCESS_UNSAFE) X_LOCK (reqlock);
399 max_poll_reqs = maxreqs; 488 etp.max_poll_reqs = maxreqs;
400 if (WORDACCESS_UNSAFE) X_UNLOCK (reqlock); 489 if (WORDACCESS_UNSAFE) X_UNLOCK (reqlock);
401} 490}
402 491
403void eio_set_max_idle (unsigned int nthreads) 492void eio_set_max_idle (unsigned int nthreads)
404{ 493{
422 end_thread (); 511 end_thread ();
423} 512}
424 513
425int eio_poll (void) 514int eio_poll (void)
426{ 515{
427 int maxreqs = max_poll_reqs; 516 int maxreqs = etp.max_poll_reqs;
428 struct timeval tv_start, tv_now; 517 struct timeval tv_start, tv_now;
429 eio_req *req; 518 eio_req *req;
430 519
431 if (max_poll_time) 520 if (etp.max_poll_time)
432 gettimeofday (&tv_start, 0); 521 gettimeofday (&tv_start, 0);
433 522
434 for (;;) 523 for (;;)
435 { 524 {
436 maybe_start_thread (); 525 maybe_start_thread ();
440 529
441 if (req) 530 if (req)
442 { 531 {
443 --npending; 532 --npending;
444 533
445 if (!res_queue.size && done_poll_cb) 534 if (!res_queue.size && etp.done_poll_cb)
446 done_poll_cb (); 535 etp.done_poll_cb ();
447 } 536 }
448 537
449 X_UNLOCK (reslock); 538 X_UNLOCK (reslock);
450 539
451 if (!req) 540 if (!req)
466 } 555 }
467 556
468 if (maxreqs && !--maxreqs) 557 if (maxreqs && !--maxreqs)
469 break; 558 break;
470 559
471 if (max_poll_time) 560 if (etp.max_poll_time)
472 { 561 {
473 gettimeofday (&tv_now, 0); 562 gettimeofday (&tv_now, 0);
474 563
475 if (tvdiff (&tv_start, &tv_now) >= max_poll_time) 564 if (tvdiff (&tv_start, &tv_now) >= etp.max_poll_time)
476 break; 565 break;
477 } 566 }
478 } 567 }
479 568
480 errno = EAGAIN; 569 errno = EAGAIN;
724 char *name, *names; 813 char *name, *names;
725 int memlen = 4096; 814 int memlen = 4096;
726 int memofs = 0; 815 int memofs = 0;
727 int res = 0; 816 int res = 0;
728 817
729 X_LOCK (wrklock); 818 X_LOCK (etplock);
730 self->dirp = dirp = opendir (req->ptr1); 819 self->dirp = dirp = opendir (req->ptr1);
731 self->dbuf = u = malloc (sizeof (*u)); 820 self->dbuf = u = malloc (sizeof (*u));
732 req->flags |= EIO_FLAG_PTR2_FREE; 821 req->flags |= EIO_FLAG_PTR2_FREE;
733 req->ptr2 = names = malloc (memlen); 822 req->ptr2 = names = malloc (memlen);
734 X_UNLOCK (wrklock); 823 X_UNLOCK (etplock);
735 824
736 if (dirp && u && names) 825 if (dirp && u && names)
737 for (;;) 826 for (;;)
738 { 827 {
739 errno = 0; 828 errno = 0;
751 res++; 840 res++;
752 841
753 while (memofs + len > memlen) 842 while (memofs + len > memlen)
754 { 843 {
755 memlen *= 2; 844 memlen *= 2;
756 X_LOCK (wrklock); 845 X_LOCK (etplock);
757 req->ptr2 = names = realloc (names, memlen); 846 req->ptr2 = names = realloc (names, memlen);
758 X_UNLOCK (wrklock); 847 X_UNLOCK (etplock);
759 848
760 if (!names) 849 if (!names)
761 break; 850 break;
762 } 851 }
763 852
775/*****************************************************************************/ 864/*****************************************************************************/
776 865
777#define ALLOC(len) \ 866#define ALLOC(len) \
778 if (!req->ptr2) \ 867 if (!req->ptr2) \
779 { \ 868 { \
780 X_LOCK (wrklock); \ 869 X_LOCK (etplock); \
781 req->flags |= EIO_FLAG_PTR2_FREE; \ 870 req->flags |= EIO_FLAG_PTR2_FREE; \
782 X_UNLOCK (wrklock); \ 871 X_UNLOCK (etplock); \
783 req->ptr2 = malloc (len); \ 872 req->ptr2 = malloc (len); \
784 if (!req->ptr2) \ 873 if (!req->ptr2) \
785 { \ 874 { \
786 errno = ENOMEM; \ 875 errno = ENOMEM; \
787 req->result = -1; \ 876 req->result = -1; \
817 { 906 {
818 if (idle > max_idle) 907 if (idle > max_idle)
819 { 908 {
820 --idle; 909 --idle;
821 X_UNLOCK (reqlock); 910 X_UNLOCK (reqlock);
822 X_LOCK (wrklock); 911 X_LOCK (etplock);
823 --started; 912 --started;
824 X_UNLOCK (wrklock); 913 X_UNLOCK (etplock);
825 goto quit; 914 goto quit;
826 } 915 }
827 916
828 /* we are allowed to idle, so do so without any timeout */ 917 /* we are allowed to idle, so do so without any timeout */
829 X_COND_WAIT (reqwait, reqlock); 918 X_COND_WAIT (reqwait, reqlock);
943 1032
944 X_LOCK (reslock); 1033 X_LOCK (reslock);
945 1034
946 ++npending; 1035 ++npending;
947 1036
948 if (!reqq_push (&res_queue, req) && want_poll_cb) 1037 if (!reqq_push (&res_queue, req) && etp.want_poll_cb)
949 want_poll_cb (); 1038 etp.want_poll_cb ();
950 1039
951 self->req = 0; 1040 self->req = 0;
952 worker_clear (self); 1041 worker_clear (self);
953 1042
954 X_UNLOCK (reslock); 1043 X_UNLOCK (reslock);
955 } 1044 }
956 1045
957quit: 1046quit:
958 X_LOCK (wrklock); 1047 X_LOCK (etplock);
959 worker_free (self); 1048 worker_free (self);
960 X_UNLOCK (wrklock); 1049 X_UNLOCK (etplock);
961 1050
962 return 0; 1051 return 0;
963} 1052}
964 1053
965/*****************************************************************************/ 1054/*****************************************************************************/
966 1055
967static void eio_atfork_prepare (void)
968{
969 X_LOCK (wrklock);
970 X_LOCK (reqlock);
971 X_LOCK (reslock);
972#if !HAVE_PREADWRITE
973 X_LOCK (preadwritelock);
974#endif
975#if !HAVE_READDIR_R
976 X_LOCK (readdirlock);
977#endif
978}
979
980static void eio_atfork_parent (void)
981{
982#if !HAVE_READDIR_R
983 X_UNLOCK (readdirlock);
984#endif
985#if !HAVE_PREADWRITE
986 X_UNLOCK (preadwritelock);
987#endif
988 X_UNLOCK (reslock);
989 X_UNLOCK (reqlock);
990 X_UNLOCK (wrklock);
991}
992
993static void eio_atfork_child (void)
994{
995 eio_req *prv;
996
997 while (prv = reqq_shift (&req_queue))
998 eio_destroy (prv);
999
1000 while (prv = reqq_shift (&res_queue))
1001 eio_destroy (prv);
1002
1003 while (wrk_first.next != &wrk_first)
1004 {
1005 worker *wrk = wrk_first.next;
1006
1007 if (wrk->req)
1008 eio_destroy (wrk->req);
1009
1010 worker_clear (wrk);
1011 worker_free (wrk);
1012 }
1013
1014 started = 0;
1015 idle = 0;
1016 nreqs = 0;
1017 nready = 0;
1018 npending = 0;
1019
1020 eio_atfork_parent ();
1021}
1022
1023int eio_init (void (*want_poll)(void), void (*done_poll)(void)) 1056int eio_init (void (*want_poll)(void), void (*done_poll)(void))
1024{ 1057{
1025 want_poll_cb = want_poll; 1058 etp_init (&etp, want_poll, done_poll);
1026 done_poll_cb = done_poll;
1027
1028#ifdef _WIN32
1029 X_MUTEX_CHECK (wrklock);
1030 X_MUTEX_CHECK (reslock);
1031 X_MUTEX_CHECK (reqlock);
1032 X_MUTEX_CHECK (reqwait);
1033 X_MUTEX_CHECK (preadwritelock);
1034 X_MUTEX_CHECK (readdirlock);
1035
1036 X_COND_CHECK (reqwait);
1037#endif
1038
1039 X_THREAD_ATFORK (eio_atfork_prepare, eio_atfork_parent, eio_atfork_child);
1040} 1059}
1041 1060
1042static void eio_api_destroy (eio_req *req) 1061static void eio_api_destroy (eio_req *req)
1043{ 1062{
1044 free (req); 1063 free (req);
1049 \ 1068 \
1050 req = (eio_req *)calloc (1, sizeof *req); \ 1069 req = (eio_req *)calloc (1, sizeof *req); \
1051 if (!req) \ 1070 if (!req) \
1052 return 0; \ 1071 return 0; \
1053 \ 1072 \
1054 req->type = rtype; \ 1073 req->type = rtype; \
1055 req->pri = EIO_DEFAULT_PRI + EIO_PRI_BIAS; \ 1074 req->pri = pri; \
1056 req->finish = cb; \ 1075 req->finish = cb; \
1076 req->data = data; \
1057 req->destroy = eio_api_destroy; 1077 req->destroy = eio_api_destroy;
1058 1078
1059#define SEND eio_submit (req); return req 1079#define SEND eio_submit (req); return req
1060 1080
1061#define PATH \ 1081#define PATH \
1065 { \ 1085 { \
1066 eio_api_destroy (req); \ 1086 eio_api_destroy (req); \
1067 return 0; \ 1087 return 0; \
1068 } 1088 }
1069 1089
1090#ifndef EIO_NO_WRAPPERS
1091
1070eio_req *eio_nop (int pri, eio_cb cb, void *data) 1092eio_req *eio_nop (int pri, eio_cb cb, void *data)
1071{ 1093{
1072 REQ (EIO_NOP); SEND; 1094 REQ (EIO_NOP); SEND;
1073} 1095}
1074 1096
1247eio_req *eio_rename (const char *path, const char *new_path, int pri, eio_cb cb, void *data) 1269eio_req *eio_rename (const char *path, const char *new_path, int pri, eio_cb cb, void *data)
1248{ 1270{
1249 return eio__2path (EIO_RENAME, path, new_path, pri, cb, data); 1271 return eio__2path (EIO_RENAME, path, new_path, pri, cb, data);
1250} 1272}
1251 1273
1274#endif
1275
1252eio_req *eio_grp (eio_cb cb, void *data) 1276eio_req *eio_grp (eio_cb cb, void *data)
1253{ 1277{
1254 const int pri = EIO_PRI_MAX; 1278 const int pri = EIO_PRI_MAX;
1255 1279
1256 REQ (EIO_GROUP); SEND; 1280 REQ (EIO_GROUP); SEND;

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines