ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/libeio/etp.c
(Generate patch)

Comparing libeio/etp.c (file contents):
Revision 1.1 by root, Sun Apr 14 09:43:19 2013 UTC vs.
Revision 1.13 by root, Tue Aug 14 11:44:53 2018 UTC

1/* 1/*
2 * libetp implementation 2 * libetp implementation
3 * 3 *
4 * Copyright (c) 2007,2008,2009,2010,2011,2012,2013 Marc Alexander Lehmann <libetp@schmorp.de> 4 * Copyright (c) 2007,2008,2009,2010,2011,2012,2013,2015 Marc Alexander Lehmann <libetp@schmorp.de>
5 * All rights reserved. 5 * All rights reserved.
6 * 6 *
7 * Redistribution and use in source and binary forms, with or without modifica- 7 * Redistribution and use in source and binary forms, with or without modifica-
8 * tion, are permitted provided that the following conditions are met: 8 * tion, are permitted provided that the following conditions are met:
9 * 9 *
35 * and other provisions required by the GPL. If you do not delete the 35 * and other provisions required by the GPL. If you do not delete the
36 * provisions above, a recipient may use your version of this file under 36 * provisions above, a recipient may use your version of this file under
37 * either the BSD or the GPL. 37 * either the BSD or the GPL.
38 */ 38 */
39 39
40#if HAVE_SYS_PRCTL_H
41# include <sys/prctl.h>
42#endif
43
40#ifndef ETP_API_DECL 44#ifndef ETP_API_DECL
41# define ETP_API_DECL static 45# define ETP_API_DECL static
42#endif 46#endif
43 47
44#ifndef ETP_PRI_MIN 48#ifndef ETP_PRI_MIN
52 56
53#ifndef ETP_TYPE_GROUP 57#ifndef ETP_TYPE_GROUP
54# define ETP_TYPE_GROUP 1 58# define ETP_TYPE_GROUP 1
55#endif 59#endif
56 60
61#ifndef ETP_WANT_POLL
62# define ETP_WANT_POLL(pool) pool->want_poll_cb (pool->userdata)
63#endif
64#ifndef ETP_DONE_POLL
65# define ETP_DONE_POLL(pool) pool->done_poll_cb (pool->userdata)
66#endif
67
57#define ETP_NUM_PRI (ETP_PRI_MAX - ETP_PRI_MIN + 1) 68#define ETP_NUM_PRI (ETP_PRI_MAX - ETP_PRI_MIN + 1)
58 69
59#define ETP_TICKS ((1000000 + 1023) >> 10) 70#define ETP_TICKS ((1000000 + 1023) >> 10)
71
72enum {
73 ETP_FLAG_GROUPADD = 0x04, /* some request was added to the group */
74 ETP_FLAG_DELAYED = 0x08, /* groiup request has been delayed */
75};
60 76
61/* calculate time difference in ~1/ETP_TICKS of a second */ 77/* calculate time difference in ~1/ETP_TICKS of a second */
62ecb_inline int 78ecb_inline int
63etp_tvdiff (struct timeval *tv1, struct timeval *tv2) 79etp_tvdiff (struct timeval *tv1, struct timeval *tv2)
64{ 80{
65 return (tv2->tv_sec - tv1->tv_sec ) * ETP_TICKS 81 return (tv2->tv_sec - tv1->tv_sec ) * ETP_TICKS
66 + ((tv2->tv_usec - tv1->tv_usec) >> 10); 82 + ((tv2->tv_usec - tv1->tv_usec) >> 10);
67} 83}
68 84
69static unsigned int started, idle, wanted = 4; 85struct etp_tmpbuf
70
71static void (*want_poll_cb) (void);
72static void (*done_poll_cb) (void);
73
74static unsigned int max_poll_time; /* reslock */
75static unsigned int max_poll_reqs; /* reslock */
76
77static unsigned int nreqs; /* reqlock */
78static unsigned int nready; /* reqlock */
79static unsigned int npending; /* reqlock */
80static unsigned int max_idle = 4; /* maximum number of threads that can idle indefinitely */
81static unsigned int idle_timeout = 10; /* number of seconds after which an idle threads exit */
82
83static xmutex_t wrklock;
84static xmutex_t reslock;
85static xmutex_t reqlock;
86static xcond_t reqwait;
87
88typedef struct etp_worker
89{ 86{
90 struct tmpbuf tmpbuf; 87 void *ptr;
88 int len;
89};
91 90
92 /* locked by wrklock */
93 struct etp_worker *prev, *next;
94
95 xthread_t tid;
96
97#ifdef ETP_WORKER_COMMON
98 ETP_WORKER_COMMON
99#endif
100} etp_worker;
101
102static etp_worker wrk_first; /* NOT etp */
103
104#define ETP_WORKER_LOCK(wrk) X_LOCK (wrklock)
105#define ETP_WORKER_UNLOCK(wrk) X_UNLOCK (wrklock)
106
107/* worker threads management */
108
109static void 91static void *
110etp_worker_clear (etp_worker *wrk) 92etp_tmpbuf_get (struct etp_tmpbuf *buf, int len)
111{ 93{
112} 94 if (buf->len < len)
95 {
96 free (buf->ptr);
97 buf->ptr = malloc (buf->len = len);
98 }
113 99
114static void ecb_cold 100 return buf->ptr;
115etp_worker_free (etp_worker *wrk)
116{
117 free (wrk->tmpbuf.ptr);
118
119 wrk->next->prev = wrk->prev;
120 wrk->prev->next = wrk->next;
121
122 free (wrk);
123}
124
125ETP_API_DECL unsigned int
126etp_nreqs (void)
127{
128 int retval;
129 if (WORDACCESS_UNSAFE) X_LOCK (reqlock);
130 retval = nreqs;
131 if (WORDACCESS_UNSAFE) X_UNLOCK (reqlock);
132 return retval;
133}
134
135ETP_API_DECL unsigned int
136etp_nready (void)
137{
138 unsigned int retval;
139
140 if (WORDACCESS_UNSAFE) X_LOCK (reqlock);
141 retval = nready;
142 if (WORDACCESS_UNSAFE) X_UNLOCK (reqlock);
143
144 return retval;
145}
146
147ETP_API_DECL unsigned int
148etp_npending (void)
149{
150 unsigned int retval;
151
152 if (WORDACCESS_UNSAFE) X_LOCK (reqlock);
153 retval = npending;
154 if (WORDACCESS_UNSAFE) X_UNLOCK (reqlock);
155
156 return retval;
157}
158
159ETP_API_DECL unsigned int
160etp_nthreads (void)
161{
162 unsigned int retval;
163
164 if (WORDACCESS_UNSAFE) X_LOCK (reqlock);
165 retval = started;
166 if (WORDACCESS_UNSAFE) X_UNLOCK (reqlock);
167
168 return retval;
169} 101}
170 102
171/* 103/*
172 * a somewhat faster data structure might be nice, but 104 * a somewhat faster data structure might be nice, but
173 * with 8 priorities this actually needs <20 insns 105 * with 8 priorities this actually needs <20 insns
174 * per shift, the most expensive operation. 106 * per shift, the most expensive operation.
175 */ 107 */
176typedef struct { 108typedef struct
109{
177 ETP_REQ *qs[ETP_NUM_PRI], *qe[ETP_NUM_PRI]; /* qstart, qend */ 110 ETP_REQ *qs[ETP_NUM_PRI], *qe[ETP_NUM_PRI]; /* qstart, qend */
178 int size; 111 int size;
179} etp_reqq; 112} etp_reqq;
180 113
114typedef struct etp_pool *etp_pool;
115
116typedef struct etp_worker
117{
118 etp_pool pool;
119
120 struct etp_tmpbuf tmpbuf;
121
122 /* locked by pool->wrklock */
123 struct etp_worker *prev, *next;
124
125 xthread_t tid;
126
127#ifdef ETP_WORKER_COMMON
128 ETP_WORKER_COMMON
129#endif
130} etp_worker;
131
132struct etp_pool
133{
134 void *userdata;
135
181static etp_reqq req_queue; 136 etp_reqq req_queue;
182static etp_reqq res_queue; 137 etp_reqq res_queue;
138
139 unsigned int started, idle, wanted;
140
141 unsigned int max_poll_time; /* pool->reslock */
142 unsigned int max_poll_reqs; /* pool->reslock */
143
144 unsigned int nreqs; /* pool->reqlock */
145 unsigned int nready; /* pool->reqlock */
146 unsigned int npending; /* pool->reqlock */
147 unsigned int max_idle; /* maximum number of threads that can pool->idle indefinitely */
148 unsigned int idle_timeout; /* number of seconds after which an pool->idle threads exit */
149
150 void (*want_poll_cb) (void *userdata);
151 void (*done_poll_cb) (void *userdata);
152
153 xmutex_t wrklock;
154 xmutex_t reslock;
155 xmutex_t reqlock;
156 xcond_t reqwait;
157
158 etp_worker wrk_first;
159};
160
161#define ETP_WORKER_LOCK(wrk) X_LOCK (pool->wrklock)
162#define ETP_WORKER_UNLOCK(wrk) X_UNLOCK (pool->wrklock)
163
164/* worker threads management */
165
166static void
167etp_worker_clear (etp_worker *wrk)
168{
169}
170
171static void ecb_cold
172etp_worker_free (etp_worker *wrk)
173{
174 free (wrk->tmpbuf.ptr);
175
176 wrk->next->prev = wrk->prev;
177 wrk->prev->next = wrk->next;
178
179 free (wrk);
180}
181
182ETP_API_DECL unsigned int
183etp_nreqs (etp_pool pool)
184{
185 int retval;
186 if (WORDACCESS_UNSAFE) X_LOCK (pool->reqlock);
187 retval = pool->nreqs;
188 if (WORDACCESS_UNSAFE) X_UNLOCK (pool->reqlock);
189 return retval;
190}
191
192ETP_API_DECL unsigned int
193etp_nready (etp_pool pool)
194{
195 unsigned int retval;
196
197 if (WORDACCESS_UNSAFE) X_LOCK (pool->reqlock);
198 retval = pool->nready;
199 if (WORDACCESS_UNSAFE) X_UNLOCK (pool->reqlock);
200
201 return retval;
202}
203
204ETP_API_DECL unsigned int
205etp_npending (etp_pool pool)
206{
207 unsigned int retval;
208
209 if (WORDACCESS_UNSAFE) X_LOCK (pool->reqlock);
210 retval = pool->npending;
211 if (WORDACCESS_UNSAFE) X_UNLOCK (pool->reqlock);
212
213 return retval;
214}
215
216ETP_API_DECL unsigned int
217etp_nthreads (etp_pool pool)
218{
219 unsigned int retval;
220
221 if (WORDACCESS_UNSAFE) X_LOCK (pool->reqlock);
222 retval = pool->started;
223 if (WORDACCESS_UNSAFE) X_UNLOCK (pool->reqlock);
224
225 return retval;
226}
183 227
184static void ecb_noinline ecb_cold 228static void ecb_noinline ecb_cold
185reqq_init (etp_reqq *q) 229reqq_init (etp_reqq *q)
186{ 230{
187 int pri; 231 int pri;
219 263
220 --q->size; 264 --q->size;
221 265
222 for (pri = ETP_NUM_PRI; pri--; ) 266 for (pri = ETP_NUM_PRI; pri--; )
223 { 267 {
224 eio_req *req = q->qs[pri]; 268 ETP_REQ *req = q->qs[pri];
225 269
226 if (req) 270 if (req)
227 { 271 {
228 if (!(q->qs[pri] = (eio_req *)req->next)) 272 if (!(q->qs[pri] = (ETP_REQ *)req->next))
229 q->qe[pri] = 0; 273 q->qe[pri] = 0;
230 274
231 return req; 275 return req;
232 } 276 }
233 } 277 }
234 278
235 abort (); 279 abort ();
236} 280}
237 281
238ETP_API_DECL int ecb_cold 282ETP_API_DECL int ecb_cold
239etp_init (void (*want_poll)(void), void (*done_poll)(void)) 283etp_init (etp_pool pool, void *userdata, void (*want_poll)(void *userdata), void (*done_poll)(void *userdata))
240{ 284{
241 X_MUTEX_CREATE (wrklock); 285 X_MUTEX_CREATE (pool->wrklock);
242 X_MUTEX_CREATE (reslock); 286 X_MUTEX_CREATE (pool->reslock);
243 X_MUTEX_CREATE (reqlock); 287 X_MUTEX_CREATE (pool->reqlock);
244 X_COND_CREATE (reqwait); 288 X_COND_CREATE (pool->reqwait);
245 289
246 reqq_init (&req_queue); 290 reqq_init (&pool->req_queue);
247 reqq_init (&res_queue); 291 reqq_init (&pool->res_queue);
248 292
249 wrk_first.next = 293 pool->wrk_first.next =
250 wrk_first.prev = &wrk_first; 294 pool->wrk_first.prev = &pool->wrk_first;
251 295
252 started = 0; 296 pool->started = 0;
253 idle = 0; 297 pool->idle = 0;
254 nreqs = 0; 298 pool->nreqs = 0;
255 nready = 0; 299 pool->nready = 0;
256 npending = 0; 300 pool->npending = 0;
301 pool->wanted = 4;
257 302
303 pool->max_idle = 4; /* maximum number of threads that can pool->idle indefinitely */
304 pool->idle_timeout = 10; /* number of seconds after which an pool->idle threads exit */
305
306 pool->userdata = userdata;
258 want_poll_cb = want_poll; 307 pool->want_poll_cb = want_poll;
259 done_poll_cb = done_poll; 308 pool->done_poll_cb = done_poll;
260 309
261 return 0; 310 return 0;
262} 311}
263 312
264/* not yet in etp.c */ 313static void ecb_noinline ecb_cold
314etp_proc_init (void)
315{
316#if HAVE_PRCTL_SET_NAME
317 /* provide a more sensible "thread name" */
318 char name[16 + 1];
319 const int namelen = sizeof (name) - 1;
320 int len;
321
322 prctl (PR_GET_NAME, (unsigned long)name, 0, 0, 0);
323 name [namelen] = 0;
324 len = strlen (name);
325 strcpy (name + (len <= namelen - 4 ? len : namelen - 4), "/eio");
326 prctl (PR_SET_NAME, (unsigned long)name, 0, 0, 0);
327#endif
328}
329
265X_THREAD_PROC (etp_proc); 330X_THREAD_PROC (etp_proc)
331{
332 ETP_REQ *req;
333 struct timespec ts;
334 etp_worker *self = (etp_worker *)thr_arg;
335 etp_pool pool = self->pool;
336
337 etp_proc_init ();
338
339 /* try to distribute timeouts somewhat evenly */
340 ts.tv_nsec = ((unsigned long)self & 1023UL) * (1000000000UL / 1024UL);
341
342 for (;;)
343 {
344 ts.tv_sec = 0;
345
346 X_LOCK (pool->reqlock);
347
348 for (;;)
349 {
350 req = reqq_shift (&pool->req_queue);
351
352 if (ecb_expect_true (req))
353 break;
354
355 if (ts.tv_sec == 1) /* no request, but timeout detected, let's quit */
356 {
357 X_UNLOCK (pool->reqlock);
358 X_LOCK (pool->wrklock);
359 --pool->started;
360 X_UNLOCK (pool->wrklock);
361 goto quit;
362 }
363
364 ++pool->idle;
365
366 if (pool->idle <= pool->max_idle)
367 /* we are allowed to pool->idle, so do so without any timeout */
368 X_COND_WAIT (pool->reqwait, pool->reqlock);
369 else
370 {
371 /* initialise timeout once */
372 if (!ts.tv_sec)
373 ts.tv_sec = time (0) + pool->idle_timeout;
374
375 if (X_COND_TIMEDWAIT (pool->reqwait, pool->reqlock, ts) == ETIMEDOUT)
376 ts.tv_sec = 1; /* assuming this is not a value computed above.,.. */
377 }
378
379 --pool->idle;
380 }
381
382 --pool->nready;
383
384 X_UNLOCK (pool->reqlock);
385
386 if (ecb_expect_false (req->type == ETP_TYPE_QUIT))
387 goto quit;
388
389 ETP_EXECUTE (self, req);
390
391 X_LOCK (pool->reslock);
392
393 ++pool->npending;
394
395 if (!reqq_push (&pool->res_queue, req))
396 ETP_WANT_POLL (pool);
397
398 etp_worker_clear (self);
399
400 X_UNLOCK (pool->reslock);
401 }
402
403quit:
404 free (req);
405
406 X_LOCK (pool->wrklock);
407 etp_worker_free (self);
408 X_UNLOCK (pool->wrklock);
409
410 return 0;
411}
266 412
267static void ecb_cold 413static void ecb_cold
268etp_start_thread (void) 414etp_start_thread (etp_pool pool)
269{ 415{
270 etp_worker *wrk = calloc (1, sizeof (etp_worker)); 416 etp_worker *wrk = calloc (1, sizeof (etp_worker));
271 417
272 /*TODO*/ 418 /*TODO*/
273 assert (("unable to allocate worker thread data", wrk)); 419 assert (("unable to allocate worker thread data", wrk));
274 420
421 wrk->pool = pool;
422
275 X_LOCK (wrklock); 423 X_LOCK (pool->wrklock);
276 424
277 if (xthread_create (&wrk->tid, etp_proc, (void *)wrk)) 425 if (xthread_create (&wrk->tid, etp_proc, (void *)wrk))
278 { 426 {
279 wrk->prev = &wrk_first; 427 wrk->prev = &pool->wrk_first;
280 wrk->next = wrk_first.next; 428 wrk->next = pool->wrk_first.next;
281 wrk_first.next->prev = wrk; 429 pool->wrk_first.next->prev = wrk;
282 wrk_first.next = wrk; 430 pool->wrk_first.next = wrk;
283 ++started; 431 ++pool->started;
284 } 432 }
285 else 433 else
286 free (wrk); 434 free (wrk);
287 435
288 X_UNLOCK (wrklock); 436 X_UNLOCK (pool->wrklock);
289} 437}
290 438
291static void 439static void
292etp_maybe_start_thread (void) 440etp_maybe_start_thread (etp_pool pool)
293{ 441{
294 if (ecb_expect_true (etp_nthreads () >= wanted)) 442 if (ecb_expect_true (etp_nthreads (pool) >= pool->wanted))
295 return; 443 return;
296 444
297 /* todo: maybe use idle here, but might be less exact */ 445 /* todo: maybe use pool->idle here, but might be less exact */
298 if (ecb_expect_true (0 <= (int)etp_nthreads () + (int)etp_npending () - (int)etp_nreqs ())) 446 if (ecb_expect_true (0 <= (int)etp_nthreads (pool) + (int)etp_npending (pool) - (int)etp_nreqs (pool)))
299 return; 447 return;
300 448
301 etp_start_thread (); 449 etp_start_thread (pool);
302} 450}
303 451
304static void ecb_cold 452static void ecb_cold
305etp_end_thread (void) 453etp_end_thread (etp_pool pool)
306{ 454{
307 eio_req *req = calloc (1, sizeof (eio_req)); /* will be freed by worker */ 455 ETP_REQ *req = calloc (1, sizeof (ETP_REQ)); /* will be freed by worker */
308 456
309 req->type = ETP_TYPE_QUIT; 457 req->type = ETP_TYPE_QUIT;
310 req->pri = ETP_PRI_MAX - ETP_PRI_MIN; 458 req->pri = ETP_PRI_MAX - ETP_PRI_MIN;
311 459
312 X_LOCK (reqlock); 460 X_LOCK (pool->reqlock);
313 reqq_push (&req_queue, req); 461 reqq_push (&pool->req_queue, req);
314 X_COND_SIGNAL (reqwait); 462 X_COND_SIGNAL (pool->reqwait);
315 X_UNLOCK (reqlock); 463 X_UNLOCK (pool->reqlock);
316 464
317 X_LOCK (wrklock); 465 X_LOCK (pool->wrklock);
318 --started; 466 --pool->started;
319 X_UNLOCK (wrklock); 467 X_UNLOCK (pool->wrklock);
320} 468}
321 469
322ETP_API_DECL int 470ETP_API_DECL int
323etp_poll (void) 471etp_poll (etp_pool pool)
324{ 472{
325 unsigned int maxreqs; 473 unsigned int maxreqs;
326 unsigned int maxtime; 474 unsigned int maxtime;
327 struct timeval tv_start, tv_now; 475 struct timeval tv_start, tv_now;
328 476
329 X_LOCK (reslock); 477 X_LOCK (pool->reslock);
330 maxreqs = max_poll_reqs; 478 maxreqs = pool->max_poll_reqs;
331 maxtime = max_poll_time; 479 maxtime = pool->max_poll_time;
332 X_UNLOCK (reslock); 480 X_UNLOCK (pool->reslock);
333 481
334 if (maxtime) 482 if (maxtime)
335 gettimeofday (&tv_start, 0); 483 gettimeofday (&tv_start, 0);
336 484
337 for (;;) 485 for (;;)
338 { 486 {
339 ETP_REQ *req; 487 ETP_REQ *req;
340 488
341 etp_maybe_start_thread (); 489 etp_maybe_start_thread (pool);
342 490
343 X_LOCK (reslock); 491 X_LOCK (pool->reslock);
344 req = reqq_shift (&res_queue); 492 req = reqq_shift (&pool->res_queue);
345 493
346 if (req) 494 if (ecb_expect_true (req))
347 { 495 {
348 --npending; 496 --pool->npending;
349 497
350 if (!res_queue.size && done_poll_cb) 498 if (!pool->res_queue.size)
351 done_poll_cb (); 499 ETP_DONE_POLL (pool);
352 } 500 }
353 501
354 X_UNLOCK (reslock); 502 X_UNLOCK (pool->reslock);
355 503
356 if (!req) 504 if (ecb_expect_false (!req))
357 return 0; 505 return 0;
358 506
359 X_LOCK (reqlock); 507 X_LOCK (pool->reqlock);
360 --nreqs; 508 --pool->nreqs;
361 X_UNLOCK (reqlock); 509 X_UNLOCK (pool->reqlock);
362 510
363 if (ecb_expect_false (req->type == ETP_TYPE_GROUP && req->size)) 511 if (ecb_expect_false (req->type == ETP_TYPE_GROUP && req->size))
364 { 512 {
365 req->int1 = 1; /* mark request as delayed */ 513 req->flags |= ETP_FLAG_DELAYED; /* mark request as delayed */
366 continue; 514 continue;
367 } 515 }
368 else 516 else
369 { 517 {
370 int res = ETP_FINISH (req); 518 int res = ETP_FINISH (req);
387 errno = EAGAIN; 535 errno = EAGAIN;
388 return -1; 536 return -1;
389} 537}
390 538
391ETP_API_DECL void 539ETP_API_DECL void
392etp_grp_cancel (ETP_REQ *grp); 540etp_grp_cancel (etp_pool pool, ETP_REQ *grp);
393 541
394ETP_API_DECL void 542ETP_API_DECL void
395etp_cancel (ETP_REQ *req) 543etp_cancel (etp_pool pool, ETP_REQ *req)
396{ 544{
397 req->cancelled = 1; 545 req->cancelled = 1;
398 546
399 etp_grp_cancel (req); 547 etp_grp_cancel (pool, req);
400} 548}
401 549
402ETP_API_DECL void 550ETP_API_DECL void
403etp_grp_cancel (ETP_REQ *grp) 551etp_grp_cancel (etp_pool pool, ETP_REQ *grp)
404{ 552{
405 for (grp = grp->grp_first; grp; grp = grp->grp_next) 553 for (grp = grp->grp_first; grp; grp = grp->grp_next)
406 etp_cancel (grp); 554 etp_cancel (pool, grp);
407} 555}
408 556
409ETP_API_DECL void 557ETP_API_DECL void
410etp_submit (ETP_REQ *req) 558etp_submit (etp_pool pool, ETP_REQ *req)
411{ 559{
412 req->pri -= ETP_PRI_MIN; 560 req->pri -= ETP_PRI_MIN;
413 561
414 if (ecb_expect_false (req->pri < ETP_PRI_MIN - ETP_PRI_MIN)) req->pri = ETP_PRI_MIN - ETP_PRI_MIN; 562 if (ecb_expect_false (req->pri < ETP_PRI_MIN - ETP_PRI_MIN)) req->pri = ETP_PRI_MIN - ETP_PRI_MIN;
415 if (ecb_expect_false (req->pri > ETP_PRI_MAX - ETP_PRI_MIN)) req->pri = ETP_PRI_MAX - ETP_PRI_MIN; 563 if (ecb_expect_false (req->pri > ETP_PRI_MAX - ETP_PRI_MIN)) req->pri = ETP_PRI_MAX - ETP_PRI_MIN;
416 564
417 if (ecb_expect_false (req->type == ETP_TYPE_GROUP)) 565 if (ecb_expect_false (req->type == ETP_TYPE_GROUP))
418 { 566 {
419 /* I hope this is worth it :/ */ 567 /* I hope this is worth it :/ */
420 X_LOCK (reqlock); 568 X_LOCK (pool->reqlock);
421 ++nreqs; 569 ++pool->nreqs;
422 X_UNLOCK (reqlock); 570 X_UNLOCK (pool->reqlock);
423 571
424 X_LOCK (reslock); 572 X_LOCK (pool->reslock);
425 573
426 ++npending; 574 ++pool->npending;
427 575
428 if (!reqq_push (&res_queue, req) && want_poll_cb) 576 if (!reqq_push (&pool->res_queue, req))
429 want_poll_cb (); 577 ETP_WANT_POLL (pool);
430 578
431 X_UNLOCK (reslock); 579 X_UNLOCK (pool->reslock);
432 } 580 }
433 else 581 else
434 { 582 {
435 X_LOCK (reqlock); 583 X_LOCK (pool->reqlock);
436 ++nreqs; 584 ++pool->nreqs;
437 ++nready; 585 ++pool->nready;
438 reqq_push (&req_queue, req); 586 reqq_push (&pool->req_queue, req);
439 X_COND_SIGNAL (reqwait); 587 X_COND_SIGNAL (pool->reqwait);
440 X_UNLOCK (reqlock); 588 X_UNLOCK (pool->reqlock);
441 589
442 etp_maybe_start_thread (); 590 etp_maybe_start_thread (pool);
443 } 591 }
444} 592}
445 593
446ETP_API_DECL void ecb_cold 594ETP_API_DECL void ecb_cold
447etp_set_max_poll_time (double nseconds) 595etp_set_max_poll_time (etp_pool pool, double seconds)
448{ 596{
449 if (WORDACCESS_UNSAFE) X_LOCK (reslock); 597 if (WORDACCESS_UNSAFE) X_LOCK (pool->reslock);
450 max_poll_time = nseconds * ETP_TICKS; 598 pool->max_poll_time = seconds * ETP_TICKS;
451 if (WORDACCESS_UNSAFE) X_UNLOCK (reslock); 599 if (WORDACCESS_UNSAFE) X_UNLOCK (pool->reslock);
452} 600}
453 601
454ETP_API_DECL void ecb_cold 602ETP_API_DECL void ecb_cold
455etp_set_max_poll_reqs (unsigned int maxreqs) 603etp_set_max_poll_reqs (etp_pool pool, unsigned int maxreqs)
456{ 604{
457 if (WORDACCESS_UNSAFE) X_LOCK (reslock); 605 if (WORDACCESS_UNSAFE) X_LOCK (pool->reslock);
458 max_poll_reqs = maxreqs; 606 pool->max_poll_reqs = maxreqs;
459 if (WORDACCESS_UNSAFE) X_UNLOCK (reslock); 607 if (WORDACCESS_UNSAFE) X_UNLOCK (pool->reslock);
460} 608}
461 609
462ETP_API_DECL void ecb_cold 610ETP_API_DECL void ecb_cold
463etp_set_max_idle (unsigned int nthreads) 611etp_set_max_idle (etp_pool pool, unsigned int threads)
464{ 612{
465 if (WORDACCESS_UNSAFE) X_LOCK (reqlock); 613 if (WORDACCESS_UNSAFE) X_LOCK (pool->reqlock);
466 max_idle = nthreads; 614 pool->max_idle = threads;
467 if (WORDACCESS_UNSAFE) X_UNLOCK (reqlock); 615 if (WORDACCESS_UNSAFE) X_UNLOCK (pool->reqlock);
468} 616}
469 617
470ETP_API_DECL void ecb_cold 618ETP_API_DECL void ecb_cold
471etp_set_idle_timeout (unsigned int seconds) 619etp_set_idle_timeout (etp_pool pool, unsigned int seconds)
472{ 620{
473 if (WORDACCESS_UNSAFE) X_LOCK (reqlock); 621 if (WORDACCESS_UNSAFE) X_LOCK (pool->reqlock);
474 idle_timeout = seconds; 622 pool->idle_timeout = seconds;
475 if (WORDACCESS_UNSAFE) X_UNLOCK (reqlock); 623 if (WORDACCESS_UNSAFE) X_UNLOCK (pool->reqlock);
476} 624}
477 625
478ETP_API_DECL void ecb_cold 626ETP_API_DECL void ecb_cold
479etp_set_min_parallel (unsigned int nthreads) 627etp_set_min_parallel (etp_pool pool, unsigned int threads)
480{ 628{
481 if (wanted < nthreads) 629 if (pool->wanted < threads)
482 wanted = nthreads; 630 pool->wanted = threads;
483} 631}
484 632
485ETP_API_DECL void ecb_cold 633ETP_API_DECL void ecb_cold
486etp_set_max_parallel (unsigned int nthreads) 634etp_set_max_parallel (etp_pool pool, unsigned int threads)
487{ 635{
488 if (wanted > nthreads) 636 if (pool->wanted > threads)
489 wanted = nthreads; 637 pool->wanted = threads;
490 638
491 while (started > wanted) 639 while (pool->started > pool->wanted)
492 etp_end_thread (); 640 etp_end_thread (pool);
493} 641}
494 642

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines