ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/libeio/etp.c
(Generate patch)

Comparing libeio/etp.c (file contents):
Revision 1.4 by root, Thu Jun 25 17:05:07 2015 UTC vs.
Revision 1.11 by root, Sun May 1 17:15:45 2016 UTC

1/* 1/*
2 * libetp implementation 2 * libetp implementation
3 * 3 *
4 * Copyright (c) 2007,2008,2009,2010,2011,2012,2013 Marc Alexander Lehmann <libetp@schmorp.de> 4 * Copyright (c) 2007,2008,2009,2010,2011,2012,2013,2015 Marc Alexander Lehmann <libetp@schmorp.de>
5 * All rights reserved. 5 * All rights reserved.
6 * 6 *
7 * Redistribution and use in source and binary forms, with or without modifica- 7 * Redistribution and use in source and binary forms, with or without modifica-
8 * tion, are permitted provided that the following conditions are met: 8 * tion, are permitted provided that the following conditions are met:
9 * 9 *
35 * and other provisions required by the GPL. If you do not delete the 35 * and other provisions required by the GPL. If you do not delete the
36 * provisions above, a recipient may use your version of this file under 36 * provisions above, a recipient may use your version of this file under
37 * either the BSD or the GPL. 37 * either the BSD or the GPL.
38 */ 38 */
39 39
40#if HAVE_SYS_PRCTL_H
41# include <sys/prctl.h>
42#endif
43
44#ifdef EIO_STACKSIZE
45# define X_STACKSIZE EIO_STACKSIZE
46#endif
47#include "xthread.h"
48
40#ifndef ETP_API_DECL 49#ifndef ETP_API_DECL
41# define ETP_API_DECL static 50# define ETP_API_DECL static
42#endif 51#endif
43 52
44#ifndef ETP_PRI_MIN 53#ifndef ETP_PRI_MIN
50# define ETP_TYPE_QUIT 0 59# define ETP_TYPE_QUIT 0
51#endif 60#endif
52 61
53#ifndef ETP_TYPE_GROUP 62#ifndef ETP_TYPE_GROUP
54# define ETP_TYPE_GROUP 1 63# define ETP_TYPE_GROUP 1
64#endif
65
66#ifndef ETP_WANT_POLL
67# define ETP_WANT_POLL(pool) pool->want_poll_cb (pool->userdata)
68#endif
69#ifndef ETP_DONE_POLL
70# define ETP_DONE_POLL(pool) pool->done_poll_cb (pool->userdata)
55#endif 71#endif
56 72
57#define ETP_NUM_PRI (ETP_PRI_MAX - ETP_PRI_MIN + 1) 73#define ETP_NUM_PRI (ETP_PRI_MAX - ETP_PRI_MIN + 1)
58 74
59#define ETP_TICKS ((1000000 + 1023) >> 10) 75#define ETP_TICKS ((1000000 + 1023) >> 10)
69{ 85{
70 return (tv2->tv_sec - tv1->tv_sec ) * ETP_TICKS 86 return (tv2->tv_sec - tv1->tv_sec ) * ETP_TICKS
71 + ((tv2->tv_usec - tv1->tv_usec) >> 10); 87 + ((tv2->tv_usec - tv1->tv_usec) >> 10);
72} 88}
73 89
74static unsigned int started, idle, wanted = 4;
75
76static void (*want_poll_cb) (void);
77static void (*done_poll_cb) (void);
78
79static unsigned int max_poll_time; /* reslock */
80static unsigned int max_poll_reqs; /* reslock */
81
82static unsigned int nreqs; /* reqlock */
83static unsigned int nready; /* reqlock */
84static unsigned int npending; /* reqlock */
85static unsigned int max_idle = 4; /* maximum number of threads that can idle indefinitely */
86static unsigned int idle_timeout = 10; /* number of seconds after which an idle threads exit */
87
88static xmutex_t wrklock;
89static xmutex_t reslock;
90static xmutex_t reqlock;
91static xcond_t reqwait;
92
93struct etp_tmpbuf 90struct etp_tmpbuf
94{ 91{
95 void *ptr; 92 void *ptr;
96 int len; 93 int len;
97}; 94};
104 free (buf->ptr); 101 free (buf->ptr);
105 buf->ptr = malloc (buf->len = len); 102 buf->ptr = malloc (buf->len = len);
106 } 103 }
107 104
108 return buf->ptr; 105 return buf->ptr;
109}
110
111typedef struct etp_worker
112{
113 struct etp_tmpbuf tmpbuf;
114
115 /* locked by wrklock */
116 struct etp_worker *prev, *next;
117
118 xthread_t tid;
119
120#ifdef ETP_WORKER_COMMON
121 ETP_WORKER_COMMON
122#endif
123} etp_worker;
124
125static etp_worker wrk_first; /* NOT etp */
126
127#define ETP_WORKER_LOCK(wrk) X_LOCK (wrklock)
128#define ETP_WORKER_UNLOCK(wrk) X_UNLOCK (wrklock)
129
130/* worker threads management */
131
132static void
133etp_worker_clear (etp_worker *wrk)
134{
135}
136
137static void ecb_cold
138etp_worker_free (etp_worker *wrk)
139{
140 free (wrk->tmpbuf.ptr);
141
142 wrk->next->prev = wrk->prev;
143 wrk->prev->next = wrk->next;
144
145 free (wrk);
146}
147
148ETP_API_DECL unsigned int
149etp_nreqs (void)
150{
151 int retval;
152 if (WORDACCESS_UNSAFE) X_LOCK (reqlock);
153 retval = nreqs;
154 if (WORDACCESS_UNSAFE) X_UNLOCK (reqlock);
155 return retval;
156}
157
158ETP_API_DECL unsigned int
159etp_nready (void)
160{
161 unsigned int retval;
162
163 if (WORDACCESS_UNSAFE) X_LOCK (reqlock);
164 retval = nready;
165 if (WORDACCESS_UNSAFE) X_UNLOCK (reqlock);
166
167 return retval;
168}
169
170ETP_API_DECL unsigned int
171etp_npending (void)
172{
173 unsigned int retval;
174
175 if (WORDACCESS_UNSAFE) X_LOCK (reqlock);
176 retval = npending;
177 if (WORDACCESS_UNSAFE) X_UNLOCK (reqlock);
178
179 return retval;
180}
181
182ETP_API_DECL unsigned int
183etp_nthreads (void)
184{
185 unsigned int retval;
186
187 if (WORDACCESS_UNSAFE) X_LOCK (reqlock);
188 retval = started;
189 if (WORDACCESS_UNSAFE) X_UNLOCK (reqlock);
190
191 return retval;
192} 106}
193 107
194/* 108/*
195 * a somewhat faster data structure might be nice, but 109 * a somewhat faster data structure might be nice, but
196 * with 8 priorities this actually needs <20 insns 110 * with 8 priorities this actually needs <20 insns
197 * per shift, the most expensive operation. 111 * per shift, the most expensive operation.
198 */ 112 */
199typedef struct { 113typedef struct
114{
200 ETP_REQ *qs[ETP_NUM_PRI], *qe[ETP_NUM_PRI]; /* qstart, qend */ 115 ETP_REQ *qs[ETP_NUM_PRI], *qe[ETP_NUM_PRI]; /* qstart, qend */
201 int size; 116 int size;
202} etp_reqq; 117} etp_reqq;
203 118
119typedef struct etp_pool *etp_pool;
120
121typedef struct etp_worker
122{
123 etp_pool pool;
124
125 struct etp_tmpbuf tmpbuf;
126
127 /* locked by pool->wrklock */
128 struct etp_worker *prev, *next;
129
130 xthread_t tid;
131
132#ifdef ETP_WORKER_COMMON
133 ETP_WORKER_COMMON
134#endif
135} etp_worker;
136
137struct etp_pool
138{
139 void *userdata;
140
204static etp_reqq req_queue; 141 etp_reqq req_queue;
205static etp_reqq res_queue; 142 etp_reqq res_queue;
143
144 unsigned int started, idle, wanted;
145
146 unsigned int max_poll_time; /* pool->reslock */
147 unsigned int max_poll_reqs; /* pool->reslock */
148
149 unsigned int nreqs; /* pool->reqlock */
150 unsigned int nready; /* pool->reqlock */
151 unsigned int npending; /* pool->reqlock */
152 unsigned int max_idle; /* maximum number of threads that can pool->idle indefinitely */
153 unsigned int idle_timeout; /* number of seconds after which an pool->idle threads exit */
154
155 void (*want_poll_cb) (void *userdata);
156 void (*done_poll_cb) (void *userdata);
157
158 xmutex_t wrklock;
159 xmutex_t reslock;
160 xmutex_t reqlock;
161 xcond_t reqwait;
162
163 etp_worker wrk_first;
164};
165
166#define ETP_WORKER_LOCK(wrk) X_LOCK (pool->wrklock)
167#define ETP_WORKER_UNLOCK(wrk) X_UNLOCK (pool->wrklock)
168
169/* worker threads management */
170
171static void
172etp_worker_clear (etp_worker *wrk)
173{
174}
175
176static void ecb_cold
177etp_worker_free (etp_worker *wrk)
178{
179 free (wrk->tmpbuf.ptr);
180
181 wrk->next->prev = wrk->prev;
182 wrk->prev->next = wrk->next;
183
184 free (wrk);
185}
186
187ETP_API_DECL unsigned int
188etp_nreqs (etp_pool pool)
189{
190 int retval;
191 if (WORDACCESS_UNSAFE) X_LOCK (pool->reqlock);
192 retval = pool->nreqs;
193 if (WORDACCESS_UNSAFE) X_UNLOCK (pool->reqlock);
194 return retval;
195}
196
197ETP_API_DECL unsigned int
198etp_nready (etp_pool pool)
199{
200 unsigned int retval;
201
202 if (WORDACCESS_UNSAFE) X_LOCK (pool->reqlock);
203 retval = pool->nready;
204 if (WORDACCESS_UNSAFE) X_UNLOCK (pool->reqlock);
205
206 return retval;
207}
208
209ETP_API_DECL unsigned int
210etp_npending (etp_pool pool)
211{
212 unsigned int retval;
213
214 if (WORDACCESS_UNSAFE) X_LOCK (pool->reqlock);
215 retval = pool->npending;
216 if (WORDACCESS_UNSAFE) X_UNLOCK (pool->reqlock);
217
218 return retval;
219}
220
221ETP_API_DECL unsigned int
222etp_nthreads (etp_pool pool)
223{
224 unsigned int retval;
225
226 if (WORDACCESS_UNSAFE) X_LOCK (pool->reqlock);
227 retval = pool->started;
228 if (WORDACCESS_UNSAFE) X_UNLOCK (pool->reqlock);
229
230 return retval;
231}
206 232
207static void ecb_noinline ecb_cold 233static void ecb_noinline ecb_cold
208reqq_init (etp_reqq *q) 234reqq_init (etp_reqq *q)
209{ 235{
210 int pri; 236 int pri;
257 283
258 abort (); 284 abort ();
259} 285}
260 286
261ETP_API_DECL int ecb_cold 287ETP_API_DECL int ecb_cold
262etp_init (void (*want_poll)(void), void (*done_poll)(void)) 288etp_init (etp_pool pool, void *userdata, void (*want_poll)(void *userdata), void (*done_poll)(void *userdata))
263{ 289{
264 X_MUTEX_CREATE (wrklock); 290 X_MUTEX_CREATE (pool->wrklock);
265 X_MUTEX_CREATE (reslock); 291 X_MUTEX_CREATE (pool->reslock);
266 X_MUTEX_CREATE (reqlock); 292 X_MUTEX_CREATE (pool->reqlock);
267 X_COND_CREATE (reqwait); 293 X_COND_CREATE (pool->reqwait);
268 294
269 reqq_init (&req_queue); 295 reqq_init (&pool->req_queue);
270 reqq_init (&res_queue); 296 reqq_init (&pool->res_queue);
271 297
272 wrk_first.next = 298 pool->wrk_first.next =
273 wrk_first.prev = &wrk_first; 299 pool->wrk_first.prev = &pool->wrk_first;
274 300
275 started = 0; 301 pool->started = 0;
276 idle = 0; 302 pool->idle = 0;
277 nreqs = 0; 303 pool->nreqs = 0;
278 nready = 0; 304 pool->nready = 0;
279 npending = 0; 305 pool->npending = 0;
306 pool->wanted = 4;
280 307
308 pool->max_idle = 4; /* maximum number of threads that can pool->idle indefinitely */
309 pool->idle_timeout = 10; /* number of seconds after which an pool->idle threads exit */
310
311 pool->userdata = userdata;
281 want_poll_cb = want_poll; 312 pool->want_poll_cb = want_poll;
282 done_poll_cb = done_poll; 313 pool->done_poll_cb = done_poll;
283 314
284 return 0; 315 return 0;
285} 316}
286 317
287static void ecb_noinline ecb_cold 318static void ecb_noinline ecb_cold
288etp_proc_init (void) 319etp_proc_init (void)
289{ 320{
290#if HAVE_PRCTL_SET_NAME 321#if HAVE_PRCTL_SET_NAME
291 /* provide a more sensible "thread name" */ 322 /* provide a more sensible "thread name" */
292 char name[16 + 1]; 323 char name[15 + 1];
293 const int namelen = sizeof (name) - 1; 324 const int namelen = sizeof (name) - 1;
294 int len; 325 int len;
295 326
296 prctl (PR_GET_NAME, (unsigned long)name, 0, 0, 0); 327 prctl (PR_GET_NAME, (unsigned long)name, 0, 0, 0);
297 name [namelen] = 0; 328 name [namelen] = 0;
304X_THREAD_PROC (etp_proc) 335X_THREAD_PROC (etp_proc)
305{ 336{
306 ETP_REQ *req; 337 ETP_REQ *req;
307 struct timespec ts; 338 struct timespec ts;
308 etp_worker *self = (etp_worker *)thr_arg; 339 etp_worker *self = (etp_worker *)thr_arg;
340 etp_pool pool = self->pool;
309 341
310 etp_proc_init (); 342 etp_proc_init ();
311 343
312 /* try to distribute timeouts somewhat evenly */ 344 /* try to distribute timeouts somewhat evenly */
313 ts.tv_nsec = ((unsigned long)self & 1023UL) * (1000000000UL / 1024UL); 345 ts.tv_nsec = ((unsigned long)self & 1023UL) * (1000000000UL / 1024UL);
314 346
315 for (;;) 347 for (;;)
316 { 348 {
317 ts.tv_sec = 0; 349 ts.tv_sec = 0;
318 350
319 X_LOCK (reqlock); 351 X_LOCK (pool->reqlock);
320 352
321 for (;;) 353 for (;;)
322 { 354 {
323 req = reqq_shift (&req_queue); 355 req = reqq_shift (&pool->req_queue);
324 356
325 if (req) 357 if (ecb_expect_true (req))
326 break; 358 break;
327 359
328 if (ts.tv_sec == 1) /* no request, but timeout detected, let's quit */ 360 if (ts.tv_sec == 1) /* no request, but timeout detected, let's quit */
329 { 361 {
330 X_UNLOCK (reqlock); 362 X_UNLOCK (pool->reqlock);
331 X_LOCK (wrklock); 363 X_LOCK (pool->wrklock);
332 --started; 364 --pool->started;
333 X_UNLOCK (wrklock); 365 X_UNLOCK (pool->wrklock);
334 goto quit; 366 goto quit;
335 } 367 }
336 368
337 ++idle; 369 ++pool->idle;
338 370
339 if (idle <= max_idle) 371 if (pool->idle <= pool->max_idle)
340 /* we are allowed to idle, so do so without any timeout */ 372 /* we are allowed to pool->idle, so do so without any timeout */
341 X_COND_WAIT (reqwait, reqlock); 373 X_COND_WAIT (pool->reqwait, pool->reqlock);
342 else 374 else
343 { 375 {
344 /* initialise timeout once */ 376 /* initialise timeout once */
345 if (!ts.tv_sec) 377 if (!ts.tv_sec)
346 ts.tv_sec = time (0) + idle_timeout; 378 ts.tv_sec = time (0) + pool->idle_timeout;
347 379
348 if (X_COND_TIMEDWAIT (reqwait, reqlock, ts) == ETIMEDOUT) 380 if (X_COND_TIMEDWAIT (pool->reqwait, pool->reqlock, ts) == ETIMEDOUT)
349 ts.tv_sec = 1; /* assuming this is not a value computed above.,.. */ 381 ts.tv_sec = 1; /* assuming this is not a value computed above.,.. */
350 } 382 }
351 383
352 --idle; 384 --pool->idle;
353 } 385 }
354 386
355 --nready; 387 --pool->nready;
356 388
357 X_UNLOCK (reqlock); 389 X_UNLOCK (pool->reqlock);
358 390
359 if (req->type == ETP_TYPE_QUIT) 391 if (ecb_expect_false (req->type == ETP_TYPE_QUIT))
360 goto quit; 392 goto quit;
361 393
362 ETP_EXECUTE (self, req); 394 ETP_EXECUTE (self, req);
363 395
364 X_LOCK (reslock); 396 X_LOCK (pool->reslock);
365 397
366 ++npending; 398 ++pool->npending;
367 399
368 if (!reqq_push (&res_queue, req) && want_poll_cb) 400 if (!reqq_push (&pool->res_queue, req))
369 want_poll_cb (); 401 ETP_WANT_POLL (pool);
370 402
371 etp_worker_clear (self); 403 etp_worker_clear (self);
372 404
373 X_UNLOCK (reslock); 405 X_UNLOCK (pool->reslock);
374 } 406 }
375 407
376quit: 408quit:
377 free (req); 409 free (req);
378 410
379 X_LOCK (wrklock); 411 X_LOCK (pool->wrklock);
380 etp_worker_free (self); 412 etp_worker_free (self);
381 X_UNLOCK (wrklock); 413 X_UNLOCK (pool->wrklock);
382 414
383 return 0; 415 return 0;
384} 416}
385 417
386static void ecb_cold 418static void ecb_cold
387etp_start_thread (void) 419etp_start_thread (etp_pool pool)
388{ 420{
389 etp_worker *wrk = calloc (1, sizeof (etp_worker)); 421 etp_worker *wrk = calloc (1, sizeof (etp_worker));
390 422
391 /*TODO*/ 423 /*TODO*/
392 assert (("unable to allocate worker thread data", wrk)); 424 assert (("unable to allocate worker thread data", wrk));
393 425
426 wrk->pool = pool;
427
394 X_LOCK (wrklock); 428 X_LOCK (pool->wrklock);
395 429
396 if (xthread_create (&wrk->tid, etp_proc, (void *)wrk)) 430 if (xthread_create (&wrk->tid, etp_proc, (void *)wrk))
397 { 431 {
398 wrk->prev = &wrk_first; 432 wrk->prev = &pool->wrk_first;
399 wrk->next = wrk_first.next; 433 wrk->next = pool->wrk_first.next;
400 wrk_first.next->prev = wrk; 434 pool->wrk_first.next->prev = wrk;
401 wrk_first.next = wrk; 435 pool->wrk_first.next = wrk;
402 ++started; 436 ++pool->started;
403 } 437 }
404 else 438 else
405 free (wrk); 439 free (wrk);
406 440
407 X_UNLOCK (wrklock); 441 X_UNLOCK (pool->wrklock);
408} 442}
409 443
410static void 444static void
411etp_maybe_start_thread (void) 445etp_maybe_start_thread (etp_pool pool)
412{ 446{
413 if (ecb_expect_true (etp_nthreads () >= wanted)) 447 if (ecb_expect_true (etp_nthreads (pool) >= pool->wanted))
414 return; 448 return;
415 449
416 /* todo: maybe use idle here, but might be less exact */ 450 /* todo: maybe use pool->idle here, but might be less exact */
417 if (ecb_expect_true (0 <= (int)etp_nthreads () + (int)etp_npending () - (int)etp_nreqs ())) 451 if (ecb_expect_true (0 <= (int)etp_nthreads (pool) + (int)etp_npending (pool) - (int)etp_nreqs (pool)))
418 return; 452 return;
419 453
420 etp_start_thread (); 454 etp_start_thread (pool);
421} 455}
422 456
423static void ecb_cold 457static void ecb_cold
424etp_end_thread (void) 458etp_end_thread (etp_pool pool)
425{ 459{
426 ETP_REQ *req = calloc (1, sizeof (ETP_REQ)); /* will be freed by worker */ 460 ETP_REQ *req = calloc (1, sizeof (ETP_REQ)); /* will be freed by worker */
427 461
428 req->type = ETP_TYPE_QUIT; 462 req->type = ETP_TYPE_QUIT;
429 req->pri = ETP_PRI_MAX - ETP_PRI_MIN; 463 req->pri = ETP_PRI_MAX - ETP_PRI_MIN;
430 464
431 X_LOCK (reqlock); 465 X_LOCK (pool->reqlock);
432 reqq_push (&req_queue, req); 466 reqq_push (&pool->req_queue, req);
433 X_COND_SIGNAL (reqwait); 467 X_COND_SIGNAL (pool->reqwait);
434 X_UNLOCK (reqlock); 468 X_UNLOCK (pool->reqlock);
435 469
436 X_LOCK (wrklock); 470 X_LOCK (pool->wrklock);
437 --started; 471 --pool->started;
438 X_UNLOCK (wrklock); 472 X_UNLOCK (pool->wrklock);
439} 473}
440 474
441ETP_API_DECL int 475ETP_API_DECL int
442etp_poll (void) 476etp_poll (etp_pool pool)
443{ 477{
444 unsigned int maxreqs; 478 unsigned int maxreqs;
445 unsigned int maxtime; 479 unsigned int maxtime;
446 struct timeval tv_start, tv_now; 480 struct timeval tv_start, tv_now;
447 481
448 X_LOCK (reslock); 482 X_LOCK (pool->reslock);
449 maxreqs = max_poll_reqs; 483 maxreqs = pool->max_poll_reqs;
450 maxtime = max_poll_time; 484 maxtime = pool->max_poll_time;
451 X_UNLOCK (reslock); 485 X_UNLOCK (pool->reslock);
452 486
453 if (maxtime) 487 if (maxtime)
454 gettimeofday (&tv_start, 0); 488 gettimeofday (&tv_start, 0);
455 489
456 for (;;) 490 for (;;)
457 { 491 {
458 ETP_REQ *req; 492 ETP_REQ *req;
459 493
460 etp_maybe_start_thread (); 494 etp_maybe_start_thread (pool);
461 495
462 X_LOCK (reslock); 496 X_LOCK (pool->reslock);
463 req = reqq_shift (&res_queue); 497 req = reqq_shift (&pool->res_queue);
464 498
465 if (req) 499 if (ecb_expect_true (req))
466 { 500 {
467 --npending; 501 --pool->npending;
468 502
469 if (!res_queue.size && done_poll_cb) 503 if (!pool->res_queue.size)
470 done_poll_cb (); 504 ETP_DONE_POLL (pool);
471 } 505 }
472 506
473 X_UNLOCK (reslock); 507 X_UNLOCK (pool->reslock);
474 508
475 if (!req) 509 if (ecb_expect_false (!req))
476 return 0; 510 return 0;
477 511
478 X_LOCK (reqlock); 512 X_LOCK (pool->reqlock);
479 --nreqs; 513 --pool->nreqs;
480 X_UNLOCK (reqlock); 514 X_UNLOCK (pool->reqlock);
481 515
482 if (ecb_expect_false (req->type == ETP_TYPE_GROUP && req->size)) 516 if (ecb_expect_false (req->type == ETP_TYPE_GROUP && req->size))
483 { 517 {
484 req->flags |= ETP_FLAG_DELAYED; /* mark request as delayed */ 518 req->flags |= ETP_FLAG_DELAYED; /* mark request as delayed */
485 continue; 519 continue;
506 errno = EAGAIN; 540 errno = EAGAIN;
507 return -1; 541 return -1;
508} 542}
509 543
510ETP_API_DECL void 544ETP_API_DECL void
511etp_grp_cancel (ETP_REQ *grp); 545etp_grp_cancel (etp_pool pool, ETP_REQ *grp);
512 546
513ETP_API_DECL void 547ETP_API_DECL void
514etp_cancel (ETP_REQ *req) 548etp_cancel (etp_pool pool, ETP_REQ *req)
515{ 549{
516 req->cancelled = 1; 550 req->cancelled = 1;
517 551
518 etp_grp_cancel (req); 552 etp_grp_cancel (pool, req);
519} 553}
520 554
521ETP_API_DECL void 555ETP_API_DECL void
522etp_grp_cancel (ETP_REQ *grp) 556etp_grp_cancel (etp_pool pool, ETP_REQ *grp)
523{ 557{
524 for (grp = grp->grp_first; grp; grp = grp->grp_next) 558 for (grp = grp->grp_first; grp; grp = grp->grp_next)
525 etp_cancel (grp); 559 etp_cancel (pool, grp);
526} 560}
527 561
528ETP_API_DECL void 562ETP_API_DECL void
529etp_submit (ETP_REQ *req) 563etp_submit (etp_pool pool, ETP_REQ *req)
530{ 564{
531 req->pri -= ETP_PRI_MIN; 565 req->pri -= ETP_PRI_MIN;
532 566
533 if (ecb_expect_false (req->pri < ETP_PRI_MIN - ETP_PRI_MIN)) req->pri = ETP_PRI_MIN - ETP_PRI_MIN; 567 if (ecb_expect_false (req->pri < ETP_PRI_MIN - ETP_PRI_MIN)) req->pri = ETP_PRI_MIN - ETP_PRI_MIN;
534 if (ecb_expect_false (req->pri > ETP_PRI_MAX - ETP_PRI_MIN)) req->pri = ETP_PRI_MAX - ETP_PRI_MIN; 568 if (ecb_expect_false (req->pri > ETP_PRI_MAX - ETP_PRI_MIN)) req->pri = ETP_PRI_MAX - ETP_PRI_MIN;
535 569
536 if (ecb_expect_false (req->type == ETP_TYPE_GROUP)) 570 if (ecb_expect_false (req->type == ETP_TYPE_GROUP))
537 { 571 {
538 /* I hope this is worth it :/ */ 572 /* I hope this is worth it :/ */
539 X_LOCK (reqlock); 573 X_LOCK (pool->reqlock);
540 ++nreqs; 574 ++pool->nreqs;
541 X_UNLOCK (reqlock); 575 X_UNLOCK (pool->reqlock);
542 576
543 X_LOCK (reslock); 577 X_LOCK (pool->reslock);
544 578
545 ++npending; 579 ++pool->npending;
546 580
547 if (!reqq_push (&res_queue, req) && want_poll_cb) 581 if (!reqq_push (&pool->res_queue, req))
548 want_poll_cb (); 582 ETP_WANT_POLL (pool);
549 583
550 X_UNLOCK (reslock); 584 X_UNLOCK (pool->reslock);
551 } 585 }
552 else 586 else
553 { 587 {
554 X_LOCK (reqlock); 588 X_LOCK (pool->reqlock);
555 ++nreqs; 589 ++pool->nreqs;
556 ++nready; 590 ++pool->nready;
557 reqq_push (&req_queue, req); 591 reqq_push (&pool->req_queue, req);
558 X_COND_SIGNAL (reqwait); 592 X_COND_SIGNAL (pool->reqwait);
559 X_UNLOCK (reqlock); 593 X_UNLOCK (pool->reqlock);
560 594
561 etp_maybe_start_thread (); 595 etp_maybe_start_thread (pool);
562 } 596 }
563} 597}
564 598
565ETP_API_DECL void ecb_cold 599ETP_API_DECL void ecb_cold
566etp_set_max_poll_time (double nseconds) 600etp_set_max_poll_time (etp_pool pool, double seconds)
567{ 601{
568 if (WORDACCESS_UNSAFE) X_LOCK (reslock); 602 if (WORDACCESS_UNSAFE) X_LOCK (pool->reslock);
569 max_poll_time = nseconds * ETP_TICKS; 603 pool->max_poll_time = seconds * ETP_TICKS;
570 if (WORDACCESS_UNSAFE) X_UNLOCK (reslock); 604 if (WORDACCESS_UNSAFE) X_UNLOCK (pool->reslock);
571} 605}
572 606
573ETP_API_DECL void ecb_cold 607ETP_API_DECL void ecb_cold
574etp_set_max_poll_reqs (unsigned int maxreqs) 608etp_set_max_poll_reqs (etp_pool pool, unsigned int maxreqs)
575{ 609{
576 if (WORDACCESS_UNSAFE) X_LOCK (reslock); 610 if (WORDACCESS_UNSAFE) X_LOCK (pool->reslock);
577 max_poll_reqs = maxreqs; 611 pool->max_poll_reqs = maxreqs;
578 if (WORDACCESS_UNSAFE) X_UNLOCK (reslock); 612 if (WORDACCESS_UNSAFE) X_UNLOCK (pool->reslock);
579} 613}
580 614
581ETP_API_DECL void ecb_cold 615ETP_API_DECL void ecb_cold
582etp_set_max_idle (unsigned int nthreads) 616etp_set_max_idle (etp_pool pool, unsigned int threads)
583{ 617{
584 if (WORDACCESS_UNSAFE) X_LOCK (reqlock); 618 if (WORDACCESS_UNSAFE) X_LOCK (pool->reqlock);
585 max_idle = nthreads; 619 pool->max_idle = threads;
586 if (WORDACCESS_UNSAFE) X_UNLOCK (reqlock); 620 if (WORDACCESS_UNSAFE) X_UNLOCK (pool->reqlock);
587} 621}
588 622
589ETP_API_DECL void ecb_cold 623ETP_API_DECL void ecb_cold
590etp_set_idle_timeout (unsigned int seconds) 624etp_set_idle_timeout (etp_pool pool, unsigned int seconds)
591{ 625{
592 if (WORDACCESS_UNSAFE) X_LOCK (reqlock); 626 if (WORDACCESS_UNSAFE) X_LOCK (pool->reqlock);
593 idle_timeout = seconds; 627 pool->idle_timeout = seconds;
594 if (WORDACCESS_UNSAFE) X_UNLOCK (reqlock); 628 if (WORDACCESS_UNSAFE) X_UNLOCK (pool->reqlock);
595} 629}
596 630
597ETP_API_DECL void ecb_cold 631ETP_API_DECL void ecb_cold
598etp_set_min_parallel (unsigned int nthreads) 632etp_set_min_parallel (etp_pool pool, unsigned int threads)
599{ 633{
600 if (wanted < nthreads) 634 if (pool->wanted < threads)
601 wanted = nthreads; 635 pool->wanted = threads;
602} 636}
603 637
604ETP_API_DECL void ecb_cold 638ETP_API_DECL void ecb_cold
605etp_set_max_parallel (unsigned int nthreads) 639etp_set_max_parallel (etp_pool pool, unsigned int threads)
606{ 640{
607 if (wanted > nthreads) 641 if (pool->wanted > threads)
608 wanted = nthreads; 642 pool->wanted = threads;
609 643
610 while (started > wanted) 644 while (pool->started > pool->wanted)
611 etp_end_thread (); 645 etp_end_thread (pool);
612} 646}
613 647

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines