ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/libeio/etp.c
(Generate patch)

Comparing libeio/etp.c (file contents):
Revision 1.3 by root, Thu Jun 25 15:59:57 2015 UTC vs.
Revision 1.6 by root, Thu Jun 25 18:08:47 2015 UTC

52 52
53#ifndef ETP_TYPE_GROUP 53#ifndef ETP_TYPE_GROUP
54# define ETP_TYPE_GROUP 1 54# define ETP_TYPE_GROUP 1
55#endif 55#endif
56 56
57#ifndef ETP_WANT_POLL
58# define ETP_WANT_POLL(pool) pool->want_poll_cb (pool->userdata)
59#endif
60#ifndef ETP_DONE_POLL
61# define ETP_DONE_POLL(pool) pool->done_poll_cb (pool->userdata)
62#endif
63
57#define ETP_NUM_PRI (ETP_PRI_MAX - ETP_PRI_MIN + 1) 64#define ETP_NUM_PRI (ETP_PRI_MAX - ETP_PRI_MIN + 1)
58 65
59#define ETP_TICKS ((1000000 + 1023) >> 10) 66#define ETP_TICKS ((1000000 + 1023) >> 10)
67
68enum {
69 ETP_FLAG_GROUPADD = 0x04, /* some request was added to the group */
70 ETP_FLAG_DELAYED = 0x08, /* groiup request has been delayed */
71};
60 72
61/* calculate time difference in ~1/ETP_TICKS of a second */ 73/* calculate time difference in ~1/ETP_TICKS of a second */
62ecb_inline int 74ecb_inline int
63etp_tvdiff (struct timeval *tv1, struct timeval *tv2) 75etp_tvdiff (struct timeval *tv1, struct timeval *tv2)
64{ 76{
65 return (tv2->tv_sec - tv1->tv_sec ) * ETP_TICKS 77 return (tv2->tv_sec - tv1->tv_sec ) * ETP_TICKS
66 + ((tv2->tv_usec - tv1->tv_usec) >> 10); 78 + ((tv2->tv_usec - tv1->tv_usec) >> 10);
67} 79}
68 80
69static unsigned int started, idle, wanted = 4;
70
71static void (*want_poll_cb) (void);
72static void (*done_poll_cb) (void);
73
74static unsigned int max_poll_time; /* reslock */
75static unsigned int max_poll_reqs; /* reslock */
76
77static unsigned int nreqs; /* reqlock */
78static unsigned int nready; /* reqlock */
79static unsigned int npending; /* reqlock */
80static unsigned int max_idle = 4; /* maximum number of threads that can idle indefinitely */
81static unsigned int idle_timeout = 10; /* number of seconds after which an idle threads exit */
82
83static xmutex_t wrklock;
84static xmutex_t reslock;
85static xmutex_t reqlock;
86static xcond_t reqwait;
87
88struct etp_tmpbuf 81struct etp_tmpbuf
89{ 82{
90 void *ptr; 83 void *ptr;
91 int len; 84 int len;
92}; 85};
99 free (buf->ptr); 92 free (buf->ptr);
100 buf->ptr = malloc (buf->len = len); 93 buf->ptr = malloc (buf->len = len);
101 } 94 }
102 95
103 return buf->ptr; 96 return buf->ptr;
104}
105
106typedef struct etp_worker
107{
108 struct etp_tmpbuf tmpbuf;
109
110 /* locked by wrklock */
111 struct etp_worker *prev, *next;
112
113 xthread_t tid;
114
115#ifdef ETP_WORKER_COMMON
116 ETP_WORKER_COMMON
117#endif
118} etp_worker;
119
120static etp_worker wrk_first; /* NOT etp */
121
122#define ETP_WORKER_LOCK(wrk) X_LOCK (wrklock)
123#define ETP_WORKER_UNLOCK(wrk) X_UNLOCK (wrklock)
124
125/* worker threads management */
126
127static void
128etp_worker_clear (etp_worker *wrk)
129{
130}
131
132static void ecb_cold
133etp_worker_free (etp_worker *wrk)
134{
135 free (wrk->tmpbuf.ptr);
136
137 wrk->next->prev = wrk->prev;
138 wrk->prev->next = wrk->next;
139
140 free (wrk);
141}
142
143ETP_API_DECL unsigned int
144etp_nreqs (void)
145{
146 int retval;
147 if (WORDACCESS_UNSAFE) X_LOCK (reqlock);
148 retval = nreqs;
149 if (WORDACCESS_UNSAFE) X_UNLOCK (reqlock);
150 return retval;
151}
152
153ETP_API_DECL unsigned int
154etp_nready (void)
155{
156 unsigned int retval;
157
158 if (WORDACCESS_UNSAFE) X_LOCK (reqlock);
159 retval = nready;
160 if (WORDACCESS_UNSAFE) X_UNLOCK (reqlock);
161
162 return retval;
163}
164
165ETP_API_DECL unsigned int
166etp_npending (void)
167{
168 unsigned int retval;
169
170 if (WORDACCESS_UNSAFE) X_LOCK (reqlock);
171 retval = npending;
172 if (WORDACCESS_UNSAFE) X_UNLOCK (reqlock);
173
174 return retval;
175}
176
177ETP_API_DECL unsigned int
178etp_nthreads (void)
179{
180 unsigned int retval;
181
182 if (WORDACCESS_UNSAFE) X_LOCK (reqlock);
183 retval = started;
184 if (WORDACCESS_UNSAFE) X_UNLOCK (reqlock);
185
186 return retval;
187} 97}
188 98
189/* 99/*
190 * a somewhat faster data structure might be nice, but 100 * a somewhat faster data structure might be nice, but
191 * with 8 priorities this actually needs <20 insns 101 * with 8 priorities this actually needs <20 insns
192 * per shift, the most expensive operation. 102 * per shift, the most expensive operation.
193 */ 103 */
194typedef struct { 104typedef struct
105{
195 ETP_REQ *qs[ETP_NUM_PRI], *qe[ETP_NUM_PRI]; /* qstart, qend */ 106 ETP_REQ *qs[ETP_NUM_PRI], *qe[ETP_NUM_PRI]; /* qstart, qend */
196 int size; 107 int size;
197} etp_reqq; 108} etp_reqq;
198 109
110struct etp_pool
111{
112 void *userdata;
113
199static etp_reqq req_queue; 114 etp_reqq req_queue;
200static etp_reqq res_queue; 115 etp_reqq res_queue;
116
117 unsigned int started, idle, wanted;
118
119 unsigned int max_poll_time; /* pool->reslock */
120 unsigned int max_poll_reqs; /* pool->reslock */
121
122 unsigned int nreqs; /* pool->reqlock */
123 unsigned int nready; /* pool->reqlock */
124 unsigned int npending; /* pool->reqlock */
125 unsigned int max_idle; /* maximum number of threads that can pool->idle indefinitely */
126 unsigned int idle_timeout; /* number of seconds after which an pool->idle threads exit */
127
128 void (*want_poll_cb) (void *userdata);
129 void (*done_poll_cb) (void *userdata);
130
131 xmutex_t wrklock;
132 xmutex_t reslock;
133 xmutex_t reqlock;
134 xcond_t reqwait;
135};
136
137typedef struct etp_pool *etp_pool;
138
139typedef struct etp_worker
140{
141 etp_pool pool;
142
143 struct etp_tmpbuf tmpbuf;
144
145 /* locked by pool->wrklock */
146 struct etp_worker *prev, *next;
147
148 xthread_t tid;
149
150#ifdef ETP_WORKER_COMMON
151 ETP_WORKER_COMMON
152#endif
153} etp_worker;
154
155static etp_worker wrk_first; /* NOT etp */
156
157#define ETP_WORKER_LOCK(wrk) X_LOCK (pool->wrklock)
158#define ETP_WORKER_UNLOCK(wrk) X_UNLOCK (pool->wrklock)
159
160/* worker threads management */
161
162static void
163etp_worker_clear (etp_worker *wrk)
164{
165}
166
167static void ecb_cold
168etp_worker_free (etp_worker *wrk)
169{
170 free (wrk->tmpbuf.ptr);
171
172 wrk->next->prev = wrk->prev;
173 wrk->prev->next = wrk->next;
174
175 free (wrk);
176}
177
178ETP_API_DECL unsigned int
179etp_nreqs (etp_pool pool)
180{
181 int retval;
182 if (WORDACCESS_UNSAFE) X_LOCK (pool->reqlock);
183 retval = pool->nreqs;
184 if (WORDACCESS_UNSAFE) X_UNLOCK (pool->reqlock);
185 return retval;
186}
187
188ETP_API_DECL unsigned int
189etp_nready (etp_pool pool)
190{
191 unsigned int retval;
192
193 if (WORDACCESS_UNSAFE) X_LOCK (pool->reqlock);
194 retval = pool->nready;
195 if (WORDACCESS_UNSAFE) X_UNLOCK (pool->reqlock);
196
197 return retval;
198}
199
200ETP_API_DECL unsigned int
201etp_npending (etp_pool pool)
202{
203 unsigned int retval;
204
205 if (WORDACCESS_UNSAFE) X_LOCK (pool->reqlock);
206 retval = pool->npending;
207 if (WORDACCESS_UNSAFE) X_UNLOCK (pool->reqlock);
208
209 return retval;
210}
211
212ETP_API_DECL unsigned int
213etp_nthreads (etp_pool pool)
214{
215 unsigned int retval;
216
217 if (WORDACCESS_UNSAFE) X_LOCK (pool->reqlock);
218 retval = pool->started;
219 if (WORDACCESS_UNSAFE) X_UNLOCK (pool->reqlock);
220
221 return retval;
222}
201 223
202static void ecb_noinline ecb_cold 224static void ecb_noinline ecb_cold
203reqq_init (etp_reqq *q) 225reqq_init (etp_reqq *q)
204{ 226{
205 int pri; 227 int pri;
252 274
253 abort (); 275 abort ();
254} 276}
255 277
256ETP_API_DECL int ecb_cold 278ETP_API_DECL int ecb_cold
257etp_init (void (*want_poll)(void), void (*done_poll)(void)) 279etp_init (etp_pool pool, void (*want_poll)(void *userdata), void (*done_poll)(void *userdata))
258{ 280{
259 X_MUTEX_CREATE (wrklock); 281 X_MUTEX_CREATE (pool->wrklock);
260 X_MUTEX_CREATE (reslock); 282 X_MUTEX_CREATE (pool->reslock);
261 X_MUTEX_CREATE (reqlock); 283 X_MUTEX_CREATE (pool->reqlock);
262 X_COND_CREATE (reqwait); 284 X_COND_CREATE (pool->reqwait);
263 285
264 reqq_init (&req_queue); 286 reqq_init (&pool->req_queue);
265 reqq_init (&res_queue); 287 reqq_init (&pool->res_queue);
266 288
267 wrk_first.next = 289 wrk_first.next =
268 wrk_first.prev = &wrk_first; 290 wrk_first.prev = &wrk_first;
269 291
270 started = 0; 292 pool->started = 0;
271 idle = 0; 293 pool->idle = 0;
272 nreqs = 0; 294 pool->nreqs = 0;
273 nready = 0; 295 pool->nready = 0;
274 npending = 0; 296 pool->npending = 0;
297 pool->wanted = 4;
275 298
299 pool->max_idle = 4; /* maximum number of threads that can pool->idle indefinitely */
300 pool->idle_timeout = 10; /* number of seconds after which an pool->idle threads exit */
301
276 want_poll_cb = want_poll; 302 pool->want_poll_cb = want_poll;
277 done_poll_cb = done_poll; 303 pool->done_poll_cb = done_poll;
278 304
279 return 0; 305 return 0;
280} 306}
281 307
282/* not yet in etp.c */ 308static void ecb_noinline ecb_cold
309etp_proc_init (void)
310{
311#if HAVE_PRCTL_SET_NAME
312 /* provide a more sensible "thread name" */
313 char name[16 + 1];
314 const int namelen = sizeof (name) - 1;
315 int len;
316
317 prctl (PR_GET_NAME, (unsigned long)name, 0, 0, 0);
318 name [namelen] = 0;
319 len = strlen (name);
320 strcpy (name + (len <= namelen - 4 ? len : namelen - 4), "/eio");
321 prctl (PR_SET_NAME, (unsigned long)name, 0, 0, 0);
322#endif
323}
324
283X_THREAD_PROC (etp_proc); 325X_THREAD_PROC (etp_proc)
326{
327 ETP_REQ *req;
328 struct timespec ts;
329 etp_worker *self = (etp_worker *)thr_arg;
330 etp_pool pool = self->pool;
331
332 etp_proc_init ();
333
334 /* try to distribute timeouts somewhat evenly */
335 ts.tv_nsec = ((unsigned long)self & 1023UL) * (1000000000UL / 1024UL);
336
337 for (;;)
338 {
339 ts.tv_sec = 0;
340
341 X_LOCK (pool->reqlock);
342
343 for (;;)
344 {
345 req = reqq_shift (&pool->req_queue);
346
347 if (req)
348 break;
349
350 if (ts.tv_sec == 1) /* no request, but timeout detected, let's quit */
351 {
352 X_UNLOCK (pool->reqlock);
353 X_LOCK (pool->wrklock);
354 --pool->started;
355 X_UNLOCK (pool->wrklock);
356 goto quit;
357 }
358
359 ++pool->idle;
360
361 if (pool->idle <= pool->max_idle)
362 /* we are allowed to pool->idle, so do so without any timeout */
363 X_COND_WAIT (pool->reqwait, pool->reqlock);
364 else
365 {
366 /* initialise timeout once */
367 if (!ts.tv_sec)
368 ts.tv_sec = time (0) + pool->idle_timeout;
369
370 if (X_COND_TIMEDWAIT (pool->reqwait, pool->reqlock, ts) == ETIMEDOUT)
371 ts.tv_sec = 1; /* assuming this is not a value computed above.,.. */
372 }
373
374 --pool->idle;
375 }
376
377 --pool->nready;
378
379 X_UNLOCK (pool->reqlock);
380
381 if (req->type == ETP_TYPE_QUIT)
382 goto quit;
383
384 ETP_EXECUTE (self, req);
385
386 X_LOCK (pool->reslock);
387
388 ++pool->npending;
389
390 if (!reqq_push (&pool->res_queue, req))
391 ETP_WANT_POLL (poll);
392
393 etp_worker_clear (self);
394
395 X_UNLOCK (pool->reslock);
396 }
397
398quit:
399 free (req);
400
401 X_LOCK (pool->wrklock);
402 etp_worker_free (self);
403 X_UNLOCK (pool->wrklock);
404
405 return 0;
406}
284 407
285static void ecb_cold 408static void ecb_cold
286etp_start_thread (void) 409etp_start_thread (etp_pool pool)
287{ 410{
288 etp_worker *wrk = calloc (1, sizeof (etp_worker)); 411 etp_worker *wrk = calloc (1, sizeof (etp_worker));
289 412
290 /*TODO*/ 413 /*TODO*/
291 assert (("unable to allocate worker thread data", wrk)); 414 assert (("unable to allocate worker thread data", wrk));
292 415
416 wrk->pool = pool;
417
293 X_LOCK (wrklock); 418 X_LOCK (pool->wrklock);
294 419
295 if (xthread_create (&wrk->tid, etp_proc, (void *)wrk)) 420 if (xthread_create (&wrk->tid, etp_proc, (void *)wrk))
296 { 421 {
297 wrk->prev = &wrk_first; 422 wrk->prev = &wrk_first;
298 wrk->next = wrk_first.next; 423 wrk->next = wrk_first.next;
299 wrk_first.next->prev = wrk; 424 wrk_first.next->prev = wrk;
300 wrk_first.next = wrk; 425 wrk_first.next = wrk;
301 ++started; 426 ++pool->started;
302 } 427 }
303 else 428 else
304 free (wrk); 429 free (wrk);
305 430
306 X_UNLOCK (wrklock); 431 X_UNLOCK (pool->wrklock);
307} 432}
308 433
309static void 434static void
310etp_maybe_start_thread (void) 435etp_maybe_start_thread (etp_pool pool)
311{ 436{
312 if (ecb_expect_true (etp_nthreads () >= wanted)) 437 if (ecb_expect_true (etp_nthreads (pool) >= pool->wanted))
313 return; 438 return;
314 439
315 /* todo: maybe use idle here, but might be less exact */ 440 /* todo: maybe use pool->idle here, but might be less exact */
316 if (ecb_expect_true (0 <= (int)etp_nthreads () + (int)etp_npending () - (int)etp_nreqs ())) 441 if (ecb_expect_true (0 <= (int)etp_nthreads (pool) + (int)etp_npending (pool) - (int)etp_nreqs (pool)))
317 return; 442 return;
318 443
319 etp_start_thread (); 444 etp_start_thread (pool);
320} 445}
321 446
322static void ecb_cold 447static void ecb_cold
323etp_end_thread (void) 448etp_end_thread (etp_pool pool)
324{ 449{
325 ETP_REQ *req = calloc (1, sizeof (ETP_REQ)); /* will be freed by worker */ 450 ETP_REQ *req = calloc (1, sizeof (ETP_REQ)); /* will be freed by worker */
326 451
327 req->type = ETP_TYPE_QUIT; 452 req->type = ETP_TYPE_QUIT;
328 req->pri = ETP_PRI_MAX - ETP_PRI_MIN; 453 req->pri = ETP_PRI_MAX - ETP_PRI_MIN;
329 454
330 X_LOCK (reqlock); 455 X_LOCK (pool->reqlock);
331 reqq_push (&req_queue, req); 456 reqq_push (&pool->req_queue, req);
332 X_COND_SIGNAL (reqwait); 457 X_COND_SIGNAL (pool->reqwait);
333 X_UNLOCK (reqlock); 458 X_UNLOCK (pool->reqlock);
334 459
335 X_LOCK (wrklock); 460 X_LOCK (pool->wrklock);
336 --started; 461 --pool->started;
337 X_UNLOCK (wrklock); 462 X_UNLOCK (pool->wrklock);
338} 463}
339 464
340ETP_API_DECL int 465ETP_API_DECL int
341etp_poll (void) 466etp_poll (etp_pool pool)
342{ 467{
343 unsigned int maxreqs; 468 unsigned int maxreqs;
344 unsigned int maxtime; 469 unsigned int maxtime;
345 struct timeval tv_start, tv_now; 470 struct timeval tv_start, tv_now;
346 471
347 X_LOCK (reslock); 472 X_LOCK (pool->reslock);
348 maxreqs = max_poll_reqs; 473 maxreqs = pool->max_poll_reqs;
349 maxtime = max_poll_time; 474 maxtime = pool->max_poll_time;
350 X_UNLOCK (reslock); 475 X_UNLOCK (pool->reslock);
351 476
352 if (maxtime) 477 if (maxtime)
353 gettimeofday (&tv_start, 0); 478 gettimeofday (&tv_start, 0);
354 479
355 for (;;) 480 for (;;)
356 { 481 {
357 ETP_REQ *req; 482 ETP_REQ *req;
358 483
359 etp_maybe_start_thread (); 484 etp_maybe_start_thread (pool);
360 485
361 X_LOCK (reslock); 486 X_LOCK (pool->reslock);
362 req = reqq_shift (&res_queue); 487 req = reqq_shift (&pool->res_queue);
363 488
364 if (req) 489 if (req)
365 { 490 {
366 --npending; 491 --pool->npending;
367 492
368 if (!res_queue.size && done_poll_cb) 493 if (!pool->res_queue.size)
369 done_poll_cb (); 494 ETP_DONE_POLL (pool->userdata);
370 } 495 }
371 496
372 X_UNLOCK (reslock); 497 X_UNLOCK (pool->reslock);
373 498
374 if (!req) 499 if (!req)
375 return 0; 500 return 0;
376 501
377 X_LOCK (reqlock); 502 X_LOCK (pool->reqlock);
378 --nreqs; 503 --pool->nreqs;
379 X_UNLOCK (reqlock); 504 X_UNLOCK (pool->reqlock);
380 505
381 if (ecb_expect_false (req->type == ETP_TYPE_GROUP && req->size)) 506 if (ecb_expect_false (req->type == ETP_TYPE_GROUP && req->size))
382 { 507 {
383 req->int1 = 1; /* mark request as delayed */ 508 req->flags |= ETP_FLAG_DELAYED; /* mark request as delayed */
384 continue; 509 continue;
385 } 510 }
386 else 511 else
387 { 512 {
388 int res = ETP_FINISH (req); 513 int res = ETP_FINISH (req);
405 errno = EAGAIN; 530 errno = EAGAIN;
406 return -1; 531 return -1;
407} 532}
408 533
409ETP_API_DECL void 534ETP_API_DECL void
410etp_grp_cancel (ETP_REQ *grp); 535etp_grp_cancel (etp_pool pool, ETP_REQ *grp);
411 536
412ETP_API_DECL void 537ETP_API_DECL void
413etp_cancel (ETP_REQ *req) 538etp_cancel (etp_pool pool, ETP_REQ *req)
414{ 539{
415 req->cancelled = 1; 540 req->cancelled = 1;
416 541
417 etp_grp_cancel (req); 542 etp_grp_cancel (pool, req);
418} 543}
419 544
420ETP_API_DECL void 545ETP_API_DECL void
421etp_grp_cancel (ETP_REQ *grp) 546etp_grp_cancel (etp_pool pool, ETP_REQ *grp)
422{ 547{
423 for (grp = grp->grp_first; grp; grp = grp->grp_next) 548 for (grp = grp->grp_first; grp; grp = grp->grp_next)
424 etp_cancel (grp); 549 etp_cancel (pool, grp);
425} 550}
426 551
427ETP_API_DECL void 552ETP_API_DECL void
428etp_submit (ETP_REQ *req) 553etp_submit (etp_pool pool, ETP_REQ *req)
429{ 554{
430 req->pri -= ETP_PRI_MIN; 555 req->pri -= ETP_PRI_MIN;
431 556
432 if (ecb_expect_false (req->pri < ETP_PRI_MIN - ETP_PRI_MIN)) req->pri = ETP_PRI_MIN - ETP_PRI_MIN; 557 if (ecb_expect_false (req->pri < ETP_PRI_MIN - ETP_PRI_MIN)) req->pri = ETP_PRI_MIN - ETP_PRI_MIN;
433 if (ecb_expect_false (req->pri > ETP_PRI_MAX - ETP_PRI_MIN)) req->pri = ETP_PRI_MAX - ETP_PRI_MIN; 558 if (ecb_expect_false (req->pri > ETP_PRI_MAX - ETP_PRI_MIN)) req->pri = ETP_PRI_MAX - ETP_PRI_MIN;
434 559
435 if (ecb_expect_false (req->type == ETP_TYPE_GROUP)) 560 if (ecb_expect_false (req->type == ETP_TYPE_GROUP))
436 { 561 {
437 /* I hope this is worth it :/ */ 562 /* I hope this is worth it :/ */
438 X_LOCK (reqlock); 563 X_LOCK (pool->reqlock);
439 ++nreqs; 564 ++pool->nreqs;
440 X_UNLOCK (reqlock); 565 X_UNLOCK (pool->reqlock);
441 566
442 X_LOCK (reslock); 567 X_LOCK (pool->reslock);
443 568
444 ++npending; 569 ++pool->npending;
445 570
446 if (!reqq_push (&res_queue, req) && want_poll_cb) 571 if (!reqq_push (&pool->res_queue, req))
447 want_poll_cb (); 572 ETP_WANT_POLL (pool);
448 573
449 X_UNLOCK (reslock); 574 X_UNLOCK (pool->reslock);
450 } 575 }
451 else 576 else
452 { 577 {
453 X_LOCK (reqlock); 578 X_LOCK (pool->reqlock);
454 ++nreqs; 579 ++pool->nreqs;
455 ++nready; 580 ++pool->nready;
456 reqq_push (&req_queue, req); 581 reqq_push (&pool->req_queue, req);
457 X_COND_SIGNAL (reqwait); 582 X_COND_SIGNAL (pool->reqwait);
458 X_UNLOCK (reqlock); 583 X_UNLOCK (pool->reqlock);
459 584
460 etp_maybe_start_thread (); 585 etp_maybe_start_thread (pool);
461 } 586 }
462} 587}
463 588
464ETP_API_DECL void ecb_cold 589ETP_API_DECL void ecb_cold
465etp_set_max_poll_time (double nseconds) 590etp_set_max_poll_time (etp_pool pool, double nseconds)
466{ 591{
467 if (WORDACCESS_UNSAFE) X_LOCK (reslock); 592 if (WORDACCESS_UNSAFE) X_LOCK (pool->reslock);
468 max_poll_time = nseconds * ETP_TICKS; 593 pool->max_poll_time = nseconds * ETP_TICKS;
469 if (WORDACCESS_UNSAFE) X_UNLOCK (reslock); 594 if (WORDACCESS_UNSAFE) X_UNLOCK (pool->reslock);
470} 595}
471 596
472ETP_API_DECL void ecb_cold 597ETP_API_DECL void ecb_cold
473etp_set_max_poll_reqs (unsigned int maxreqs) 598etp_set_max_poll_reqs (etp_pool pool, unsigned int maxreqs)
474{ 599{
475 if (WORDACCESS_UNSAFE) X_LOCK (reslock); 600 if (WORDACCESS_UNSAFE) X_LOCK (pool->reslock);
476 max_poll_reqs = maxreqs; 601 pool->max_poll_reqs = maxreqs;
477 if (WORDACCESS_UNSAFE) X_UNLOCK (reslock); 602 if (WORDACCESS_UNSAFE) X_UNLOCK (pool->reslock);
478} 603}
479 604
480ETP_API_DECL void ecb_cold 605ETP_API_DECL void ecb_cold
481etp_set_max_idle (unsigned int nthreads) 606etp_set_max_idle (etp_pool pool, unsigned int nthreads)
482{ 607{
483 if (WORDACCESS_UNSAFE) X_LOCK (reqlock); 608 if (WORDACCESS_UNSAFE) X_LOCK (pool->reqlock);
484 max_idle = nthreads; 609 pool->max_idle = nthreads;
485 if (WORDACCESS_UNSAFE) X_UNLOCK (reqlock); 610 if (WORDACCESS_UNSAFE) X_UNLOCK (pool->reqlock);
486} 611}
487 612
488ETP_API_DECL void ecb_cold 613ETP_API_DECL void ecb_cold
489etp_set_idle_timeout (unsigned int seconds) 614etp_set_idle_timeout (etp_pool pool, unsigned int seconds)
490{ 615{
491 if (WORDACCESS_UNSAFE) X_LOCK (reqlock); 616 if (WORDACCESS_UNSAFE) X_LOCK (pool->reqlock);
492 idle_timeout = seconds; 617 pool->idle_timeout = seconds;
493 if (WORDACCESS_UNSAFE) X_UNLOCK (reqlock); 618 if (WORDACCESS_UNSAFE) X_UNLOCK (pool->reqlock);
494} 619}
495 620
496ETP_API_DECL void ecb_cold 621ETP_API_DECL void ecb_cold
497etp_set_min_parallel (unsigned int nthreads) 622etp_set_min_parallel (etp_pool pool, unsigned int nthreads)
498{ 623{
499 if (wanted < nthreads) 624 if (pool->wanted < nthreads)
500 wanted = nthreads; 625 pool->wanted = nthreads;
501} 626}
502 627
503ETP_API_DECL void ecb_cold 628ETP_API_DECL void ecb_cold
504etp_set_max_parallel (unsigned int nthreads) 629etp_set_max_parallel (etp_pool pool, unsigned int nthreads)
505{ 630{
506 if (wanted > nthreads) 631 if (pool->wanted > nthreads)
507 wanted = nthreads; 632 pool->wanted = nthreads;
508 633
509 while (started > wanted) 634 while (pool->started > pool->wanted)
510 etp_end_thread (); 635 etp_end_thread (pool);
511} 636}
512 637

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines