--- libeio/etp.c 2015/06/25 18:08:47 1.6 +++ libeio/etp.c 2016/05/01 17:15:45 1.11 @@ -1,7 +1,7 @@ /* * libetp implementation * - * Copyright (c) 2007,2008,2009,2010,2011,2012,2013 Marc Alexander Lehmann + * Copyright (c) 2007,2008,2009,2010,2011,2012,2013,2015 Marc Alexander Lehmann * All rights reserved. * * Redistribution and use in source and binary forms, with or without modifica- @@ -37,6 +37,15 @@ * either the BSD or the GPL. */ +#if HAVE_SYS_PRCTL_H +# include +#endif + +#ifdef EIO_STACKSIZE +# define X_STACKSIZE EIO_STACKSIZE +#endif +#include "xthread.h" + #ifndef ETP_API_DECL # define ETP_API_DECL static #endif @@ -107,6 +116,24 @@ int size; } etp_reqq; +typedef struct etp_pool *etp_pool; + +typedef struct etp_worker +{ + etp_pool pool; + + struct etp_tmpbuf tmpbuf; + + /* locked by pool->wrklock */ + struct etp_worker *prev, *next; + + xthread_t tid; + +#ifdef ETP_WORKER_COMMON + ETP_WORKER_COMMON +#endif +} etp_worker; + struct etp_pool { void *userdata; @@ -132,27 +159,9 @@ xmutex_t reslock; xmutex_t reqlock; xcond_t reqwait; -}; - -typedef struct etp_pool *etp_pool; - -typedef struct etp_worker -{ - etp_pool pool; - - struct etp_tmpbuf tmpbuf; - - /* locked by pool->wrklock */ - struct etp_worker *prev, *next; - - xthread_t tid; -#ifdef ETP_WORKER_COMMON - ETP_WORKER_COMMON -#endif -} etp_worker; - -static etp_worker wrk_first; /* NOT etp */ + etp_worker wrk_first; +}; #define ETP_WORKER_LOCK(wrk) X_LOCK (pool->wrklock) #define ETP_WORKER_UNLOCK(wrk) X_UNLOCK (pool->wrklock) @@ -276,7 +285,7 @@ } ETP_API_DECL int ecb_cold -etp_init (etp_pool pool, void (*want_poll)(void *userdata), void (*done_poll)(void *userdata)) +etp_init (etp_pool pool, void *userdata, void (*want_poll)(void *userdata), void (*done_poll)(void *userdata)) { X_MUTEX_CREATE (pool->wrklock); X_MUTEX_CREATE (pool->reslock); @@ -286,8 +295,8 @@ reqq_init (&pool->req_queue); reqq_init (&pool->res_queue); - wrk_first.next = - wrk_first.prev = &wrk_first; + pool->wrk_first.next = + pool->wrk_first.prev = &pool->wrk_first; pool->started = 0; pool->idle = 0; @@ -299,6 +308,7 @@ pool->max_idle = 4; /* maximum number of threads that can pool->idle indefinitely */ pool->idle_timeout = 10; /* number of seconds after which an pool->idle threads exit */ + pool->userdata = userdata; pool->want_poll_cb = want_poll; pool->done_poll_cb = done_poll; @@ -310,7 +320,7 @@ { #if HAVE_PRCTL_SET_NAME /* provide a more sensible "thread name" */ - char name[16 + 1]; + char name[15 + 1]; const int namelen = sizeof (name) - 1; int len; @@ -344,7 +354,7 @@ { req = reqq_shift (&pool->req_queue); - if (req) + if (ecb_expect_true (req)) break; if (ts.tv_sec == 1) /* no request, but timeout detected, let's quit */ @@ -378,7 +388,7 @@ X_UNLOCK (pool->reqlock); - if (req->type == ETP_TYPE_QUIT) + if (ecb_expect_false (req->type == ETP_TYPE_QUIT)) goto quit; ETP_EXECUTE (self, req); @@ -388,7 +398,7 @@ ++pool->npending; if (!reqq_push (&pool->res_queue, req)) - ETP_WANT_POLL (poll); + ETP_WANT_POLL (pool); etp_worker_clear (self); @@ -419,10 +429,10 @@ if (xthread_create (&wrk->tid, etp_proc, (void *)wrk)) { - wrk->prev = &wrk_first; - wrk->next = wrk_first.next; - wrk_first.next->prev = wrk; - wrk_first.next = wrk; + wrk->prev = &pool->wrk_first; + wrk->next = pool->wrk_first.next; + pool->wrk_first.next->prev = wrk; + pool->wrk_first.next = wrk; ++pool->started; } else @@ -486,17 +496,17 @@ X_LOCK (pool->reslock); req = reqq_shift (&pool->res_queue); - if (req) + if (ecb_expect_true (req)) { --pool->npending; if (!pool->res_queue.size) - ETP_DONE_POLL (pool->userdata); + ETP_DONE_POLL (pool); } X_UNLOCK (pool->reslock); - if (!req) + if (ecb_expect_false (!req)) return 0; X_LOCK (pool->reqlock); @@ -587,10 +597,10 @@ } ETP_API_DECL void ecb_cold -etp_set_max_poll_time (etp_pool pool, double nseconds) +etp_set_max_poll_time (etp_pool pool, double seconds) { if (WORDACCESS_UNSAFE) X_LOCK (pool->reslock); - pool->max_poll_time = nseconds * ETP_TICKS; + pool->max_poll_time = seconds * ETP_TICKS; if (WORDACCESS_UNSAFE) X_UNLOCK (pool->reslock); } @@ -603,10 +613,10 @@ } ETP_API_DECL void ecb_cold -etp_set_max_idle (etp_pool pool, unsigned int nthreads) +etp_set_max_idle (etp_pool pool, unsigned int threads) { if (WORDACCESS_UNSAFE) X_LOCK (pool->reqlock); - pool->max_idle = nthreads; + pool->max_idle = threads; if (WORDACCESS_UNSAFE) X_UNLOCK (pool->reqlock); } @@ -619,17 +629,17 @@ } ETP_API_DECL void ecb_cold -etp_set_min_parallel (etp_pool pool, unsigned int nthreads) +etp_set_min_parallel (etp_pool pool, unsigned int threads) { - if (pool->wanted < nthreads) - pool->wanted = nthreads; + if (pool->wanted < threads) + pool->wanted = threads; } ETP_API_DECL void ecb_cold -etp_set_max_parallel (etp_pool pool, unsigned int nthreads) +etp_set_max_parallel (etp_pool pool, unsigned int threads) { - if (pool->wanted > nthreads) - pool->wanted = nthreads; + if (pool->wanted > threads) + pool->wanted = threads; while (pool->started > pool->wanted) etp_end_thread (pool);