ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/libeio/etp.c
(Generate patch)

Comparing libeio/etp.c (file contents):
Revision 1.6 by root, Thu Jun 25 18:08:47 2015 UTC vs.
Revision 1.13 by root, Tue Aug 14 11:44:53 2018 UTC

1/* 1/*
2 * libetp implementation 2 * libetp implementation
3 * 3 *
4 * Copyright (c) 2007,2008,2009,2010,2011,2012,2013 Marc Alexander Lehmann <libetp@schmorp.de> 4 * Copyright (c) 2007,2008,2009,2010,2011,2012,2013,2015 Marc Alexander Lehmann <libetp@schmorp.de>
5 * All rights reserved. 5 * All rights reserved.
6 * 6 *
7 * Redistribution and use in source and binary forms, with or without modifica- 7 * Redistribution and use in source and binary forms, with or without modifica-
8 * tion, are permitted provided that the following conditions are met: 8 * tion, are permitted provided that the following conditions are met:
9 * 9 *
35 * and other provisions required by the GPL. If you do not delete the 35 * and other provisions required by the GPL. If you do not delete the
36 * provisions above, a recipient may use your version of this file under 36 * provisions above, a recipient may use your version of this file under
37 * either the BSD or the GPL. 37 * either the BSD or the GPL.
38 */ 38 */
39 39
40#if HAVE_SYS_PRCTL_H
41# include <sys/prctl.h>
42#endif
43
40#ifndef ETP_API_DECL 44#ifndef ETP_API_DECL
41# define ETP_API_DECL static 45# define ETP_API_DECL static
42#endif 46#endif
43 47
44#ifndef ETP_PRI_MIN 48#ifndef ETP_PRI_MIN
104typedef struct 108typedef struct
105{ 109{
106 ETP_REQ *qs[ETP_NUM_PRI], *qe[ETP_NUM_PRI]; /* qstart, qend */ 110 ETP_REQ *qs[ETP_NUM_PRI], *qe[ETP_NUM_PRI]; /* qstart, qend */
107 int size; 111 int size;
108} etp_reqq; 112} etp_reqq;
113
114typedef struct etp_pool *etp_pool;
115
116typedef struct etp_worker
117{
118 etp_pool pool;
119
120 struct etp_tmpbuf tmpbuf;
121
122 /* locked by pool->wrklock */
123 struct etp_worker *prev, *next;
124
125 xthread_t tid;
126
127#ifdef ETP_WORKER_COMMON
128 ETP_WORKER_COMMON
129#endif
130} etp_worker;
109 131
110struct etp_pool 132struct etp_pool
111{ 133{
112 void *userdata; 134 void *userdata;
113 135
130 152
131 xmutex_t wrklock; 153 xmutex_t wrklock;
132 xmutex_t reslock; 154 xmutex_t reslock;
133 xmutex_t reqlock; 155 xmutex_t reqlock;
134 xcond_t reqwait; 156 xcond_t reqwait;
157
158 etp_worker wrk_first;
135}; 159};
136
137typedef struct etp_pool *etp_pool;
138
139typedef struct etp_worker
140{
141 etp_pool pool;
142
143 struct etp_tmpbuf tmpbuf;
144
145 /* locked by pool->wrklock */
146 struct etp_worker *prev, *next;
147
148 xthread_t tid;
149
150#ifdef ETP_WORKER_COMMON
151 ETP_WORKER_COMMON
152#endif
153} etp_worker;
154
155static etp_worker wrk_first; /* NOT etp */
156 160
157#define ETP_WORKER_LOCK(wrk) X_LOCK (pool->wrklock) 161#define ETP_WORKER_LOCK(wrk) X_LOCK (pool->wrklock)
158#define ETP_WORKER_UNLOCK(wrk) X_UNLOCK (pool->wrklock) 162#define ETP_WORKER_UNLOCK(wrk) X_UNLOCK (pool->wrklock)
159 163
160/* worker threads management */ 164/* worker threads management */
274 278
275 abort (); 279 abort ();
276} 280}
277 281
278ETP_API_DECL int ecb_cold 282ETP_API_DECL int ecb_cold
279etp_init (etp_pool pool, void (*want_poll)(void *userdata), void (*done_poll)(void *userdata)) 283etp_init (etp_pool pool, void *userdata, void (*want_poll)(void *userdata), void (*done_poll)(void *userdata))
280{ 284{
281 X_MUTEX_CREATE (pool->wrklock); 285 X_MUTEX_CREATE (pool->wrklock);
282 X_MUTEX_CREATE (pool->reslock); 286 X_MUTEX_CREATE (pool->reslock);
283 X_MUTEX_CREATE (pool->reqlock); 287 X_MUTEX_CREATE (pool->reqlock);
284 X_COND_CREATE (pool->reqwait); 288 X_COND_CREATE (pool->reqwait);
285 289
286 reqq_init (&pool->req_queue); 290 reqq_init (&pool->req_queue);
287 reqq_init (&pool->res_queue); 291 reqq_init (&pool->res_queue);
288 292
289 wrk_first.next = 293 pool->wrk_first.next =
290 wrk_first.prev = &wrk_first; 294 pool->wrk_first.prev = &pool->wrk_first;
291 295
292 pool->started = 0; 296 pool->started = 0;
293 pool->idle = 0; 297 pool->idle = 0;
294 pool->nreqs = 0; 298 pool->nreqs = 0;
295 pool->nready = 0; 299 pool->nready = 0;
297 pool->wanted = 4; 301 pool->wanted = 4;
298 302
299 pool->max_idle = 4; /* maximum number of threads that can pool->idle indefinitely */ 303 pool->max_idle = 4; /* maximum number of threads that can pool->idle indefinitely */
300 pool->idle_timeout = 10; /* number of seconds after which an pool->idle threads exit */ 304 pool->idle_timeout = 10; /* number of seconds after which an pool->idle threads exit */
301 305
306 pool->userdata = userdata;
302 pool->want_poll_cb = want_poll; 307 pool->want_poll_cb = want_poll;
303 pool->done_poll_cb = done_poll; 308 pool->done_poll_cb = done_poll;
304 309
305 return 0; 310 return 0;
306} 311}
342 347
343 for (;;) 348 for (;;)
344 { 349 {
345 req = reqq_shift (&pool->req_queue); 350 req = reqq_shift (&pool->req_queue);
346 351
347 if (req) 352 if (ecb_expect_true (req))
348 break; 353 break;
349 354
350 if (ts.tv_sec == 1) /* no request, but timeout detected, let's quit */ 355 if (ts.tv_sec == 1) /* no request, but timeout detected, let's quit */
351 { 356 {
352 X_UNLOCK (pool->reqlock); 357 X_UNLOCK (pool->reqlock);
376 381
377 --pool->nready; 382 --pool->nready;
378 383
379 X_UNLOCK (pool->reqlock); 384 X_UNLOCK (pool->reqlock);
380 385
381 if (req->type == ETP_TYPE_QUIT) 386 if (ecb_expect_false (req->type == ETP_TYPE_QUIT))
382 goto quit; 387 goto quit;
383 388
384 ETP_EXECUTE (self, req); 389 ETP_EXECUTE (self, req);
385 390
386 X_LOCK (pool->reslock); 391 X_LOCK (pool->reslock);
387 392
388 ++pool->npending; 393 ++pool->npending;
389 394
390 if (!reqq_push (&pool->res_queue, req)) 395 if (!reqq_push (&pool->res_queue, req))
391 ETP_WANT_POLL (poll); 396 ETP_WANT_POLL (pool);
392 397
393 etp_worker_clear (self); 398 etp_worker_clear (self);
394 399
395 X_UNLOCK (pool->reslock); 400 X_UNLOCK (pool->reslock);
396 } 401 }
417 422
418 X_LOCK (pool->wrklock); 423 X_LOCK (pool->wrklock);
419 424
420 if (xthread_create (&wrk->tid, etp_proc, (void *)wrk)) 425 if (xthread_create (&wrk->tid, etp_proc, (void *)wrk))
421 { 426 {
422 wrk->prev = &wrk_first; 427 wrk->prev = &pool->wrk_first;
423 wrk->next = wrk_first.next; 428 wrk->next = pool->wrk_first.next;
424 wrk_first.next->prev = wrk; 429 pool->wrk_first.next->prev = wrk;
425 wrk_first.next = wrk; 430 pool->wrk_first.next = wrk;
426 ++pool->started; 431 ++pool->started;
427 } 432 }
428 else 433 else
429 free (wrk); 434 free (wrk);
430 435
484 etp_maybe_start_thread (pool); 489 etp_maybe_start_thread (pool);
485 490
486 X_LOCK (pool->reslock); 491 X_LOCK (pool->reslock);
487 req = reqq_shift (&pool->res_queue); 492 req = reqq_shift (&pool->res_queue);
488 493
489 if (req) 494 if (ecb_expect_true (req))
490 { 495 {
491 --pool->npending; 496 --pool->npending;
492 497
493 if (!pool->res_queue.size) 498 if (!pool->res_queue.size)
494 ETP_DONE_POLL (pool->userdata); 499 ETP_DONE_POLL (pool);
495 } 500 }
496 501
497 X_UNLOCK (pool->reslock); 502 X_UNLOCK (pool->reslock);
498 503
499 if (!req) 504 if (ecb_expect_false (!req))
500 return 0; 505 return 0;
501 506
502 X_LOCK (pool->reqlock); 507 X_LOCK (pool->reqlock);
503 --pool->nreqs; 508 --pool->nreqs;
504 X_UNLOCK (pool->reqlock); 509 X_UNLOCK (pool->reqlock);
585 etp_maybe_start_thread (pool); 590 etp_maybe_start_thread (pool);
586 } 591 }
587} 592}
588 593
589ETP_API_DECL void ecb_cold 594ETP_API_DECL void ecb_cold
590etp_set_max_poll_time (etp_pool pool, double nseconds) 595etp_set_max_poll_time (etp_pool pool, double seconds)
591{ 596{
592 if (WORDACCESS_UNSAFE) X_LOCK (pool->reslock); 597 if (WORDACCESS_UNSAFE) X_LOCK (pool->reslock);
593 pool->max_poll_time = nseconds * ETP_TICKS; 598 pool->max_poll_time = seconds * ETP_TICKS;
594 if (WORDACCESS_UNSAFE) X_UNLOCK (pool->reslock); 599 if (WORDACCESS_UNSAFE) X_UNLOCK (pool->reslock);
595} 600}
596 601
597ETP_API_DECL void ecb_cold 602ETP_API_DECL void ecb_cold
598etp_set_max_poll_reqs (etp_pool pool, unsigned int maxreqs) 603etp_set_max_poll_reqs (etp_pool pool, unsigned int maxreqs)
601 pool->max_poll_reqs = maxreqs; 606 pool->max_poll_reqs = maxreqs;
602 if (WORDACCESS_UNSAFE) X_UNLOCK (pool->reslock); 607 if (WORDACCESS_UNSAFE) X_UNLOCK (pool->reslock);
603} 608}
604 609
605ETP_API_DECL void ecb_cold 610ETP_API_DECL void ecb_cold
606etp_set_max_idle (etp_pool pool, unsigned int nthreads) 611etp_set_max_idle (etp_pool pool, unsigned int threads)
607{ 612{
608 if (WORDACCESS_UNSAFE) X_LOCK (pool->reqlock); 613 if (WORDACCESS_UNSAFE) X_LOCK (pool->reqlock);
609 pool->max_idle = nthreads; 614 pool->max_idle = threads;
610 if (WORDACCESS_UNSAFE) X_UNLOCK (pool->reqlock); 615 if (WORDACCESS_UNSAFE) X_UNLOCK (pool->reqlock);
611} 616}
612 617
613ETP_API_DECL void ecb_cold 618ETP_API_DECL void ecb_cold
614etp_set_idle_timeout (etp_pool pool, unsigned int seconds) 619etp_set_idle_timeout (etp_pool pool, unsigned int seconds)
617 pool->idle_timeout = seconds; 622 pool->idle_timeout = seconds;
618 if (WORDACCESS_UNSAFE) X_UNLOCK (pool->reqlock); 623 if (WORDACCESS_UNSAFE) X_UNLOCK (pool->reqlock);
619} 624}
620 625
621ETP_API_DECL void ecb_cold 626ETP_API_DECL void ecb_cold
622etp_set_min_parallel (etp_pool pool, unsigned int nthreads) 627etp_set_min_parallel (etp_pool pool, unsigned int threads)
623{ 628{
624 if (pool->wanted < nthreads) 629 if (pool->wanted < threads)
625 pool->wanted = nthreads; 630 pool->wanted = threads;
626} 631}
627 632
628ETP_API_DECL void ecb_cold 633ETP_API_DECL void ecb_cold
629etp_set_max_parallel (etp_pool pool, unsigned int nthreads) 634etp_set_max_parallel (etp_pool pool, unsigned int threads)
630{ 635{
631 if (pool->wanted > nthreads) 636 if (pool->wanted > threads)
632 pool->wanted = nthreads; 637 pool->wanted = threads;
633 638
634 while (pool->started > pool->wanted) 639 while (pool->started > pool->wanted)
635 etp_end_thread (pool); 640 etp_end_thread (pool);
636} 641}
637 642

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines