ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/libeio/etp.c
(Generate patch)

Comparing libeio/etp.c (file contents):
Revision 1.4 by root, Thu Jun 25 17:05:07 2015 UTC vs.
Revision 1.6 by root, Thu Jun 25 18:08:47 2015 UTC

52 52
53#ifndef ETP_TYPE_GROUP 53#ifndef ETP_TYPE_GROUP
54# define ETP_TYPE_GROUP 1 54# define ETP_TYPE_GROUP 1
55#endif 55#endif
56 56
57#ifndef ETP_WANT_POLL
58# define ETP_WANT_POLL(pool) pool->want_poll_cb (pool->userdata)
59#endif
60#ifndef ETP_DONE_POLL
61# define ETP_DONE_POLL(pool) pool->done_poll_cb (pool->userdata)
62#endif
63
57#define ETP_NUM_PRI (ETP_PRI_MAX - ETP_PRI_MIN + 1) 64#define ETP_NUM_PRI (ETP_PRI_MAX - ETP_PRI_MIN + 1)
58 65
59#define ETP_TICKS ((1000000 + 1023) >> 10) 66#define ETP_TICKS ((1000000 + 1023) >> 10)
60 67
61enum { 68enum {
69{ 76{
70 return (tv2->tv_sec - tv1->tv_sec ) * ETP_TICKS 77 return (tv2->tv_sec - tv1->tv_sec ) * ETP_TICKS
71 + ((tv2->tv_usec - tv1->tv_usec) >> 10); 78 + ((tv2->tv_usec - tv1->tv_usec) >> 10);
72} 79}
73 80
74static unsigned int started, idle, wanted = 4;
75
76static void (*want_poll_cb) (void);
77static void (*done_poll_cb) (void);
78
79static unsigned int max_poll_time; /* reslock */
80static unsigned int max_poll_reqs; /* reslock */
81
82static unsigned int nreqs; /* reqlock */
83static unsigned int nready; /* reqlock */
84static unsigned int npending; /* reqlock */
85static unsigned int max_idle = 4; /* maximum number of threads that can idle indefinitely */
86static unsigned int idle_timeout = 10; /* number of seconds after which an idle threads exit */
87
88static xmutex_t wrklock;
89static xmutex_t reslock;
90static xmutex_t reqlock;
91static xcond_t reqwait;
92
93struct etp_tmpbuf 81struct etp_tmpbuf
94{ 82{
95 void *ptr; 83 void *ptr;
96 int len; 84 int len;
97}; 85};
104 free (buf->ptr); 92 free (buf->ptr);
105 buf->ptr = malloc (buf->len = len); 93 buf->ptr = malloc (buf->len = len);
106 } 94 }
107 95
108 return buf->ptr; 96 return buf->ptr;
109}
110
111typedef struct etp_worker
112{
113 struct etp_tmpbuf tmpbuf;
114
115 /* locked by wrklock */
116 struct etp_worker *prev, *next;
117
118 xthread_t tid;
119
120#ifdef ETP_WORKER_COMMON
121 ETP_WORKER_COMMON
122#endif
123} etp_worker;
124
125static etp_worker wrk_first; /* NOT etp */
126
127#define ETP_WORKER_LOCK(wrk) X_LOCK (wrklock)
128#define ETP_WORKER_UNLOCK(wrk) X_UNLOCK (wrklock)
129
130/* worker threads management */
131
132static void
133etp_worker_clear (etp_worker *wrk)
134{
135}
136
137static void ecb_cold
138etp_worker_free (etp_worker *wrk)
139{
140 free (wrk->tmpbuf.ptr);
141
142 wrk->next->prev = wrk->prev;
143 wrk->prev->next = wrk->next;
144
145 free (wrk);
146}
147
148ETP_API_DECL unsigned int
149etp_nreqs (void)
150{
151 int retval;
152 if (WORDACCESS_UNSAFE) X_LOCK (reqlock);
153 retval = nreqs;
154 if (WORDACCESS_UNSAFE) X_UNLOCK (reqlock);
155 return retval;
156}
157
158ETP_API_DECL unsigned int
159etp_nready (void)
160{
161 unsigned int retval;
162
163 if (WORDACCESS_UNSAFE) X_LOCK (reqlock);
164 retval = nready;
165 if (WORDACCESS_UNSAFE) X_UNLOCK (reqlock);
166
167 return retval;
168}
169
170ETP_API_DECL unsigned int
171etp_npending (void)
172{
173 unsigned int retval;
174
175 if (WORDACCESS_UNSAFE) X_LOCK (reqlock);
176 retval = npending;
177 if (WORDACCESS_UNSAFE) X_UNLOCK (reqlock);
178
179 return retval;
180}
181
182ETP_API_DECL unsigned int
183etp_nthreads (void)
184{
185 unsigned int retval;
186
187 if (WORDACCESS_UNSAFE) X_LOCK (reqlock);
188 retval = started;
189 if (WORDACCESS_UNSAFE) X_UNLOCK (reqlock);
190
191 return retval;
192} 97}
193 98
194/* 99/*
195 * a somewhat faster data structure might be nice, but 100 * a somewhat faster data structure might be nice, but
196 * with 8 priorities this actually needs <20 insns 101 * with 8 priorities this actually needs <20 insns
197 * per shift, the most expensive operation. 102 * per shift, the most expensive operation.
198 */ 103 */
199typedef struct { 104typedef struct
105{
200 ETP_REQ *qs[ETP_NUM_PRI], *qe[ETP_NUM_PRI]; /* qstart, qend */ 106 ETP_REQ *qs[ETP_NUM_PRI], *qe[ETP_NUM_PRI]; /* qstart, qend */
201 int size; 107 int size;
202} etp_reqq; 108} etp_reqq;
203 109
110struct etp_pool
111{
112 void *userdata;
113
204static etp_reqq req_queue; 114 etp_reqq req_queue;
205static etp_reqq res_queue; 115 etp_reqq res_queue;
116
117 unsigned int started, idle, wanted;
118
119 unsigned int max_poll_time; /* pool->reslock */
120 unsigned int max_poll_reqs; /* pool->reslock */
121
122 unsigned int nreqs; /* pool->reqlock */
123 unsigned int nready; /* pool->reqlock */
124 unsigned int npending; /* pool->reqlock */
125 unsigned int max_idle; /* maximum number of threads that can pool->idle indefinitely */
126 unsigned int idle_timeout; /* number of seconds after which an pool->idle threads exit */
127
128 void (*want_poll_cb) (void *userdata);
129 void (*done_poll_cb) (void *userdata);
130
131 xmutex_t wrklock;
132 xmutex_t reslock;
133 xmutex_t reqlock;
134 xcond_t reqwait;
135};
136
137typedef struct etp_pool *etp_pool;
138
139typedef struct etp_worker
140{
141 etp_pool pool;
142
143 struct etp_tmpbuf tmpbuf;
144
145 /* locked by pool->wrklock */
146 struct etp_worker *prev, *next;
147
148 xthread_t tid;
149
150#ifdef ETP_WORKER_COMMON
151 ETP_WORKER_COMMON
152#endif
153} etp_worker;
154
155static etp_worker wrk_first; /* NOT etp */
156
157#define ETP_WORKER_LOCK(wrk) X_LOCK (pool->wrklock)
158#define ETP_WORKER_UNLOCK(wrk) X_UNLOCK (pool->wrklock)
159
160/* worker threads management */
161
162static void
163etp_worker_clear (etp_worker *wrk)
164{
165}
166
167static void ecb_cold
168etp_worker_free (etp_worker *wrk)
169{
170 free (wrk->tmpbuf.ptr);
171
172 wrk->next->prev = wrk->prev;
173 wrk->prev->next = wrk->next;
174
175 free (wrk);
176}
177
178ETP_API_DECL unsigned int
179etp_nreqs (etp_pool pool)
180{
181 int retval;
182 if (WORDACCESS_UNSAFE) X_LOCK (pool->reqlock);
183 retval = pool->nreqs;
184 if (WORDACCESS_UNSAFE) X_UNLOCK (pool->reqlock);
185 return retval;
186}
187
188ETP_API_DECL unsigned int
189etp_nready (etp_pool pool)
190{
191 unsigned int retval;
192
193 if (WORDACCESS_UNSAFE) X_LOCK (pool->reqlock);
194 retval = pool->nready;
195 if (WORDACCESS_UNSAFE) X_UNLOCK (pool->reqlock);
196
197 return retval;
198}
199
200ETP_API_DECL unsigned int
201etp_npending (etp_pool pool)
202{
203 unsigned int retval;
204
205 if (WORDACCESS_UNSAFE) X_LOCK (pool->reqlock);
206 retval = pool->npending;
207 if (WORDACCESS_UNSAFE) X_UNLOCK (pool->reqlock);
208
209 return retval;
210}
211
212ETP_API_DECL unsigned int
213etp_nthreads (etp_pool pool)
214{
215 unsigned int retval;
216
217 if (WORDACCESS_UNSAFE) X_LOCK (pool->reqlock);
218 retval = pool->started;
219 if (WORDACCESS_UNSAFE) X_UNLOCK (pool->reqlock);
220
221 return retval;
222}
206 223
207static void ecb_noinline ecb_cold 224static void ecb_noinline ecb_cold
208reqq_init (etp_reqq *q) 225reqq_init (etp_reqq *q)
209{ 226{
210 int pri; 227 int pri;
257 274
258 abort (); 275 abort ();
259} 276}
260 277
261ETP_API_DECL int ecb_cold 278ETP_API_DECL int ecb_cold
262etp_init (void (*want_poll)(void), void (*done_poll)(void)) 279etp_init (etp_pool pool, void (*want_poll)(void *userdata), void (*done_poll)(void *userdata))
263{ 280{
264 X_MUTEX_CREATE (wrklock); 281 X_MUTEX_CREATE (pool->wrklock);
265 X_MUTEX_CREATE (reslock); 282 X_MUTEX_CREATE (pool->reslock);
266 X_MUTEX_CREATE (reqlock); 283 X_MUTEX_CREATE (pool->reqlock);
267 X_COND_CREATE (reqwait); 284 X_COND_CREATE (pool->reqwait);
268 285
269 reqq_init (&req_queue); 286 reqq_init (&pool->req_queue);
270 reqq_init (&res_queue); 287 reqq_init (&pool->res_queue);
271 288
272 wrk_first.next = 289 wrk_first.next =
273 wrk_first.prev = &wrk_first; 290 wrk_first.prev = &wrk_first;
274 291
275 started = 0; 292 pool->started = 0;
276 idle = 0; 293 pool->idle = 0;
277 nreqs = 0; 294 pool->nreqs = 0;
278 nready = 0; 295 pool->nready = 0;
279 npending = 0; 296 pool->npending = 0;
297 pool->wanted = 4;
280 298
299 pool->max_idle = 4; /* maximum number of threads that can pool->idle indefinitely */
300 pool->idle_timeout = 10; /* number of seconds after which an pool->idle threads exit */
301
281 want_poll_cb = want_poll; 302 pool->want_poll_cb = want_poll;
282 done_poll_cb = done_poll; 303 pool->done_poll_cb = done_poll;
283 304
284 return 0; 305 return 0;
285} 306}
286 307
287static void ecb_noinline ecb_cold 308static void ecb_noinline ecb_cold
304X_THREAD_PROC (etp_proc) 325X_THREAD_PROC (etp_proc)
305{ 326{
306 ETP_REQ *req; 327 ETP_REQ *req;
307 struct timespec ts; 328 struct timespec ts;
308 etp_worker *self = (etp_worker *)thr_arg; 329 etp_worker *self = (etp_worker *)thr_arg;
330 etp_pool pool = self->pool;
309 331
310 etp_proc_init (); 332 etp_proc_init ();
311 333
312 /* try to distribute timeouts somewhat evenly */ 334 /* try to distribute timeouts somewhat evenly */
313 ts.tv_nsec = ((unsigned long)self & 1023UL) * (1000000000UL / 1024UL); 335 ts.tv_nsec = ((unsigned long)self & 1023UL) * (1000000000UL / 1024UL);
314 336
315 for (;;) 337 for (;;)
316 { 338 {
317 ts.tv_sec = 0; 339 ts.tv_sec = 0;
318 340
319 X_LOCK (reqlock); 341 X_LOCK (pool->reqlock);
320 342
321 for (;;) 343 for (;;)
322 { 344 {
323 req = reqq_shift (&req_queue); 345 req = reqq_shift (&pool->req_queue);
324 346
325 if (req) 347 if (req)
326 break; 348 break;
327 349
328 if (ts.tv_sec == 1) /* no request, but timeout detected, let's quit */ 350 if (ts.tv_sec == 1) /* no request, but timeout detected, let's quit */
329 { 351 {
330 X_UNLOCK (reqlock); 352 X_UNLOCK (pool->reqlock);
331 X_LOCK (wrklock); 353 X_LOCK (pool->wrklock);
332 --started; 354 --pool->started;
333 X_UNLOCK (wrklock); 355 X_UNLOCK (pool->wrklock);
334 goto quit; 356 goto quit;
335 } 357 }
336 358
337 ++idle; 359 ++pool->idle;
338 360
339 if (idle <= max_idle) 361 if (pool->idle <= pool->max_idle)
340 /* we are allowed to idle, so do so without any timeout */ 362 /* we are allowed to pool->idle, so do so without any timeout */
341 X_COND_WAIT (reqwait, reqlock); 363 X_COND_WAIT (pool->reqwait, pool->reqlock);
342 else 364 else
343 { 365 {
344 /* initialise timeout once */ 366 /* initialise timeout once */
345 if (!ts.tv_sec) 367 if (!ts.tv_sec)
346 ts.tv_sec = time (0) + idle_timeout; 368 ts.tv_sec = time (0) + pool->idle_timeout;
347 369
348 if (X_COND_TIMEDWAIT (reqwait, reqlock, ts) == ETIMEDOUT) 370 if (X_COND_TIMEDWAIT (pool->reqwait, pool->reqlock, ts) == ETIMEDOUT)
349 ts.tv_sec = 1; /* assuming this is not a value computed above.,.. */ 371 ts.tv_sec = 1; /* assuming this is not a value computed above.,.. */
350 } 372 }
351 373
352 --idle; 374 --pool->idle;
353 } 375 }
354 376
355 --nready; 377 --pool->nready;
356 378
357 X_UNLOCK (reqlock); 379 X_UNLOCK (pool->reqlock);
358 380
359 if (req->type == ETP_TYPE_QUIT) 381 if (req->type == ETP_TYPE_QUIT)
360 goto quit; 382 goto quit;
361 383
362 ETP_EXECUTE (self, req); 384 ETP_EXECUTE (self, req);
363 385
364 X_LOCK (reslock); 386 X_LOCK (pool->reslock);
365 387
366 ++npending; 388 ++pool->npending;
367 389
368 if (!reqq_push (&res_queue, req) && want_poll_cb) 390 if (!reqq_push (&pool->res_queue, req))
369 want_poll_cb (); 391 ETP_WANT_POLL (poll);
370 392
371 etp_worker_clear (self); 393 etp_worker_clear (self);
372 394
373 X_UNLOCK (reslock); 395 X_UNLOCK (pool->reslock);
374 } 396 }
375 397
376quit: 398quit:
377 free (req); 399 free (req);
378 400
379 X_LOCK (wrklock); 401 X_LOCK (pool->wrklock);
380 etp_worker_free (self); 402 etp_worker_free (self);
381 X_UNLOCK (wrklock); 403 X_UNLOCK (pool->wrklock);
382 404
383 return 0; 405 return 0;
384} 406}
385 407
386static void ecb_cold 408static void ecb_cold
387etp_start_thread (void) 409etp_start_thread (etp_pool pool)
388{ 410{
389 etp_worker *wrk = calloc (1, sizeof (etp_worker)); 411 etp_worker *wrk = calloc (1, sizeof (etp_worker));
390 412
391 /*TODO*/ 413 /*TODO*/
392 assert (("unable to allocate worker thread data", wrk)); 414 assert (("unable to allocate worker thread data", wrk));
393 415
416 wrk->pool = pool;
417
394 X_LOCK (wrklock); 418 X_LOCK (pool->wrklock);
395 419
396 if (xthread_create (&wrk->tid, etp_proc, (void *)wrk)) 420 if (xthread_create (&wrk->tid, etp_proc, (void *)wrk))
397 { 421 {
398 wrk->prev = &wrk_first; 422 wrk->prev = &wrk_first;
399 wrk->next = wrk_first.next; 423 wrk->next = wrk_first.next;
400 wrk_first.next->prev = wrk; 424 wrk_first.next->prev = wrk;
401 wrk_first.next = wrk; 425 wrk_first.next = wrk;
402 ++started; 426 ++pool->started;
403 } 427 }
404 else 428 else
405 free (wrk); 429 free (wrk);
406 430
407 X_UNLOCK (wrklock); 431 X_UNLOCK (pool->wrklock);
408} 432}
409 433
410static void 434static void
411etp_maybe_start_thread (void) 435etp_maybe_start_thread (etp_pool pool)
412{ 436{
413 if (ecb_expect_true (etp_nthreads () >= wanted)) 437 if (ecb_expect_true (etp_nthreads (pool) >= pool->wanted))
414 return; 438 return;
415 439
416 /* todo: maybe use idle here, but might be less exact */ 440 /* todo: maybe use pool->idle here, but might be less exact */
417 if (ecb_expect_true (0 <= (int)etp_nthreads () + (int)etp_npending () - (int)etp_nreqs ())) 441 if (ecb_expect_true (0 <= (int)etp_nthreads (pool) + (int)etp_npending (pool) - (int)etp_nreqs (pool)))
418 return; 442 return;
419 443
420 etp_start_thread (); 444 etp_start_thread (pool);
421} 445}
422 446
423static void ecb_cold 447static void ecb_cold
424etp_end_thread (void) 448etp_end_thread (etp_pool pool)
425{ 449{
426 ETP_REQ *req = calloc (1, sizeof (ETP_REQ)); /* will be freed by worker */ 450 ETP_REQ *req = calloc (1, sizeof (ETP_REQ)); /* will be freed by worker */
427 451
428 req->type = ETP_TYPE_QUIT; 452 req->type = ETP_TYPE_QUIT;
429 req->pri = ETP_PRI_MAX - ETP_PRI_MIN; 453 req->pri = ETP_PRI_MAX - ETP_PRI_MIN;
430 454
431 X_LOCK (reqlock); 455 X_LOCK (pool->reqlock);
432 reqq_push (&req_queue, req); 456 reqq_push (&pool->req_queue, req);
433 X_COND_SIGNAL (reqwait); 457 X_COND_SIGNAL (pool->reqwait);
434 X_UNLOCK (reqlock); 458 X_UNLOCK (pool->reqlock);
435 459
436 X_LOCK (wrklock); 460 X_LOCK (pool->wrklock);
437 --started; 461 --pool->started;
438 X_UNLOCK (wrklock); 462 X_UNLOCK (pool->wrklock);
439} 463}
440 464
441ETP_API_DECL int 465ETP_API_DECL int
442etp_poll (void) 466etp_poll (etp_pool pool)
443{ 467{
444 unsigned int maxreqs; 468 unsigned int maxreqs;
445 unsigned int maxtime; 469 unsigned int maxtime;
446 struct timeval tv_start, tv_now; 470 struct timeval tv_start, tv_now;
447 471
448 X_LOCK (reslock); 472 X_LOCK (pool->reslock);
449 maxreqs = max_poll_reqs; 473 maxreqs = pool->max_poll_reqs;
450 maxtime = max_poll_time; 474 maxtime = pool->max_poll_time;
451 X_UNLOCK (reslock); 475 X_UNLOCK (pool->reslock);
452 476
453 if (maxtime) 477 if (maxtime)
454 gettimeofday (&tv_start, 0); 478 gettimeofday (&tv_start, 0);
455 479
456 for (;;) 480 for (;;)
457 { 481 {
458 ETP_REQ *req; 482 ETP_REQ *req;
459 483
460 etp_maybe_start_thread (); 484 etp_maybe_start_thread (pool);
461 485
462 X_LOCK (reslock); 486 X_LOCK (pool->reslock);
463 req = reqq_shift (&res_queue); 487 req = reqq_shift (&pool->res_queue);
464 488
465 if (req) 489 if (req)
466 { 490 {
467 --npending; 491 --pool->npending;
468 492
469 if (!res_queue.size && done_poll_cb) 493 if (!pool->res_queue.size)
470 done_poll_cb (); 494 ETP_DONE_POLL (pool->userdata);
471 } 495 }
472 496
473 X_UNLOCK (reslock); 497 X_UNLOCK (pool->reslock);
474 498
475 if (!req) 499 if (!req)
476 return 0; 500 return 0;
477 501
478 X_LOCK (reqlock); 502 X_LOCK (pool->reqlock);
479 --nreqs; 503 --pool->nreqs;
480 X_UNLOCK (reqlock); 504 X_UNLOCK (pool->reqlock);
481 505
482 if (ecb_expect_false (req->type == ETP_TYPE_GROUP && req->size)) 506 if (ecb_expect_false (req->type == ETP_TYPE_GROUP && req->size))
483 { 507 {
484 req->flags |= ETP_FLAG_DELAYED; /* mark request as delayed */ 508 req->flags |= ETP_FLAG_DELAYED; /* mark request as delayed */
485 continue; 509 continue;
506 errno = EAGAIN; 530 errno = EAGAIN;
507 return -1; 531 return -1;
508} 532}
509 533
510ETP_API_DECL void 534ETP_API_DECL void
511etp_grp_cancel (ETP_REQ *grp); 535etp_grp_cancel (etp_pool pool, ETP_REQ *grp);
512 536
513ETP_API_DECL void 537ETP_API_DECL void
514etp_cancel (ETP_REQ *req) 538etp_cancel (etp_pool pool, ETP_REQ *req)
515{ 539{
516 req->cancelled = 1; 540 req->cancelled = 1;
517 541
518 etp_grp_cancel (req); 542 etp_grp_cancel (pool, req);
519} 543}
520 544
521ETP_API_DECL void 545ETP_API_DECL void
522etp_grp_cancel (ETP_REQ *grp) 546etp_grp_cancel (etp_pool pool, ETP_REQ *grp)
523{ 547{
524 for (grp = grp->grp_first; grp; grp = grp->grp_next) 548 for (grp = grp->grp_first; grp; grp = grp->grp_next)
525 etp_cancel (grp); 549 etp_cancel (pool, grp);
526} 550}
527 551
528ETP_API_DECL void 552ETP_API_DECL void
529etp_submit (ETP_REQ *req) 553etp_submit (etp_pool pool, ETP_REQ *req)
530{ 554{
531 req->pri -= ETP_PRI_MIN; 555 req->pri -= ETP_PRI_MIN;
532 556
533 if (ecb_expect_false (req->pri < ETP_PRI_MIN - ETP_PRI_MIN)) req->pri = ETP_PRI_MIN - ETP_PRI_MIN; 557 if (ecb_expect_false (req->pri < ETP_PRI_MIN - ETP_PRI_MIN)) req->pri = ETP_PRI_MIN - ETP_PRI_MIN;
534 if (ecb_expect_false (req->pri > ETP_PRI_MAX - ETP_PRI_MIN)) req->pri = ETP_PRI_MAX - ETP_PRI_MIN; 558 if (ecb_expect_false (req->pri > ETP_PRI_MAX - ETP_PRI_MIN)) req->pri = ETP_PRI_MAX - ETP_PRI_MIN;
535 559
536 if (ecb_expect_false (req->type == ETP_TYPE_GROUP)) 560 if (ecb_expect_false (req->type == ETP_TYPE_GROUP))
537 { 561 {
538 /* I hope this is worth it :/ */ 562 /* I hope this is worth it :/ */
539 X_LOCK (reqlock); 563 X_LOCK (pool->reqlock);
540 ++nreqs; 564 ++pool->nreqs;
541 X_UNLOCK (reqlock); 565 X_UNLOCK (pool->reqlock);
542 566
543 X_LOCK (reslock); 567 X_LOCK (pool->reslock);
544 568
545 ++npending; 569 ++pool->npending;
546 570
547 if (!reqq_push (&res_queue, req) && want_poll_cb) 571 if (!reqq_push (&pool->res_queue, req))
548 want_poll_cb (); 572 ETP_WANT_POLL (pool);
549 573
550 X_UNLOCK (reslock); 574 X_UNLOCK (pool->reslock);
551 } 575 }
552 else 576 else
553 { 577 {
554 X_LOCK (reqlock); 578 X_LOCK (pool->reqlock);
555 ++nreqs; 579 ++pool->nreqs;
556 ++nready; 580 ++pool->nready;
557 reqq_push (&req_queue, req); 581 reqq_push (&pool->req_queue, req);
558 X_COND_SIGNAL (reqwait); 582 X_COND_SIGNAL (pool->reqwait);
559 X_UNLOCK (reqlock); 583 X_UNLOCK (pool->reqlock);
560 584
561 etp_maybe_start_thread (); 585 etp_maybe_start_thread (pool);
562 } 586 }
563} 587}
564 588
565ETP_API_DECL void ecb_cold 589ETP_API_DECL void ecb_cold
566etp_set_max_poll_time (double nseconds) 590etp_set_max_poll_time (etp_pool pool, double nseconds)
567{ 591{
568 if (WORDACCESS_UNSAFE) X_LOCK (reslock); 592 if (WORDACCESS_UNSAFE) X_LOCK (pool->reslock);
569 max_poll_time = nseconds * ETP_TICKS; 593 pool->max_poll_time = nseconds * ETP_TICKS;
570 if (WORDACCESS_UNSAFE) X_UNLOCK (reslock); 594 if (WORDACCESS_UNSAFE) X_UNLOCK (pool->reslock);
571} 595}
572 596
573ETP_API_DECL void ecb_cold 597ETP_API_DECL void ecb_cold
574etp_set_max_poll_reqs (unsigned int maxreqs) 598etp_set_max_poll_reqs (etp_pool pool, unsigned int maxreqs)
575{ 599{
576 if (WORDACCESS_UNSAFE) X_LOCK (reslock); 600 if (WORDACCESS_UNSAFE) X_LOCK (pool->reslock);
577 max_poll_reqs = maxreqs; 601 pool->max_poll_reqs = maxreqs;
578 if (WORDACCESS_UNSAFE) X_UNLOCK (reslock); 602 if (WORDACCESS_UNSAFE) X_UNLOCK (pool->reslock);
579} 603}
580 604
581ETP_API_DECL void ecb_cold 605ETP_API_DECL void ecb_cold
582etp_set_max_idle (unsigned int nthreads) 606etp_set_max_idle (etp_pool pool, unsigned int nthreads)
583{ 607{
584 if (WORDACCESS_UNSAFE) X_LOCK (reqlock); 608 if (WORDACCESS_UNSAFE) X_LOCK (pool->reqlock);
585 max_idle = nthreads; 609 pool->max_idle = nthreads;
586 if (WORDACCESS_UNSAFE) X_UNLOCK (reqlock); 610 if (WORDACCESS_UNSAFE) X_UNLOCK (pool->reqlock);
587} 611}
588 612
589ETP_API_DECL void ecb_cold 613ETP_API_DECL void ecb_cold
590etp_set_idle_timeout (unsigned int seconds) 614etp_set_idle_timeout (etp_pool pool, unsigned int seconds)
591{ 615{
592 if (WORDACCESS_UNSAFE) X_LOCK (reqlock); 616 if (WORDACCESS_UNSAFE) X_LOCK (pool->reqlock);
593 idle_timeout = seconds; 617 pool->idle_timeout = seconds;
594 if (WORDACCESS_UNSAFE) X_UNLOCK (reqlock); 618 if (WORDACCESS_UNSAFE) X_UNLOCK (pool->reqlock);
595} 619}
596 620
597ETP_API_DECL void ecb_cold 621ETP_API_DECL void ecb_cold
598etp_set_min_parallel (unsigned int nthreads) 622etp_set_min_parallel (etp_pool pool, unsigned int nthreads)
599{ 623{
600 if (wanted < nthreads) 624 if (pool->wanted < nthreads)
601 wanted = nthreads; 625 pool->wanted = nthreads;
602} 626}
603 627
604ETP_API_DECL void ecb_cold 628ETP_API_DECL void ecb_cold
605etp_set_max_parallel (unsigned int nthreads) 629etp_set_max_parallel (etp_pool pool, unsigned int nthreads)
606{ 630{
607 if (wanted > nthreads) 631 if (pool->wanted > nthreads)
608 wanted = nthreads; 632 pool->wanted = nthreads;
609 633
610 while (started > wanted) 634 while (pool->started > pool->wanted)
611 etp_end_thread (); 635 etp_end_thread (pool);
612} 636}
613 637

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines