ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/libeio/etp.c
(Generate patch)

Comparing libeio/etp.c (file contents):
Revision 1.4 by root, Thu Jun 25 17:05:07 2015 UTC vs.
Revision 1.12 by root, Tue Aug 14 09:29:50 2018 UTC

1/* 1/*
2 * libetp implementation 2 * libetp implementation
3 * 3 *
4 * Copyright (c) 2007,2008,2009,2010,2011,2012,2013 Marc Alexander Lehmann <libetp@schmorp.de> 4 * Copyright (c) 2007,2008,2009,2010,2011,2012,2013,2015 Marc Alexander Lehmann <libetp@schmorp.de>
5 * All rights reserved. 5 * All rights reserved.
6 * 6 *
7 * Redistribution and use in source and binary forms, with or without modifica- 7 * Redistribution and use in source and binary forms, with or without modifica-
8 * tion, are permitted provided that the following conditions are met: 8 * tion, are permitted provided that the following conditions are met:
9 * 9 *
52 52
53#ifndef ETP_TYPE_GROUP 53#ifndef ETP_TYPE_GROUP
54# define ETP_TYPE_GROUP 1 54# define ETP_TYPE_GROUP 1
55#endif 55#endif
56 56
57#ifndef ETP_WANT_POLL
58# define ETP_WANT_POLL(pool) pool->want_poll_cb (pool->userdata)
59#endif
60#ifndef ETP_DONE_POLL
61# define ETP_DONE_POLL(pool) pool->done_poll_cb (pool->userdata)
62#endif
63
57#define ETP_NUM_PRI (ETP_PRI_MAX - ETP_PRI_MIN + 1) 64#define ETP_NUM_PRI (ETP_PRI_MAX - ETP_PRI_MIN + 1)
58 65
59#define ETP_TICKS ((1000000 + 1023) >> 10) 66#define ETP_TICKS ((1000000 + 1023) >> 10)
60 67
61enum { 68enum {
69{ 76{
70 return (tv2->tv_sec - tv1->tv_sec ) * ETP_TICKS 77 return (tv2->tv_sec - tv1->tv_sec ) * ETP_TICKS
71 + ((tv2->tv_usec - tv1->tv_usec) >> 10); 78 + ((tv2->tv_usec - tv1->tv_usec) >> 10);
72} 79}
73 80
74static unsigned int started, idle, wanted = 4;
75
76static void (*want_poll_cb) (void);
77static void (*done_poll_cb) (void);
78
79static unsigned int max_poll_time; /* reslock */
80static unsigned int max_poll_reqs; /* reslock */
81
82static unsigned int nreqs; /* reqlock */
83static unsigned int nready; /* reqlock */
84static unsigned int npending; /* reqlock */
85static unsigned int max_idle = 4; /* maximum number of threads that can idle indefinitely */
86static unsigned int idle_timeout = 10; /* number of seconds after which an idle threads exit */
87
88static xmutex_t wrklock;
89static xmutex_t reslock;
90static xmutex_t reqlock;
91static xcond_t reqwait;
92
93struct etp_tmpbuf 81struct etp_tmpbuf
94{ 82{
95 void *ptr; 83 void *ptr;
96 int len; 84 int len;
97}; 85};
104 free (buf->ptr); 92 free (buf->ptr);
105 buf->ptr = malloc (buf->len = len); 93 buf->ptr = malloc (buf->len = len);
106 } 94 }
107 95
108 return buf->ptr; 96 return buf->ptr;
109}
110
111typedef struct etp_worker
112{
113 struct etp_tmpbuf tmpbuf;
114
115 /* locked by wrklock */
116 struct etp_worker *prev, *next;
117
118 xthread_t tid;
119
120#ifdef ETP_WORKER_COMMON
121 ETP_WORKER_COMMON
122#endif
123} etp_worker;
124
125static etp_worker wrk_first; /* NOT etp */
126
127#define ETP_WORKER_LOCK(wrk) X_LOCK (wrklock)
128#define ETP_WORKER_UNLOCK(wrk) X_UNLOCK (wrklock)
129
130/* worker threads management */
131
132static void
133etp_worker_clear (etp_worker *wrk)
134{
135}
136
137static void ecb_cold
138etp_worker_free (etp_worker *wrk)
139{
140 free (wrk->tmpbuf.ptr);
141
142 wrk->next->prev = wrk->prev;
143 wrk->prev->next = wrk->next;
144
145 free (wrk);
146}
147
148ETP_API_DECL unsigned int
149etp_nreqs (void)
150{
151 int retval;
152 if (WORDACCESS_UNSAFE) X_LOCK (reqlock);
153 retval = nreqs;
154 if (WORDACCESS_UNSAFE) X_UNLOCK (reqlock);
155 return retval;
156}
157
158ETP_API_DECL unsigned int
159etp_nready (void)
160{
161 unsigned int retval;
162
163 if (WORDACCESS_UNSAFE) X_LOCK (reqlock);
164 retval = nready;
165 if (WORDACCESS_UNSAFE) X_UNLOCK (reqlock);
166
167 return retval;
168}
169
170ETP_API_DECL unsigned int
171etp_npending (void)
172{
173 unsigned int retval;
174
175 if (WORDACCESS_UNSAFE) X_LOCK (reqlock);
176 retval = npending;
177 if (WORDACCESS_UNSAFE) X_UNLOCK (reqlock);
178
179 return retval;
180}
181
182ETP_API_DECL unsigned int
183etp_nthreads (void)
184{
185 unsigned int retval;
186
187 if (WORDACCESS_UNSAFE) X_LOCK (reqlock);
188 retval = started;
189 if (WORDACCESS_UNSAFE) X_UNLOCK (reqlock);
190
191 return retval;
192} 97}
193 98
194/* 99/*
195 * a somewhat faster data structure might be nice, but 100 * a somewhat faster data structure might be nice, but
196 * with 8 priorities this actually needs <20 insns 101 * with 8 priorities this actually needs <20 insns
197 * per shift, the most expensive operation. 102 * per shift, the most expensive operation.
198 */ 103 */
199typedef struct { 104typedef struct
105{
200 ETP_REQ *qs[ETP_NUM_PRI], *qe[ETP_NUM_PRI]; /* qstart, qend */ 106 ETP_REQ *qs[ETP_NUM_PRI], *qe[ETP_NUM_PRI]; /* qstart, qend */
201 int size; 107 int size;
202} etp_reqq; 108} etp_reqq;
203 109
110typedef struct etp_pool *etp_pool;
111
112typedef struct etp_worker
113{
114 etp_pool pool;
115
116 struct etp_tmpbuf tmpbuf;
117
118 /* locked by pool->wrklock */
119 struct etp_worker *prev, *next;
120
121 xthread_t tid;
122
123#ifdef ETP_WORKER_COMMON
124 ETP_WORKER_COMMON
125#endif
126} etp_worker;
127
128struct etp_pool
129{
130 void *userdata;
131
204static etp_reqq req_queue; 132 etp_reqq req_queue;
205static etp_reqq res_queue; 133 etp_reqq res_queue;
134
135 unsigned int started, idle, wanted;
136
137 unsigned int max_poll_time; /* pool->reslock */
138 unsigned int max_poll_reqs; /* pool->reslock */
139
140 unsigned int nreqs; /* pool->reqlock */
141 unsigned int nready; /* pool->reqlock */
142 unsigned int npending; /* pool->reqlock */
143 unsigned int max_idle; /* maximum number of threads that can pool->idle indefinitely */
144 unsigned int idle_timeout; /* number of seconds after which an pool->idle threads exit */
145
146 void (*want_poll_cb) (void *userdata);
147 void (*done_poll_cb) (void *userdata);
148
149 xmutex_t wrklock;
150 xmutex_t reslock;
151 xmutex_t reqlock;
152 xcond_t reqwait;
153
154 etp_worker wrk_first;
155};
156
157#define ETP_WORKER_LOCK(wrk) X_LOCK (pool->wrklock)
158#define ETP_WORKER_UNLOCK(wrk) X_UNLOCK (pool->wrklock)
159
160/* worker threads management */
161
162static void
163etp_worker_clear (etp_worker *wrk)
164{
165}
166
167static void ecb_cold
168etp_worker_free (etp_worker *wrk)
169{
170 free (wrk->tmpbuf.ptr);
171
172 wrk->next->prev = wrk->prev;
173 wrk->prev->next = wrk->next;
174
175 free (wrk);
176}
177
178ETP_API_DECL unsigned int
179etp_nreqs (etp_pool pool)
180{
181 int retval;
182 if (WORDACCESS_UNSAFE) X_LOCK (pool->reqlock);
183 retval = pool->nreqs;
184 if (WORDACCESS_UNSAFE) X_UNLOCK (pool->reqlock);
185 return retval;
186}
187
188ETP_API_DECL unsigned int
189etp_nready (etp_pool pool)
190{
191 unsigned int retval;
192
193 if (WORDACCESS_UNSAFE) X_LOCK (pool->reqlock);
194 retval = pool->nready;
195 if (WORDACCESS_UNSAFE) X_UNLOCK (pool->reqlock);
196
197 return retval;
198}
199
200ETP_API_DECL unsigned int
201etp_npending (etp_pool pool)
202{
203 unsigned int retval;
204
205 if (WORDACCESS_UNSAFE) X_LOCK (pool->reqlock);
206 retval = pool->npending;
207 if (WORDACCESS_UNSAFE) X_UNLOCK (pool->reqlock);
208
209 return retval;
210}
211
212ETP_API_DECL unsigned int
213etp_nthreads (etp_pool pool)
214{
215 unsigned int retval;
216
217 if (WORDACCESS_UNSAFE) X_LOCK (pool->reqlock);
218 retval = pool->started;
219 if (WORDACCESS_UNSAFE) X_UNLOCK (pool->reqlock);
220
221 return retval;
222}
206 223
207static void ecb_noinline ecb_cold 224static void ecb_noinline ecb_cold
208reqq_init (etp_reqq *q) 225reqq_init (etp_reqq *q)
209{ 226{
210 int pri; 227 int pri;
257 274
258 abort (); 275 abort ();
259} 276}
260 277
261ETP_API_DECL int ecb_cold 278ETP_API_DECL int ecb_cold
262etp_init (void (*want_poll)(void), void (*done_poll)(void)) 279etp_init (etp_pool pool, void *userdata, void (*want_poll)(void *userdata), void (*done_poll)(void *userdata))
263{ 280{
264 X_MUTEX_CREATE (wrklock); 281 X_MUTEX_CREATE (pool->wrklock);
265 X_MUTEX_CREATE (reslock); 282 X_MUTEX_CREATE (pool->reslock);
266 X_MUTEX_CREATE (reqlock); 283 X_MUTEX_CREATE (pool->reqlock);
267 X_COND_CREATE (reqwait); 284 X_COND_CREATE (pool->reqwait);
268 285
269 reqq_init (&req_queue); 286 reqq_init (&pool->req_queue);
270 reqq_init (&res_queue); 287 reqq_init (&pool->res_queue);
271 288
272 wrk_first.next = 289 pool->wrk_first.next =
273 wrk_first.prev = &wrk_first; 290 pool->wrk_first.prev = &pool->wrk_first;
274 291
275 started = 0; 292 pool->started = 0;
276 idle = 0; 293 pool->idle = 0;
277 nreqs = 0; 294 pool->nreqs = 0;
278 nready = 0; 295 pool->nready = 0;
279 npending = 0; 296 pool->npending = 0;
297 pool->wanted = 4;
280 298
299 pool->max_idle = 4; /* maximum number of threads that can pool->idle indefinitely */
300 pool->idle_timeout = 10; /* number of seconds after which an pool->idle threads exit */
301
302 pool->userdata = userdata;
281 want_poll_cb = want_poll; 303 pool->want_poll_cb = want_poll;
282 done_poll_cb = done_poll; 304 pool->done_poll_cb = done_poll;
283 305
284 return 0; 306 return 0;
285} 307}
286 308
287static void ecb_noinline ecb_cold 309static void ecb_noinline ecb_cold
304X_THREAD_PROC (etp_proc) 326X_THREAD_PROC (etp_proc)
305{ 327{
306 ETP_REQ *req; 328 ETP_REQ *req;
307 struct timespec ts; 329 struct timespec ts;
308 etp_worker *self = (etp_worker *)thr_arg; 330 etp_worker *self = (etp_worker *)thr_arg;
331 etp_pool pool = self->pool;
309 332
310 etp_proc_init (); 333 etp_proc_init ();
311 334
312 /* try to distribute timeouts somewhat evenly */ 335 /* try to distribute timeouts somewhat evenly */
313 ts.tv_nsec = ((unsigned long)self & 1023UL) * (1000000000UL / 1024UL); 336 ts.tv_nsec = ((unsigned long)self & 1023UL) * (1000000000UL / 1024UL);
314 337
315 for (;;) 338 for (;;)
316 { 339 {
317 ts.tv_sec = 0; 340 ts.tv_sec = 0;
318 341
319 X_LOCK (reqlock); 342 X_LOCK (pool->reqlock);
320 343
321 for (;;) 344 for (;;)
322 { 345 {
323 req = reqq_shift (&req_queue); 346 req = reqq_shift (&pool->req_queue);
324 347
325 if (req) 348 if (ecb_expect_true (req))
326 break; 349 break;
327 350
328 if (ts.tv_sec == 1) /* no request, but timeout detected, let's quit */ 351 if (ts.tv_sec == 1) /* no request, but timeout detected, let's quit */
329 { 352 {
330 X_UNLOCK (reqlock); 353 X_UNLOCK (pool->reqlock);
331 X_LOCK (wrklock); 354 X_LOCK (pool->wrklock);
332 --started; 355 --pool->started;
333 X_UNLOCK (wrklock); 356 X_UNLOCK (pool->wrklock);
334 goto quit; 357 goto quit;
335 } 358 }
336 359
337 ++idle; 360 ++pool->idle;
338 361
339 if (idle <= max_idle) 362 if (pool->idle <= pool->max_idle)
340 /* we are allowed to idle, so do so without any timeout */ 363 /* we are allowed to pool->idle, so do so without any timeout */
341 X_COND_WAIT (reqwait, reqlock); 364 X_COND_WAIT (pool->reqwait, pool->reqlock);
342 else 365 else
343 { 366 {
344 /* initialise timeout once */ 367 /* initialise timeout once */
345 if (!ts.tv_sec) 368 if (!ts.tv_sec)
346 ts.tv_sec = time (0) + idle_timeout; 369 ts.tv_sec = time (0) + pool->idle_timeout;
347 370
348 if (X_COND_TIMEDWAIT (reqwait, reqlock, ts) == ETIMEDOUT) 371 if (X_COND_TIMEDWAIT (pool->reqwait, pool->reqlock, ts) == ETIMEDOUT)
349 ts.tv_sec = 1; /* assuming this is not a value computed above.,.. */ 372 ts.tv_sec = 1; /* assuming this is not a value computed above.,.. */
350 } 373 }
351 374
352 --idle; 375 --pool->idle;
353 } 376 }
354 377
355 --nready; 378 --pool->nready;
356 379
357 X_UNLOCK (reqlock); 380 X_UNLOCK (pool->reqlock);
358 381
359 if (req->type == ETP_TYPE_QUIT) 382 if (ecb_expect_false (req->type == ETP_TYPE_QUIT))
360 goto quit; 383 goto quit;
361 384
362 ETP_EXECUTE (self, req); 385 ETP_EXECUTE (self, req);
363 386
364 X_LOCK (reslock); 387 X_LOCK (pool->reslock);
365 388
366 ++npending; 389 ++pool->npending;
367 390
368 if (!reqq_push (&res_queue, req) && want_poll_cb) 391 if (!reqq_push (&pool->res_queue, req))
369 want_poll_cb (); 392 ETP_WANT_POLL (pool);
370 393
371 etp_worker_clear (self); 394 etp_worker_clear (self);
372 395
373 X_UNLOCK (reslock); 396 X_UNLOCK (pool->reslock);
374 } 397 }
375 398
376quit: 399quit:
377 free (req); 400 free (req);
378 401
379 X_LOCK (wrklock); 402 X_LOCK (pool->wrklock);
380 etp_worker_free (self); 403 etp_worker_free (self);
381 X_UNLOCK (wrklock); 404 X_UNLOCK (pool->wrklock);
382 405
383 return 0; 406 return 0;
384} 407}
385 408
386static void ecb_cold 409static void ecb_cold
387etp_start_thread (void) 410etp_start_thread (etp_pool pool)
388{ 411{
389 etp_worker *wrk = calloc (1, sizeof (etp_worker)); 412 etp_worker *wrk = calloc (1, sizeof (etp_worker));
390 413
391 /*TODO*/ 414 /*TODO*/
392 assert (("unable to allocate worker thread data", wrk)); 415 assert (("unable to allocate worker thread data", wrk));
393 416
417 wrk->pool = pool;
418
394 X_LOCK (wrklock); 419 X_LOCK (pool->wrklock);
395 420
396 if (xthread_create (&wrk->tid, etp_proc, (void *)wrk)) 421 if (xthread_create (&wrk->tid, etp_proc, (void *)wrk))
397 { 422 {
398 wrk->prev = &wrk_first; 423 wrk->prev = &pool->wrk_first;
399 wrk->next = wrk_first.next; 424 wrk->next = pool->wrk_first.next;
400 wrk_first.next->prev = wrk; 425 pool->wrk_first.next->prev = wrk;
401 wrk_first.next = wrk; 426 pool->wrk_first.next = wrk;
402 ++started; 427 ++pool->started;
403 } 428 }
404 else 429 else
405 free (wrk); 430 free (wrk);
406 431
407 X_UNLOCK (wrklock); 432 X_UNLOCK (pool->wrklock);
408} 433}
409 434
410static void 435static void
411etp_maybe_start_thread (void) 436etp_maybe_start_thread (etp_pool pool)
412{ 437{
413 if (ecb_expect_true (etp_nthreads () >= wanted)) 438 if (ecb_expect_true (etp_nthreads (pool) >= pool->wanted))
414 return; 439 return;
415 440
416 /* todo: maybe use idle here, but might be less exact */ 441 /* todo: maybe use pool->idle here, but might be less exact */
417 if (ecb_expect_true (0 <= (int)etp_nthreads () + (int)etp_npending () - (int)etp_nreqs ())) 442 if (ecb_expect_true (0 <= (int)etp_nthreads (pool) + (int)etp_npending (pool) - (int)etp_nreqs (pool)))
418 return; 443 return;
419 444
420 etp_start_thread (); 445 etp_start_thread (pool);
421} 446}
422 447
423static void ecb_cold 448static void ecb_cold
424etp_end_thread (void) 449etp_end_thread (etp_pool pool)
425{ 450{
426 ETP_REQ *req = calloc (1, sizeof (ETP_REQ)); /* will be freed by worker */ 451 ETP_REQ *req = calloc (1, sizeof (ETP_REQ)); /* will be freed by worker */
427 452
428 req->type = ETP_TYPE_QUIT; 453 req->type = ETP_TYPE_QUIT;
429 req->pri = ETP_PRI_MAX - ETP_PRI_MIN; 454 req->pri = ETP_PRI_MAX - ETP_PRI_MIN;
430 455
431 X_LOCK (reqlock); 456 X_LOCK (pool->reqlock);
432 reqq_push (&req_queue, req); 457 reqq_push (&pool->req_queue, req);
433 X_COND_SIGNAL (reqwait); 458 X_COND_SIGNAL (pool->reqwait);
434 X_UNLOCK (reqlock); 459 X_UNLOCK (pool->reqlock);
435 460
436 X_LOCK (wrklock); 461 X_LOCK (pool->wrklock);
437 --started; 462 --pool->started;
438 X_UNLOCK (wrklock); 463 X_UNLOCK (pool->wrklock);
439} 464}
440 465
441ETP_API_DECL int 466ETP_API_DECL int
442etp_poll (void) 467etp_poll (etp_pool pool)
443{ 468{
444 unsigned int maxreqs; 469 unsigned int maxreqs;
445 unsigned int maxtime; 470 unsigned int maxtime;
446 struct timeval tv_start, tv_now; 471 struct timeval tv_start, tv_now;
447 472
448 X_LOCK (reslock); 473 X_LOCK (pool->reslock);
449 maxreqs = max_poll_reqs; 474 maxreqs = pool->max_poll_reqs;
450 maxtime = max_poll_time; 475 maxtime = pool->max_poll_time;
451 X_UNLOCK (reslock); 476 X_UNLOCK (pool->reslock);
452 477
453 if (maxtime) 478 if (maxtime)
454 gettimeofday (&tv_start, 0); 479 gettimeofday (&tv_start, 0);
455 480
456 for (;;) 481 for (;;)
457 { 482 {
458 ETP_REQ *req; 483 ETP_REQ *req;
459 484
460 etp_maybe_start_thread (); 485 etp_maybe_start_thread (pool);
461 486
462 X_LOCK (reslock); 487 X_LOCK (pool->reslock);
463 req = reqq_shift (&res_queue); 488 req = reqq_shift (&pool->res_queue);
464 489
465 if (req) 490 if (ecb_expect_true (req))
466 { 491 {
467 --npending; 492 --pool->npending;
468 493
469 if (!res_queue.size && done_poll_cb) 494 if (!pool->res_queue.size)
470 done_poll_cb (); 495 ETP_DONE_POLL (pool);
471 } 496 }
472 497
473 X_UNLOCK (reslock); 498 X_UNLOCK (pool->reslock);
474 499
475 if (!req) 500 if (ecb_expect_false (!req))
476 return 0; 501 return 0;
477 502
478 X_LOCK (reqlock); 503 X_LOCK (pool->reqlock);
479 --nreqs; 504 --pool->nreqs;
480 X_UNLOCK (reqlock); 505 X_UNLOCK (pool->reqlock);
481 506
482 if (ecb_expect_false (req->type == ETP_TYPE_GROUP && req->size)) 507 if (ecb_expect_false (req->type == ETP_TYPE_GROUP && req->size))
483 { 508 {
484 req->flags |= ETP_FLAG_DELAYED; /* mark request as delayed */ 509 req->flags |= ETP_FLAG_DELAYED; /* mark request as delayed */
485 continue; 510 continue;
506 errno = EAGAIN; 531 errno = EAGAIN;
507 return -1; 532 return -1;
508} 533}
509 534
510ETP_API_DECL void 535ETP_API_DECL void
511etp_grp_cancel (ETP_REQ *grp); 536etp_grp_cancel (etp_pool pool, ETP_REQ *grp);
512 537
513ETP_API_DECL void 538ETP_API_DECL void
514etp_cancel (ETP_REQ *req) 539etp_cancel (etp_pool pool, ETP_REQ *req)
515{ 540{
516 req->cancelled = 1; 541 req->cancelled = 1;
517 542
518 etp_grp_cancel (req); 543 etp_grp_cancel (pool, req);
519} 544}
520 545
521ETP_API_DECL void 546ETP_API_DECL void
522etp_grp_cancel (ETP_REQ *grp) 547etp_grp_cancel (etp_pool pool, ETP_REQ *grp)
523{ 548{
524 for (grp = grp->grp_first; grp; grp = grp->grp_next) 549 for (grp = grp->grp_first; grp; grp = grp->grp_next)
525 etp_cancel (grp); 550 etp_cancel (pool, grp);
526} 551}
527 552
528ETP_API_DECL void 553ETP_API_DECL void
529etp_submit (ETP_REQ *req) 554etp_submit (etp_pool pool, ETP_REQ *req)
530{ 555{
531 req->pri -= ETP_PRI_MIN; 556 req->pri -= ETP_PRI_MIN;
532 557
533 if (ecb_expect_false (req->pri < ETP_PRI_MIN - ETP_PRI_MIN)) req->pri = ETP_PRI_MIN - ETP_PRI_MIN; 558 if (ecb_expect_false (req->pri < ETP_PRI_MIN - ETP_PRI_MIN)) req->pri = ETP_PRI_MIN - ETP_PRI_MIN;
534 if (ecb_expect_false (req->pri > ETP_PRI_MAX - ETP_PRI_MIN)) req->pri = ETP_PRI_MAX - ETP_PRI_MIN; 559 if (ecb_expect_false (req->pri > ETP_PRI_MAX - ETP_PRI_MIN)) req->pri = ETP_PRI_MAX - ETP_PRI_MIN;
535 560
536 if (ecb_expect_false (req->type == ETP_TYPE_GROUP)) 561 if (ecb_expect_false (req->type == ETP_TYPE_GROUP))
537 { 562 {
538 /* I hope this is worth it :/ */ 563 /* I hope this is worth it :/ */
539 X_LOCK (reqlock); 564 X_LOCK (pool->reqlock);
540 ++nreqs; 565 ++pool->nreqs;
541 X_UNLOCK (reqlock); 566 X_UNLOCK (pool->reqlock);
542 567
543 X_LOCK (reslock); 568 X_LOCK (pool->reslock);
544 569
545 ++npending; 570 ++pool->npending;
546 571
547 if (!reqq_push (&res_queue, req) && want_poll_cb) 572 if (!reqq_push (&pool->res_queue, req))
548 want_poll_cb (); 573 ETP_WANT_POLL (pool);
549 574
550 X_UNLOCK (reslock); 575 X_UNLOCK (pool->reslock);
551 } 576 }
552 else 577 else
553 { 578 {
554 X_LOCK (reqlock); 579 X_LOCK (pool->reqlock);
555 ++nreqs; 580 ++pool->nreqs;
556 ++nready; 581 ++pool->nready;
557 reqq_push (&req_queue, req); 582 reqq_push (&pool->req_queue, req);
558 X_COND_SIGNAL (reqwait); 583 X_COND_SIGNAL (pool->reqwait);
559 X_UNLOCK (reqlock); 584 X_UNLOCK (pool->reqlock);
560 585
561 etp_maybe_start_thread (); 586 etp_maybe_start_thread (pool);
562 } 587 }
563} 588}
564 589
565ETP_API_DECL void ecb_cold 590ETP_API_DECL void ecb_cold
566etp_set_max_poll_time (double nseconds) 591etp_set_max_poll_time (etp_pool pool, double seconds)
567{ 592{
568 if (WORDACCESS_UNSAFE) X_LOCK (reslock); 593 if (WORDACCESS_UNSAFE) X_LOCK (pool->reslock);
569 max_poll_time = nseconds * ETP_TICKS; 594 pool->max_poll_time = seconds * ETP_TICKS;
570 if (WORDACCESS_UNSAFE) X_UNLOCK (reslock); 595 if (WORDACCESS_UNSAFE) X_UNLOCK (pool->reslock);
571} 596}
572 597
573ETP_API_DECL void ecb_cold 598ETP_API_DECL void ecb_cold
574etp_set_max_poll_reqs (unsigned int maxreqs) 599etp_set_max_poll_reqs (etp_pool pool, unsigned int maxreqs)
575{ 600{
576 if (WORDACCESS_UNSAFE) X_LOCK (reslock); 601 if (WORDACCESS_UNSAFE) X_LOCK (pool->reslock);
577 max_poll_reqs = maxreqs; 602 pool->max_poll_reqs = maxreqs;
578 if (WORDACCESS_UNSAFE) X_UNLOCK (reslock); 603 if (WORDACCESS_UNSAFE) X_UNLOCK (pool->reslock);
579} 604}
580 605
581ETP_API_DECL void ecb_cold 606ETP_API_DECL void ecb_cold
582etp_set_max_idle (unsigned int nthreads) 607etp_set_max_idle (etp_pool pool, unsigned int threads)
583{ 608{
584 if (WORDACCESS_UNSAFE) X_LOCK (reqlock); 609 if (WORDACCESS_UNSAFE) X_LOCK (pool->reqlock);
585 max_idle = nthreads; 610 pool->max_idle = threads;
586 if (WORDACCESS_UNSAFE) X_UNLOCK (reqlock); 611 if (WORDACCESS_UNSAFE) X_UNLOCK (pool->reqlock);
587} 612}
588 613
589ETP_API_DECL void ecb_cold 614ETP_API_DECL void ecb_cold
590etp_set_idle_timeout (unsigned int seconds) 615etp_set_idle_timeout (etp_pool pool, unsigned int seconds)
591{ 616{
592 if (WORDACCESS_UNSAFE) X_LOCK (reqlock); 617 if (WORDACCESS_UNSAFE) X_LOCK (pool->reqlock);
593 idle_timeout = seconds; 618 pool->idle_timeout = seconds;
594 if (WORDACCESS_UNSAFE) X_UNLOCK (reqlock); 619 if (WORDACCESS_UNSAFE) X_UNLOCK (pool->reqlock);
595} 620}
596 621
597ETP_API_DECL void ecb_cold 622ETP_API_DECL void ecb_cold
598etp_set_min_parallel (unsigned int nthreads) 623etp_set_min_parallel (etp_pool pool, unsigned int threads)
599{ 624{
600 if (wanted < nthreads) 625 if (pool->wanted < threads)
601 wanted = nthreads; 626 pool->wanted = threads;
602} 627}
603 628
604ETP_API_DECL void ecb_cold 629ETP_API_DECL void ecb_cold
605etp_set_max_parallel (unsigned int nthreads) 630etp_set_max_parallel (etp_pool pool, unsigned int threads)
606{ 631{
607 if (wanted > nthreads) 632 if (pool->wanted > threads)
608 wanted = nthreads; 633 pool->wanted = threads;
609 634
610 while (started > wanted) 635 while (pool->started > pool->wanted)
611 etp_end_thread (); 636 etp_end_thread (pool);
612} 637}
613 638

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines