ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/libeio/etp.c
(Generate patch)

Comparing libeio/etp.c (file contents):
Revision 1.6 by root, Thu Jun 25 18:08:47 2015 UTC vs.
Revision 1.11 by root, Sun May 1 17:15:45 2016 UTC

1/* 1/*
2 * libetp implementation 2 * libetp implementation
3 * 3 *
4 * Copyright (c) 2007,2008,2009,2010,2011,2012,2013 Marc Alexander Lehmann <libetp@schmorp.de> 4 * Copyright (c) 2007,2008,2009,2010,2011,2012,2013,2015 Marc Alexander Lehmann <libetp@schmorp.de>
5 * All rights reserved. 5 * All rights reserved.
6 * 6 *
7 * Redistribution and use in source and binary forms, with or without modifica- 7 * Redistribution and use in source and binary forms, with or without modifica-
8 * tion, are permitted provided that the following conditions are met: 8 * tion, are permitted provided that the following conditions are met:
9 * 9 *
35 * and other provisions required by the GPL. If you do not delete the 35 * and other provisions required by the GPL. If you do not delete the
36 * provisions above, a recipient may use your version of this file under 36 * provisions above, a recipient may use your version of this file under
37 * either the BSD or the GPL. 37 * either the BSD or the GPL.
38 */ 38 */
39 39
40#if HAVE_SYS_PRCTL_H
41# include <sys/prctl.h>
42#endif
43
44#ifdef EIO_STACKSIZE
45# define X_STACKSIZE EIO_STACKSIZE
46#endif
47#include "xthread.h"
48
40#ifndef ETP_API_DECL 49#ifndef ETP_API_DECL
41# define ETP_API_DECL static 50# define ETP_API_DECL static
42#endif 51#endif
43 52
44#ifndef ETP_PRI_MIN 53#ifndef ETP_PRI_MIN
104typedef struct 113typedef struct
105{ 114{
106 ETP_REQ *qs[ETP_NUM_PRI], *qe[ETP_NUM_PRI]; /* qstart, qend */ 115 ETP_REQ *qs[ETP_NUM_PRI], *qe[ETP_NUM_PRI]; /* qstart, qend */
107 int size; 116 int size;
108} etp_reqq; 117} etp_reqq;
118
119typedef struct etp_pool *etp_pool;
120
121typedef struct etp_worker
122{
123 etp_pool pool;
124
125 struct etp_tmpbuf tmpbuf;
126
127 /* locked by pool->wrklock */
128 struct etp_worker *prev, *next;
129
130 xthread_t tid;
131
132#ifdef ETP_WORKER_COMMON
133 ETP_WORKER_COMMON
134#endif
135} etp_worker;
109 136
110struct etp_pool 137struct etp_pool
111{ 138{
112 void *userdata; 139 void *userdata;
113 140
130 157
131 xmutex_t wrklock; 158 xmutex_t wrklock;
132 xmutex_t reslock; 159 xmutex_t reslock;
133 xmutex_t reqlock; 160 xmutex_t reqlock;
134 xcond_t reqwait; 161 xcond_t reqwait;
162
163 etp_worker wrk_first;
135}; 164};
136
137typedef struct etp_pool *etp_pool;
138
139typedef struct etp_worker
140{
141 etp_pool pool;
142
143 struct etp_tmpbuf tmpbuf;
144
145 /* locked by pool->wrklock */
146 struct etp_worker *prev, *next;
147
148 xthread_t tid;
149
150#ifdef ETP_WORKER_COMMON
151 ETP_WORKER_COMMON
152#endif
153} etp_worker;
154
155static etp_worker wrk_first; /* NOT etp */
156 165
157#define ETP_WORKER_LOCK(wrk) X_LOCK (pool->wrklock) 166#define ETP_WORKER_LOCK(wrk) X_LOCK (pool->wrklock)
158#define ETP_WORKER_UNLOCK(wrk) X_UNLOCK (pool->wrklock) 167#define ETP_WORKER_UNLOCK(wrk) X_UNLOCK (pool->wrklock)
159 168
160/* worker threads management */ 169/* worker threads management */
274 283
275 abort (); 284 abort ();
276} 285}
277 286
278ETP_API_DECL int ecb_cold 287ETP_API_DECL int ecb_cold
279etp_init (etp_pool pool, void (*want_poll)(void *userdata), void (*done_poll)(void *userdata)) 288etp_init (etp_pool pool, void *userdata, void (*want_poll)(void *userdata), void (*done_poll)(void *userdata))
280{ 289{
281 X_MUTEX_CREATE (pool->wrklock); 290 X_MUTEX_CREATE (pool->wrklock);
282 X_MUTEX_CREATE (pool->reslock); 291 X_MUTEX_CREATE (pool->reslock);
283 X_MUTEX_CREATE (pool->reqlock); 292 X_MUTEX_CREATE (pool->reqlock);
284 X_COND_CREATE (pool->reqwait); 293 X_COND_CREATE (pool->reqwait);
285 294
286 reqq_init (&pool->req_queue); 295 reqq_init (&pool->req_queue);
287 reqq_init (&pool->res_queue); 296 reqq_init (&pool->res_queue);
288 297
289 wrk_first.next = 298 pool->wrk_first.next =
290 wrk_first.prev = &wrk_first; 299 pool->wrk_first.prev = &pool->wrk_first;
291 300
292 pool->started = 0; 301 pool->started = 0;
293 pool->idle = 0; 302 pool->idle = 0;
294 pool->nreqs = 0; 303 pool->nreqs = 0;
295 pool->nready = 0; 304 pool->nready = 0;
297 pool->wanted = 4; 306 pool->wanted = 4;
298 307
299 pool->max_idle = 4; /* maximum number of threads that can pool->idle indefinitely */ 308 pool->max_idle = 4; /* maximum number of threads that can pool->idle indefinitely */
300 pool->idle_timeout = 10; /* number of seconds after which an pool->idle threads exit */ 309 pool->idle_timeout = 10; /* number of seconds after which an pool->idle threads exit */
301 310
311 pool->userdata = userdata;
302 pool->want_poll_cb = want_poll; 312 pool->want_poll_cb = want_poll;
303 pool->done_poll_cb = done_poll; 313 pool->done_poll_cb = done_poll;
304 314
305 return 0; 315 return 0;
306} 316}
308static void ecb_noinline ecb_cold 318static void ecb_noinline ecb_cold
309etp_proc_init (void) 319etp_proc_init (void)
310{ 320{
311#if HAVE_PRCTL_SET_NAME 321#if HAVE_PRCTL_SET_NAME
312 /* provide a more sensible "thread name" */ 322 /* provide a more sensible "thread name" */
313 char name[16 + 1]; 323 char name[15 + 1];
314 const int namelen = sizeof (name) - 1; 324 const int namelen = sizeof (name) - 1;
315 int len; 325 int len;
316 326
317 prctl (PR_GET_NAME, (unsigned long)name, 0, 0, 0); 327 prctl (PR_GET_NAME, (unsigned long)name, 0, 0, 0);
318 name [namelen] = 0; 328 name [namelen] = 0;
342 352
343 for (;;) 353 for (;;)
344 { 354 {
345 req = reqq_shift (&pool->req_queue); 355 req = reqq_shift (&pool->req_queue);
346 356
347 if (req) 357 if (ecb_expect_true (req))
348 break; 358 break;
349 359
350 if (ts.tv_sec == 1) /* no request, but timeout detected, let's quit */ 360 if (ts.tv_sec == 1) /* no request, but timeout detected, let's quit */
351 { 361 {
352 X_UNLOCK (pool->reqlock); 362 X_UNLOCK (pool->reqlock);
376 386
377 --pool->nready; 387 --pool->nready;
378 388
379 X_UNLOCK (pool->reqlock); 389 X_UNLOCK (pool->reqlock);
380 390
381 if (req->type == ETP_TYPE_QUIT) 391 if (ecb_expect_false (req->type == ETP_TYPE_QUIT))
382 goto quit; 392 goto quit;
383 393
384 ETP_EXECUTE (self, req); 394 ETP_EXECUTE (self, req);
385 395
386 X_LOCK (pool->reslock); 396 X_LOCK (pool->reslock);
387 397
388 ++pool->npending; 398 ++pool->npending;
389 399
390 if (!reqq_push (&pool->res_queue, req)) 400 if (!reqq_push (&pool->res_queue, req))
391 ETP_WANT_POLL (poll); 401 ETP_WANT_POLL (pool);
392 402
393 etp_worker_clear (self); 403 etp_worker_clear (self);
394 404
395 X_UNLOCK (pool->reslock); 405 X_UNLOCK (pool->reslock);
396 } 406 }
417 427
418 X_LOCK (pool->wrklock); 428 X_LOCK (pool->wrklock);
419 429
420 if (xthread_create (&wrk->tid, etp_proc, (void *)wrk)) 430 if (xthread_create (&wrk->tid, etp_proc, (void *)wrk))
421 { 431 {
422 wrk->prev = &wrk_first; 432 wrk->prev = &pool->wrk_first;
423 wrk->next = wrk_first.next; 433 wrk->next = pool->wrk_first.next;
424 wrk_first.next->prev = wrk; 434 pool->wrk_first.next->prev = wrk;
425 wrk_first.next = wrk; 435 pool->wrk_first.next = wrk;
426 ++pool->started; 436 ++pool->started;
427 } 437 }
428 else 438 else
429 free (wrk); 439 free (wrk);
430 440
484 etp_maybe_start_thread (pool); 494 etp_maybe_start_thread (pool);
485 495
486 X_LOCK (pool->reslock); 496 X_LOCK (pool->reslock);
487 req = reqq_shift (&pool->res_queue); 497 req = reqq_shift (&pool->res_queue);
488 498
489 if (req) 499 if (ecb_expect_true (req))
490 { 500 {
491 --pool->npending; 501 --pool->npending;
492 502
493 if (!pool->res_queue.size) 503 if (!pool->res_queue.size)
494 ETP_DONE_POLL (pool->userdata); 504 ETP_DONE_POLL (pool);
495 } 505 }
496 506
497 X_UNLOCK (pool->reslock); 507 X_UNLOCK (pool->reslock);
498 508
499 if (!req) 509 if (ecb_expect_false (!req))
500 return 0; 510 return 0;
501 511
502 X_LOCK (pool->reqlock); 512 X_LOCK (pool->reqlock);
503 --pool->nreqs; 513 --pool->nreqs;
504 X_UNLOCK (pool->reqlock); 514 X_UNLOCK (pool->reqlock);
585 etp_maybe_start_thread (pool); 595 etp_maybe_start_thread (pool);
586 } 596 }
587} 597}
588 598
589ETP_API_DECL void ecb_cold 599ETP_API_DECL void ecb_cold
590etp_set_max_poll_time (etp_pool pool, double nseconds) 600etp_set_max_poll_time (etp_pool pool, double seconds)
591{ 601{
592 if (WORDACCESS_UNSAFE) X_LOCK (pool->reslock); 602 if (WORDACCESS_UNSAFE) X_LOCK (pool->reslock);
593 pool->max_poll_time = nseconds * ETP_TICKS; 603 pool->max_poll_time = seconds * ETP_TICKS;
594 if (WORDACCESS_UNSAFE) X_UNLOCK (pool->reslock); 604 if (WORDACCESS_UNSAFE) X_UNLOCK (pool->reslock);
595} 605}
596 606
597ETP_API_DECL void ecb_cold 607ETP_API_DECL void ecb_cold
598etp_set_max_poll_reqs (etp_pool pool, unsigned int maxreqs) 608etp_set_max_poll_reqs (etp_pool pool, unsigned int maxreqs)
601 pool->max_poll_reqs = maxreqs; 611 pool->max_poll_reqs = maxreqs;
602 if (WORDACCESS_UNSAFE) X_UNLOCK (pool->reslock); 612 if (WORDACCESS_UNSAFE) X_UNLOCK (pool->reslock);
603} 613}
604 614
605ETP_API_DECL void ecb_cold 615ETP_API_DECL void ecb_cold
606etp_set_max_idle (etp_pool pool, unsigned int nthreads) 616etp_set_max_idle (etp_pool pool, unsigned int threads)
607{ 617{
608 if (WORDACCESS_UNSAFE) X_LOCK (pool->reqlock); 618 if (WORDACCESS_UNSAFE) X_LOCK (pool->reqlock);
609 pool->max_idle = nthreads; 619 pool->max_idle = threads;
610 if (WORDACCESS_UNSAFE) X_UNLOCK (pool->reqlock); 620 if (WORDACCESS_UNSAFE) X_UNLOCK (pool->reqlock);
611} 621}
612 622
613ETP_API_DECL void ecb_cold 623ETP_API_DECL void ecb_cold
614etp_set_idle_timeout (etp_pool pool, unsigned int seconds) 624etp_set_idle_timeout (etp_pool pool, unsigned int seconds)
617 pool->idle_timeout = seconds; 627 pool->idle_timeout = seconds;
618 if (WORDACCESS_UNSAFE) X_UNLOCK (pool->reqlock); 628 if (WORDACCESS_UNSAFE) X_UNLOCK (pool->reqlock);
619} 629}
620 630
621ETP_API_DECL void ecb_cold 631ETP_API_DECL void ecb_cold
622etp_set_min_parallel (etp_pool pool, unsigned int nthreads) 632etp_set_min_parallel (etp_pool pool, unsigned int threads)
623{ 633{
624 if (pool->wanted < nthreads) 634 if (pool->wanted < threads)
625 pool->wanted = nthreads; 635 pool->wanted = threads;
626} 636}
627 637
628ETP_API_DECL void ecb_cold 638ETP_API_DECL void ecb_cold
629etp_set_max_parallel (etp_pool pool, unsigned int nthreads) 639etp_set_max_parallel (etp_pool pool, unsigned int threads)
630{ 640{
631 if (pool->wanted > nthreads) 641 if (pool->wanted > threads)
632 pool->wanted = nthreads; 642 pool->wanted = threads;
633 643
634 while (pool->started > pool->wanted) 644 while (pool->started > pool->wanted)
635 etp_end_thread (pool); 645 etp_end_thread (pool);
636} 646}
637 647

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines