ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/libeio/etp.c
(Generate patch)

Comparing libeio/etp.c (file contents):
Revision 1.5 by root, Thu Jun 25 17:40:24 2015 UTC vs.
Revision 1.6 by root, Thu Jun 25 18:08:47 2015 UTC

52 52
53#ifndef ETP_TYPE_GROUP 53#ifndef ETP_TYPE_GROUP
54# define ETP_TYPE_GROUP 1 54# define ETP_TYPE_GROUP 1
55#endif 55#endif
56 56
57#ifndef ETP_WANT_POLL
58# define ETP_WANT_POLL(pool) pool->want_poll_cb (pool->userdata)
59#endif
60#ifndef ETP_DONE_POLL
61# define ETP_DONE_POLL(pool) pool->done_poll_cb (pool->userdata)
62#endif
63
57#define ETP_NUM_PRI (ETP_PRI_MAX - ETP_PRI_MIN + 1) 64#define ETP_NUM_PRI (ETP_PRI_MAX - ETP_PRI_MIN + 1)
58 65
59#define ETP_TICKS ((1000000 + 1023) >> 10) 66#define ETP_TICKS ((1000000 + 1023) >> 10)
60 67
61enum { 68enum {
68etp_tvdiff (struct timeval *tv1, struct timeval *tv2) 75etp_tvdiff (struct timeval *tv1, struct timeval *tv2)
69{ 76{
70 return (tv2->tv_sec - tv1->tv_sec ) * ETP_TICKS 77 return (tv2->tv_sec - tv1->tv_sec ) * ETP_TICKS
71 + ((tv2->tv_usec - tv1->tv_usec) >> 10); 78 + ((tv2->tv_usec - tv1->tv_usec) >> 10);
72} 79}
73
74static unsigned int started, idle, wanted = 4;
75
76static void (*want_poll_cb) (void);
77static void (*done_poll_cb) (void);
78
79static unsigned int max_poll_time; /* reslock */
80static unsigned int max_poll_reqs; /* reslock */
81
82static unsigned int nreqs; /* reqlock */
83static unsigned int nready; /* reqlock */
84static unsigned int npending; /* reqlock */
85static unsigned int max_idle = 4; /* maximum number of threads that can idle indefinitely */
86static unsigned int idle_timeout = 10; /* number of seconds after which an idle threads exit */
87
88static xmutex_t wrklock;
89static xmutex_t reslock;
90static xmutex_t reqlock;
91static xcond_t reqwait;
92 80
93struct etp_tmpbuf 81struct etp_tmpbuf
94{ 82{
95 void *ptr; 83 void *ptr;
96 int len; 84 int len;
119 int size; 107 int size;
120} etp_reqq; 108} etp_reqq;
121 109
122struct etp_pool 110struct etp_pool
123{ 111{
112 void *userdata;
113
124 etp_reqq req_queue; 114 etp_reqq req_queue;
125 etp_reqq res_queue; 115 etp_reqq res_queue;
116
117 unsigned int started, idle, wanted;
118
119 unsigned int max_poll_time; /* pool->reslock */
120 unsigned int max_poll_reqs; /* pool->reslock */
121
122 unsigned int nreqs; /* pool->reqlock */
123 unsigned int nready; /* pool->reqlock */
124 unsigned int npending; /* pool->reqlock */
125 unsigned int max_idle; /* maximum number of threads that can pool->idle indefinitely */
126 unsigned int idle_timeout; /* number of seconds after which an pool->idle threads exit */
127
128 void (*want_poll_cb) (void *userdata);
129 void (*done_poll_cb) (void *userdata);
130
131 xmutex_t wrklock;
132 xmutex_t reslock;
133 xmutex_t reqlock;
134 xcond_t reqwait;
126}; 135};
127 136
128typedef struct etp_pool *etp_pool; 137typedef struct etp_pool *etp_pool;
129 138
130typedef struct etp_worker 139typedef struct etp_worker
131{ 140{
132 etp_pool pool; 141 etp_pool pool;
133 142
134 struct etp_tmpbuf tmpbuf; 143 struct etp_tmpbuf tmpbuf;
135 144
136 /* locked by wrklock */ 145 /* locked by pool->wrklock */
137 struct etp_worker *prev, *next; 146 struct etp_worker *prev, *next;
138 147
139 xthread_t tid; 148 xthread_t tid;
140 149
141#ifdef ETP_WORKER_COMMON 150#ifdef ETP_WORKER_COMMON
143#endif 152#endif
144} etp_worker; 153} etp_worker;
145 154
146static etp_worker wrk_first; /* NOT etp */ 155static etp_worker wrk_first; /* NOT etp */
147 156
148#define ETP_WORKER_LOCK(wrk) X_LOCK (wrklock) 157#define ETP_WORKER_LOCK(wrk) X_LOCK (pool->wrklock)
149#define ETP_WORKER_UNLOCK(wrk) X_UNLOCK (wrklock) 158#define ETP_WORKER_UNLOCK(wrk) X_UNLOCK (pool->wrklock)
150 159
151/* worker threads management */ 160/* worker threads management */
152 161
153static void 162static void
154etp_worker_clear (etp_worker *wrk) 163etp_worker_clear (etp_worker *wrk)
168 177
169ETP_API_DECL unsigned int 178ETP_API_DECL unsigned int
170etp_nreqs (etp_pool pool) 179etp_nreqs (etp_pool pool)
171{ 180{
172 int retval; 181 int retval;
173 if (WORDACCESS_UNSAFE) X_LOCK (reqlock); 182 if (WORDACCESS_UNSAFE) X_LOCK (pool->reqlock);
174 retval = nreqs; 183 retval = pool->nreqs;
175 if (WORDACCESS_UNSAFE) X_UNLOCK (reqlock); 184 if (WORDACCESS_UNSAFE) X_UNLOCK (pool->reqlock);
176 return retval; 185 return retval;
177} 186}
178 187
179ETP_API_DECL unsigned int 188ETP_API_DECL unsigned int
180etp_nready (etp_pool pool) 189etp_nready (etp_pool pool)
181{ 190{
182 unsigned int retval; 191 unsigned int retval;
183 192
184 if (WORDACCESS_UNSAFE) X_LOCK (reqlock); 193 if (WORDACCESS_UNSAFE) X_LOCK (pool->reqlock);
185 retval = nready; 194 retval = pool->nready;
186 if (WORDACCESS_UNSAFE) X_UNLOCK (reqlock); 195 if (WORDACCESS_UNSAFE) X_UNLOCK (pool->reqlock);
187 196
188 return retval; 197 return retval;
189} 198}
190 199
191ETP_API_DECL unsigned int 200ETP_API_DECL unsigned int
192etp_npending (etp_pool pool) 201etp_npending (etp_pool pool)
193{ 202{
194 unsigned int retval; 203 unsigned int retval;
195 204
196 if (WORDACCESS_UNSAFE) X_LOCK (reqlock); 205 if (WORDACCESS_UNSAFE) X_LOCK (pool->reqlock);
197 retval = npending; 206 retval = pool->npending;
198 if (WORDACCESS_UNSAFE) X_UNLOCK (reqlock); 207 if (WORDACCESS_UNSAFE) X_UNLOCK (pool->reqlock);
199 208
200 return retval; 209 return retval;
201} 210}
202 211
203ETP_API_DECL unsigned int 212ETP_API_DECL unsigned int
204etp_nthreads (etp_pool pool) 213etp_nthreads (etp_pool pool)
205{ 214{
206 unsigned int retval; 215 unsigned int retval;
207 216
208 if (WORDACCESS_UNSAFE) X_LOCK (reqlock); 217 if (WORDACCESS_UNSAFE) X_LOCK (pool->reqlock);
209 retval = started; 218 retval = pool->started;
210 if (WORDACCESS_UNSAFE) X_UNLOCK (reqlock); 219 if (WORDACCESS_UNSAFE) X_UNLOCK (pool->reqlock);
211 220
212 return retval; 221 return retval;
213} 222}
214 223
215static void ecb_noinline ecb_cold 224static void ecb_noinline ecb_cold
265 274
266 abort (); 275 abort ();
267} 276}
268 277
269ETP_API_DECL int ecb_cold 278ETP_API_DECL int ecb_cold
270etp_init (etp_pool pool, void (*want_poll)(void), void (*done_poll)(void)) 279etp_init (etp_pool pool, void (*want_poll)(void *userdata), void (*done_poll)(void *userdata))
271{ 280{
272 X_MUTEX_CREATE (wrklock); 281 X_MUTEX_CREATE (pool->wrklock);
273 X_MUTEX_CREATE (reslock); 282 X_MUTEX_CREATE (pool->reslock);
274 X_MUTEX_CREATE (reqlock); 283 X_MUTEX_CREATE (pool->reqlock);
275 X_COND_CREATE (reqwait); 284 X_COND_CREATE (pool->reqwait);
276 285
277 reqq_init (&pool->req_queue); 286 reqq_init (&pool->req_queue);
278 reqq_init (&pool->res_queue); 287 reqq_init (&pool->res_queue);
279 288
280 wrk_first.next = 289 wrk_first.next =
281 wrk_first.prev = &wrk_first; 290 wrk_first.prev = &wrk_first;
282 291
283 started = 0; 292 pool->started = 0;
284 idle = 0; 293 pool->idle = 0;
285 nreqs = 0; 294 pool->nreqs = 0;
286 nready = 0; 295 pool->nready = 0;
287 npending = 0; 296 pool->npending = 0;
297 pool->wanted = 4;
288 298
299 pool->max_idle = 4; /* maximum number of threads that can pool->idle indefinitely */
300 pool->idle_timeout = 10; /* number of seconds after which an pool->idle threads exit */
301
289 want_poll_cb = want_poll; 302 pool->want_poll_cb = want_poll;
290 done_poll_cb = done_poll; 303 pool->done_poll_cb = done_poll;
291 304
292 return 0; 305 return 0;
293} 306}
294 307
295static void ecb_noinline ecb_cold 308static void ecb_noinline ecb_cold
323 336
324 for (;;) 337 for (;;)
325 { 338 {
326 ts.tv_sec = 0; 339 ts.tv_sec = 0;
327 340
328 X_LOCK (reqlock); 341 X_LOCK (pool->reqlock);
329 342
330 for (;;) 343 for (;;)
331 { 344 {
332 req = reqq_shift (&pool->req_queue); 345 req = reqq_shift (&pool->req_queue);
333 346
334 if (req) 347 if (req)
335 break; 348 break;
336 349
337 if (ts.tv_sec == 1) /* no request, but timeout detected, let's quit */ 350 if (ts.tv_sec == 1) /* no request, but timeout detected, let's quit */
338 { 351 {
339 X_UNLOCK (reqlock); 352 X_UNLOCK (pool->reqlock);
340 X_LOCK (wrklock); 353 X_LOCK (pool->wrklock);
341 --started; 354 --pool->started;
342 X_UNLOCK (wrklock); 355 X_UNLOCK (pool->wrklock);
343 goto quit; 356 goto quit;
344 } 357 }
345 358
346 ++idle; 359 ++pool->idle;
347 360
348 if (idle <= max_idle) 361 if (pool->idle <= pool->max_idle)
349 /* we are allowed to idle, so do so without any timeout */ 362 /* we are allowed to pool->idle, so do so without any timeout */
350 X_COND_WAIT (reqwait, reqlock); 363 X_COND_WAIT (pool->reqwait, pool->reqlock);
351 else 364 else
352 { 365 {
353 /* initialise timeout once */ 366 /* initialise timeout once */
354 if (!ts.tv_sec) 367 if (!ts.tv_sec)
355 ts.tv_sec = time (0) + idle_timeout; 368 ts.tv_sec = time (0) + pool->idle_timeout;
356 369
357 if (X_COND_TIMEDWAIT (reqwait, reqlock, ts) == ETIMEDOUT) 370 if (X_COND_TIMEDWAIT (pool->reqwait, pool->reqlock, ts) == ETIMEDOUT)
358 ts.tv_sec = 1; /* assuming this is not a value computed above.,.. */ 371 ts.tv_sec = 1; /* assuming this is not a value computed above.,.. */
359 } 372 }
360 373
361 --idle; 374 --pool->idle;
362 } 375 }
363 376
364 --nready; 377 --pool->nready;
365 378
366 X_UNLOCK (reqlock); 379 X_UNLOCK (pool->reqlock);
367 380
368 if (req->type == ETP_TYPE_QUIT) 381 if (req->type == ETP_TYPE_QUIT)
369 goto quit; 382 goto quit;
370 383
371 ETP_EXECUTE (self, req); 384 ETP_EXECUTE (self, req);
372 385
373 X_LOCK (reslock); 386 X_LOCK (pool->reslock);
374 387
375 ++npending; 388 ++pool->npending;
376 389
377 if (!reqq_push (&pool->res_queue, req) && want_poll_cb) 390 if (!reqq_push (&pool->res_queue, req))
378 want_poll_cb (); 391 ETP_WANT_POLL (poll);
379 392
380 etp_worker_clear (self); 393 etp_worker_clear (self);
381 394
382 X_UNLOCK (reslock); 395 X_UNLOCK (pool->reslock);
383 } 396 }
384 397
385quit: 398quit:
386 free (req); 399 free (req);
387 400
388 X_LOCK (wrklock); 401 X_LOCK (pool->wrklock);
389 etp_worker_free (self); 402 etp_worker_free (self);
390 X_UNLOCK (wrklock); 403 X_UNLOCK (pool->wrklock);
391 404
392 return 0; 405 return 0;
393} 406}
394 407
395static void ecb_cold 408static void ecb_cold
400 /*TODO*/ 413 /*TODO*/
401 assert (("unable to allocate worker thread data", wrk)); 414 assert (("unable to allocate worker thread data", wrk));
402 415
403 wrk->pool = pool; 416 wrk->pool = pool;
404 417
405 X_LOCK (wrklock); 418 X_LOCK (pool->wrklock);
406 419
407 if (xthread_create (&wrk->tid, etp_proc, (void *)wrk)) 420 if (xthread_create (&wrk->tid, etp_proc, (void *)wrk))
408 { 421 {
409 wrk->prev = &wrk_first; 422 wrk->prev = &wrk_first;
410 wrk->next = wrk_first.next; 423 wrk->next = wrk_first.next;
411 wrk_first.next->prev = wrk; 424 wrk_first.next->prev = wrk;
412 wrk_first.next = wrk; 425 wrk_first.next = wrk;
413 ++started; 426 ++pool->started;
414 } 427 }
415 else 428 else
416 free (wrk); 429 free (wrk);
417 430
418 X_UNLOCK (wrklock); 431 X_UNLOCK (pool->wrklock);
419} 432}
420 433
421static void 434static void
422etp_maybe_start_thread (etp_pool pool) 435etp_maybe_start_thread (etp_pool pool)
423{ 436{
424 if (ecb_expect_true (etp_nthreads (pool) >= wanted)) 437 if (ecb_expect_true (etp_nthreads (pool) >= pool->wanted))
425 return; 438 return;
426 439
427 /* todo: maybe use idle here, but might be less exact */ 440 /* todo: maybe use pool->idle here, but might be less exact */
428 if (ecb_expect_true (0 <= (int)etp_nthreads (pool) + (int)etp_npending (pool) - (int)etp_nreqs (pool))) 441 if (ecb_expect_true (0 <= (int)etp_nthreads (pool) + (int)etp_npending (pool) - (int)etp_nreqs (pool)))
429 return; 442 return;
430 443
431 etp_start_thread (pool); 444 etp_start_thread (pool);
432} 445}
437 ETP_REQ *req = calloc (1, sizeof (ETP_REQ)); /* will be freed by worker */ 450 ETP_REQ *req = calloc (1, sizeof (ETP_REQ)); /* will be freed by worker */
438 451
439 req->type = ETP_TYPE_QUIT; 452 req->type = ETP_TYPE_QUIT;
440 req->pri = ETP_PRI_MAX - ETP_PRI_MIN; 453 req->pri = ETP_PRI_MAX - ETP_PRI_MIN;
441 454
442 X_LOCK (reqlock); 455 X_LOCK (pool->reqlock);
443 reqq_push (&pool->req_queue, req); 456 reqq_push (&pool->req_queue, req);
444 X_COND_SIGNAL (reqwait); 457 X_COND_SIGNAL (pool->reqwait);
445 X_UNLOCK (reqlock); 458 X_UNLOCK (pool->reqlock);
446 459
447 X_LOCK (wrklock); 460 X_LOCK (pool->wrklock);
448 --started; 461 --pool->started;
449 X_UNLOCK (wrklock); 462 X_UNLOCK (pool->wrklock);
450} 463}
451 464
452ETP_API_DECL int 465ETP_API_DECL int
453etp_poll (etp_pool pool) 466etp_poll (etp_pool pool)
454{ 467{
455 unsigned int maxreqs; 468 unsigned int maxreqs;
456 unsigned int maxtime; 469 unsigned int maxtime;
457 struct timeval tv_start, tv_now; 470 struct timeval tv_start, tv_now;
458 471
459 X_LOCK (reslock); 472 X_LOCK (pool->reslock);
460 maxreqs = max_poll_reqs; 473 maxreqs = pool->max_poll_reqs;
461 maxtime = max_poll_time; 474 maxtime = pool->max_poll_time;
462 X_UNLOCK (reslock); 475 X_UNLOCK (pool->reslock);
463 476
464 if (maxtime) 477 if (maxtime)
465 gettimeofday (&tv_start, 0); 478 gettimeofday (&tv_start, 0);
466 479
467 for (;;) 480 for (;;)
468 { 481 {
469 ETP_REQ *req; 482 ETP_REQ *req;
470 483
471 etp_maybe_start_thread (pool); 484 etp_maybe_start_thread (pool);
472 485
473 X_LOCK (reslock); 486 X_LOCK (pool->reslock);
474 req = reqq_shift (&pool->res_queue); 487 req = reqq_shift (&pool->res_queue);
475 488
476 if (req) 489 if (req)
477 { 490 {
478 --npending; 491 --pool->npending;
479 492
480 if (!pool->res_queue.size && done_poll_cb) 493 if (!pool->res_queue.size)
481 done_poll_cb (); 494 ETP_DONE_POLL (pool->userdata);
482 } 495 }
483 496
484 X_UNLOCK (reslock); 497 X_UNLOCK (pool->reslock);
485 498
486 if (!req) 499 if (!req)
487 return 0; 500 return 0;
488 501
489 X_LOCK (reqlock); 502 X_LOCK (pool->reqlock);
490 --nreqs; 503 --pool->nreqs;
491 X_UNLOCK (reqlock); 504 X_UNLOCK (pool->reqlock);
492 505
493 if (ecb_expect_false (req->type == ETP_TYPE_GROUP && req->size)) 506 if (ecb_expect_false (req->type == ETP_TYPE_GROUP && req->size))
494 { 507 {
495 req->flags |= ETP_FLAG_DELAYED; /* mark request as delayed */ 508 req->flags |= ETP_FLAG_DELAYED; /* mark request as delayed */
496 continue; 509 continue;
545 if (ecb_expect_false (req->pri > ETP_PRI_MAX - ETP_PRI_MIN)) req->pri = ETP_PRI_MAX - ETP_PRI_MIN; 558 if (ecb_expect_false (req->pri > ETP_PRI_MAX - ETP_PRI_MIN)) req->pri = ETP_PRI_MAX - ETP_PRI_MIN;
546 559
547 if (ecb_expect_false (req->type == ETP_TYPE_GROUP)) 560 if (ecb_expect_false (req->type == ETP_TYPE_GROUP))
548 { 561 {
549 /* I hope this is worth it :/ */ 562 /* I hope this is worth it :/ */
550 X_LOCK (reqlock); 563 X_LOCK (pool->reqlock);
551 ++nreqs; 564 ++pool->nreqs;
552 X_UNLOCK (reqlock); 565 X_UNLOCK (pool->reqlock);
553 566
554 X_LOCK (reslock); 567 X_LOCK (pool->reslock);
555 568
556 ++npending; 569 ++pool->npending;
557 570
558 if (!reqq_push (&pool->res_queue, req) && want_poll_cb) 571 if (!reqq_push (&pool->res_queue, req))
559 want_poll_cb (); 572 ETP_WANT_POLL (pool);
560 573
561 X_UNLOCK (reslock); 574 X_UNLOCK (pool->reslock);
562 } 575 }
563 else 576 else
564 { 577 {
565 X_LOCK (reqlock); 578 X_LOCK (pool->reqlock);
566 ++nreqs; 579 ++pool->nreqs;
567 ++nready; 580 ++pool->nready;
568 reqq_push (&pool->req_queue, req); 581 reqq_push (&pool->req_queue, req);
569 X_COND_SIGNAL (reqwait); 582 X_COND_SIGNAL (pool->reqwait);
570 X_UNLOCK (reqlock); 583 X_UNLOCK (pool->reqlock);
571 584
572 etp_maybe_start_thread (pool); 585 etp_maybe_start_thread (pool);
573 } 586 }
574} 587}
575 588
576ETP_API_DECL void ecb_cold 589ETP_API_DECL void ecb_cold
577etp_set_max_poll_time (etp_pool pool, double nseconds) 590etp_set_max_poll_time (etp_pool pool, double nseconds)
578{ 591{
579 if (WORDACCESS_UNSAFE) X_LOCK (reslock); 592 if (WORDACCESS_UNSAFE) X_LOCK (pool->reslock);
580 max_poll_time = nseconds * ETP_TICKS; 593 pool->max_poll_time = nseconds * ETP_TICKS;
581 if (WORDACCESS_UNSAFE) X_UNLOCK (reslock); 594 if (WORDACCESS_UNSAFE) X_UNLOCK (pool->reslock);
582} 595}
583 596
584ETP_API_DECL void ecb_cold 597ETP_API_DECL void ecb_cold
585etp_set_max_poll_reqs (etp_pool pool, unsigned int maxreqs) 598etp_set_max_poll_reqs (etp_pool pool, unsigned int maxreqs)
586{ 599{
587 if (WORDACCESS_UNSAFE) X_LOCK (reslock); 600 if (WORDACCESS_UNSAFE) X_LOCK (pool->reslock);
588 max_poll_reqs = maxreqs; 601 pool->max_poll_reqs = maxreqs;
589 if (WORDACCESS_UNSAFE) X_UNLOCK (reslock); 602 if (WORDACCESS_UNSAFE) X_UNLOCK (pool->reslock);
590} 603}
591 604
592ETP_API_DECL void ecb_cold 605ETP_API_DECL void ecb_cold
593etp_set_max_idle (etp_pool pool, unsigned int nthreads) 606etp_set_max_idle (etp_pool pool, unsigned int nthreads)
594{ 607{
595 if (WORDACCESS_UNSAFE) X_LOCK (reqlock); 608 if (WORDACCESS_UNSAFE) X_LOCK (pool->reqlock);
596 max_idle = nthreads; 609 pool->max_idle = nthreads;
597 if (WORDACCESS_UNSAFE) X_UNLOCK (reqlock); 610 if (WORDACCESS_UNSAFE) X_UNLOCK (pool->reqlock);
598} 611}
599 612
600ETP_API_DECL void ecb_cold 613ETP_API_DECL void ecb_cold
601etp_set_idle_timeout (etp_pool pool, unsigned int seconds) 614etp_set_idle_timeout (etp_pool pool, unsigned int seconds)
602{ 615{
603 if (WORDACCESS_UNSAFE) X_LOCK (reqlock); 616 if (WORDACCESS_UNSAFE) X_LOCK (pool->reqlock);
604 idle_timeout = seconds; 617 pool->idle_timeout = seconds;
605 if (WORDACCESS_UNSAFE) X_UNLOCK (reqlock); 618 if (WORDACCESS_UNSAFE) X_UNLOCK (pool->reqlock);
606} 619}
607 620
608ETP_API_DECL void ecb_cold 621ETP_API_DECL void ecb_cold
609etp_set_min_parallel (etp_pool pool, unsigned int nthreads) 622etp_set_min_parallel (etp_pool pool, unsigned int nthreads)
610{ 623{
611 if (wanted < nthreads) 624 if (pool->wanted < nthreads)
612 wanted = nthreads; 625 pool->wanted = nthreads;
613} 626}
614 627
615ETP_API_DECL void ecb_cold 628ETP_API_DECL void ecb_cold
616etp_set_max_parallel (etp_pool pool, unsigned int nthreads) 629etp_set_max_parallel (etp_pool pool, unsigned int nthreads)
617{ 630{
618 if (wanted > nthreads) 631 if (pool->wanted > nthreads)
619 wanted = nthreads; 632 pool->wanted = nthreads;
620 633
621 while (started > wanted) 634 while (pool->started > pool->wanted)
622 etp_end_thread (pool); 635 etp_end_thread (pool);
623} 636}
624 637

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines