ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/libeio/etp.c
Revision: 1.14
Committed: Tue Aug 14 11:47:01 2018 UTC (5 years, 9 months ago) by root
Content type: text/plain
Branch: MAIN
CVS Tags: rel-4_6, rel-4_7, rel-4_81, rel-4_80, rel-4_78, rel-4_79, rel-4_54, rel-4_74, rel-4_75, rel-4_76, rel-4_77, rel-4_71, rel-4_72, rel-4_73, HEAD
Changes since 1.13: +5 -0 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.1 /*
2     * libetp implementation
3     *
4 root 1.9 * Copyright (c) 2007,2008,2009,2010,2011,2012,2013,2015 Marc Alexander Lehmann <libetp@schmorp.de>
5 root 1.1 * All rights reserved.
6     *
7     * Redistribution and use in source and binary forms, with or without modifica-
8     * tion, are permitted provided that the following conditions are met:
9     *
10     * 1. Redistributions of source code must retain the above copyright notice,
11     * this list of conditions and the following disclaimer.
12     *
13     * 2. Redistributions in binary form must reproduce the above copyright
14     * notice, this list of conditions and the following disclaimer in the
15     * documentation and/or other materials provided with the distribution.
16     *
17     * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED
18     * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MER-
19     * CHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO
20     * EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPE-
21     * CIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
22     * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
23     * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
24     * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTH-
25     * ERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
26     * OF THE POSSIBILITY OF SUCH DAMAGE.
27     *
28     * Alternatively, the contents of this file may be used under the terms of
29     * the GNU General Public License ("GPL") version 2 or any later version,
30     * in which case the provisions of the GPL are applicable instead of
31     * the above. If you wish to allow the use of your version of this file
32     * only under the terms of the GPL and not to allow others to use your
33     * version of this file under the BSD license, indicate your decision
34     * by deleting the provisions above and replace them with the notice
35     * and other provisions required by the GPL. If you do not delete the
36     * provisions above, a recipient may use your version of this file under
37     * either the BSD or the GPL.
38     */
39    
40 root 1.13 #if HAVE_SYS_PRCTL_H
41     # include <sys/prctl.h>
42     #endif
43    
44 root 1.14 #ifdef EIO_STACKSIZE
45     # define X_STACKSIZE EIO_STACKSIZE
46     #endif
47     #include "xthread.h"
48    
/*
 * compile-time configuration. every macro guarded by #ifndef below can be
 * predefined by the file that includes this one; the values given here
 * are only the fallbacks.
 */

/* linkage used for the etp_* api functions */
#ifndef ETP_API_DECL
# define ETP_API_DECL static
#endif

/* priority range accepted by etp_submit, a single level by default */
#ifndef ETP_PRI_MIN
# define ETP_PRI_MIN 0
# define ETP_PRI_MAX 0
#endif

/* request type that tells a worker thread to exit */
#ifndef ETP_TYPE_QUIT
# define ETP_TYPE_QUIT 0
#endif

/* request type of group requests */
#ifndef ETP_TYPE_GROUP
# define ETP_TYPE_GROUP 1
#endif

/*
 * poll notifications. the argument and the whole expansion are
 * parenthesized so that any pointer-valued expression (pools + 1, &pool, ...)
 * can be passed, not just a plain identifier.
 */
#ifndef ETP_WANT_POLL
# define ETP_WANT_POLL(pool) ((pool)->want_poll_cb ((pool)->userdata))
#endif
#ifndef ETP_DONE_POLL
# define ETP_DONE_POLL(pool) ((pool)->done_poll_cb ((pool)->userdata))
#endif

/* number of distinct priority levels, i.e. the number of per-priority queues */
#define ETP_NUM_PRI (ETP_PRI_MAX - ETP_PRI_MIN + 1)

/* ticks per second when microseconds are converted with ">> 10" (see etp_tvdiff) */
#define ETP_TICKS ((1000000 + 1023) >> 10)
76    
/* bits that get or-ed into a request's flags member */
enum
{
  ETP_FLAG_GROUPADD = 1 << 2, /* some request was added to the group */
  ETP_FLAG_DELAYED  = 1 << 3  /* group request has been delayed */
};
81    
82 root 1.1 /* calculate time difference in ~1/ETP_TICKS of a second */
83     ecb_inline int
84     etp_tvdiff (struct timeval *tv1, struct timeval *tv2)
85     {
86     return (tv2->tv_sec - tv1->tv_sec ) * ETP_TICKS
87     + ((tv2->tv_usec - tv1->tv_usec) >> 10);
88     }
89    
/* a lazily grown scratch buffer */
struct etp_tmpbuf
{
  void *ptr; /* current allocation, 0 if there is none */
  int len;   /* number of bytes available at ptr */
};

/*
 * returns a buffer of at least len bytes, or 0 when memory is exhausted.
 *
 * the existing allocation is reused when it is large enough, otherwise
 * it is thrown away (its contents are NOT preserved) and replaced by a
 * new one of exactly len bytes. the buffer stays owned by *buf.
 */
static void *
etp_tmpbuf_get (struct etp_tmpbuf *buf, int len)
{
  if (buf->len < len)
    {
      free (buf->ptr);
      buf->ptr = malloc (len);

      /*
       * only record the new capacity when the allocation succeeded -
       * otherwise a later, smaller request would be "satisfied" with
       * the null pointer instead of trying to allocate again.
       */
      buf->len = buf->ptr ? len : 0;
    }

  return buf->ptr;
}
107    
108 root 1.5 /*
109     * a somewhat faster data structure might be nice, but
110     * with 8 priorities this actually needs <20 insns
111     * per shift, the most expensive operation.
112     */
113     typedef struct
114     {
115     ETP_REQ *qs[ETP_NUM_PRI], *qe[ETP_NUM_PRI]; /* qstart, qend */
116     int size;
117     } etp_reqq;
118    
119 root 1.8 typedef struct etp_pool *etp_pool;
120    
121     typedef struct etp_worker
122     {
123     etp_pool pool;
124    
125     struct etp_tmpbuf tmpbuf;
126    
127     /* locked by pool->wrklock */
128     struct etp_worker *prev, *next;
129    
130     xthread_t tid;
131    
132     #ifdef ETP_WORKER_COMMON
133     ETP_WORKER_COMMON
134     #endif
135     } etp_worker;
136    
137 root 1.5 struct etp_pool
138     {
139 root 1.6 void *userdata;
140    
141 root 1.5 etp_reqq req_queue;
142     etp_reqq res_queue;
143 root 1.6
144     unsigned int started, idle, wanted;
145    
146     unsigned int max_poll_time; /* pool->reslock */
147     unsigned int max_poll_reqs; /* pool->reslock */
148    
149     unsigned int nreqs; /* pool->reqlock */
150     unsigned int nready; /* pool->reqlock */
151     unsigned int npending; /* pool->reqlock */
152     unsigned int max_idle; /* maximum number of threads that can pool->idle indefinitely */
153     unsigned int idle_timeout; /* number of seconds after which an pool->idle threads exit */
154    
155     void (*want_poll_cb) (void *userdata);
156     void (*done_poll_cb) (void *userdata);
157    
158     xmutex_t wrklock;
159     xmutex_t reslock;
160     xmutex_t reqlock;
161     xcond_t reqwait;
162 root 1.7
163     etp_worker wrk_first;
164 root 1.5 };
165    
166 root 1.6 #define ETP_WORKER_LOCK(wrk) X_LOCK (pool->wrklock)
167     #define ETP_WORKER_UNLOCK(wrk) X_UNLOCK (pool->wrklock)
168 root 1.1
169     /* worker threads management */
170    
171     static void
172     etp_worker_clear (etp_worker *wrk)
173     {
174     }
175    
176     static void ecb_cold
177     etp_worker_free (etp_worker *wrk)
178     {
179     free (wrk->tmpbuf.ptr);
180    
181     wrk->next->prev = wrk->prev;
182     wrk->prev->next = wrk->next;
183    
184     free (wrk);
185     }
186    
187     ETP_API_DECL unsigned int
188 root 1.5 etp_nreqs (etp_pool pool)
189 root 1.1 {
190     int retval;
191 root 1.6 if (WORDACCESS_UNSAFE) X_LOCK (pool->reqlock);
192     retval = pool->nreqs;
193     if (WORDACCESS_UNSAFE) X_UNLOCK (pool->reqlock);
194 root 1.1 return retval;
195     }
196    
197     ETP_API_DECL unsigned int
198 root 1.5 etp_nready (etp_pool pool)
199 root 1.1 {
200     unsigned int retval;
201    
202 root 1.6 if (WORDACCESS_UNSAFE) X_LOCK (pool->reqlock);
203     retval = pool->nready;
204     if (WORDACCESS_UNSAFE) X_UNLOCK (pool->reqlock);
205 root 1.1
206     return retval;
207     }
208    
209     ETP_API_DECL unsigned int
210 root 1.5 etp_npending (etp_pool pool)
211 root 1.1 {
212     unsigned int retval;
213    
214 root 1.6 if (WORDACCESS_UNSAFE) X_LOCK (pool->reqlock);
215     retval = pool->npending;
216     if (WORDACCESS_UNSAFE) X_UNLOCK (pool->reqlock);
217 root 1.1
218     return retval;
219     }
220    
221     ETP_API_DECL unsigned int
222 root 1.5 etp_nthreads (etp_pool pool)
223 root 1.1 {
224     unsigned int retval;
225    
226 root 1.6 if (WORDACCESS_UNSAFE) X_LOCK (pool->reqlock);
227     retval = pool->started;
228     if (WORDACCESS_UNSAFE) X_UNLOCK (pool->reqlock);
229 root 1.1
230     return retval;
231     }
232    
233     static void ecb_noinline ecb_cold
234     reqq_init (etp_reqq *q)
235     {
236     int pri;
237    
238     for (pri = 0; pri < ETP_NUM_PRI; ++pri)
239     q->qs[pri] = q->qe[pri] = 0;
240    
241     q->size = 0;
242     }
243    
244     static int ecb_noinline
245     reqq_push (etp_reqq *q, ETP_REQ *req)
246     {
247     int pri = req->pri;
248     req->next = 0;
249    
250     if (q->qe[pri])
251     {
252     q->qe[pri]->next = req;
253     q->qe[pri] = req;
254     }
255     else
256     q->qe[pri] = q->qs[pri] = req;
257    
258     return q->size++;
259     }
260    
261     static ETP_REQ * ecb_noinline
262     reqq_shift (etp_reqq *q)
263     {
264     int pri;
265    
266     if (!q->size)
267     return 0;
268    
269     --q->size;
270    
271     for (pri = ETP_NUM_PRI; pri--; )
272     {
273 root 1.2 ETP_REQ *req = q->qs[pri];
274 root 1.1
275     if (req)
276     {
277 root 1.2 if (!(q->qs[pri] = (ETP_REQ *)req->next))
278 root 1.1 q->qe[pri] = 0;
279    
280     return req;
281     }
282     }
283    
284     abort ();
285     }
286    
287     ETP_API_DECL int ecb_cold
288 root 1.7 etp_init (etp_pool pool, void *userdata, void (*want_poll)(void *userdata), void (*done_poll)(void *userdata))
289 root 1.1 {
290 root 1.6 X_MUTEX_CREATE (pool->wrklock);
291     X_MUTEX_CREATE (pool->reslock);
292     X_MUTEX_CREATE (pool->reqlock);
293     X_COND_CREATE (pool->reqwait);
294 root 1.1
295 root 1.5 reqq_init (&pool->req_queue);
296     reqq_init (&pool->res_queue);
297 root 1.1
298 root 1.7 pool->wrk_first.next =
299     pool->wrk_first.prev = &pool->wrk_first;
300 root 1.1
301 root 1.6 pool->started = 0;
302     pool->idle = 0;
303     pool->nreqs = 0;
304     pool->nready = 0;
305     pool->npending = 0;
306     pool->wanted = 4;
307    
308     pool->max_idle = 4; /* maximum number of threads that can pool->idle indefinitely */
309     pool->idle_timeout = 10; /* number of seconds after which an pool->idle threads exit */
310 root 1.1
311 root 1.7 pool->userdata = userdata;
312 root 1.6 pool->want_poll_cb = want_poll;
313     pool->done_poll_cb = done_poll;
314 root 1.1
315     return 0;
316     }
317    
318 root 1.4 static void ecb_noinline ecb_cold
319     etp_proc_init (void)
320     {
321     #if HAVE_PRCTL_SET_NAME
322     /* provide a more sensible "thread name" */
323 root 1.12 char name[16 + 1];
324 root 1.4 const int namelen = sizeof (name) - 1;
325     int len;
326    
327     prctl (PR_GET_NAME, (unsigned long)name, 0, 0, 0);
328     name [namelen] = 0;
329     len = strlen (name);
330     strcpy (name + (len <= namelen - 4 ? len : namelen - 4), "/eio");
331     prctl (PR_SET_NAME, (unsigned long)name, 0, 0, 0);
332     #endif
333     }
334    
335     X_THREAD_PROC (etp_proc)
336     {
337     ETP_REQ *req;
338     struct timespec ts;
339     etp_worker *self = (etp_worker *)thr_arg;
340 root 1.5 etp_pool pool = self->pool;
341 root 1.4
342     etp_proc_init ();
343    
344     /* try to distribute timeouts somewhat evenly */
345     ts.tv_nsec = ((unsigned long)self & 1023UL) * (1000000000UL / 1024UL);
346    
347     for (;;)
348     {
349     ts.tv_sec = 0;
350    
351 root 1.6 X_LOCK (pool->reqlock);
352 root 1.4
353     for (;;)
354     {
355 root 1.5 req = reqq_shift (&pool->req_queue);
356 root 1.4
357 root 1.9 if (ecb_expect_true (req))
358 root 1.4 break;
359    
360     if (ts.tv_sec == 1) /* no request, but timeout detected, let's quit */
361     {
362 root 1.6 X_UNLOCK (pool->reqlock);
363     X_LOCK (pool->wrklock);
364     --pool->started;
365     X_UNLOCK (pool->wrklock);
366 root 1.4 goto quit;
367     }
368    
369 root 1.6 ++pool->idle;
370 root 1.4
371 root 1.6 if (pool->idle <= pool->max_idle)
372     /* we are allowed to pool->idle, so do so without any timeout */
373     X_COND_WAIT (pool->reqwait, pool->reqlock);
374 root 1.4 else
375     {
376     /* initialise timeout once */
377     if (!ts.tv_sec)
378 root 1.6 ts.tv_sec = time (0) + pool->idle_timeout;
379 root 1.4
380 root 1.6 if (X_COND_TIMEDWAIT (pool->reqwait, pool->reqlock, ts) == ETIMEDOUT)
381 root 1.4 ts.tv_sec = 1; /* assuming this is not a value computed above.,.. */
382     }
383    
384 root 1.6 --pool->idle;
385 root 1.4 }
386    
387 root 1.6 --pool->nready;
388 root 1.4
389 root 1.6 X_UNLOCK (pool->reqlock);
390 root 1.4
391 root 1.9 if (ecb_expect_false (req->type == ETP_TYPE_QUIT))
392 root 1.4 goto quit;
393    
394     ETP_EXECUTE (self, req);
395    
396 root 1.6 X_LOCK (pool->reslock);
397 root 1.4
398 root 1.6 ++pool->npending;
399 root 1.4
400 root 1.6 if (!reqq_push (&pool->res_queue, req))
401 root 1.9 ETP_WANT_POLL (pool);
402 root 1.4
403     etp_worker_clear (self);
404    
405 root 1.6 X_UNLOCK (pool->reslock);
406 root 1.4 }
407    
408     quit:
409     free (req);
410    
411 root 1.6 X_LOCK (pool->wrklock);
412 root 1.4 etp_worker_free (self);
413 root 1.6 X_UNLOCK (pool->wrklock);
414 root 1.4
415     return 0;
416     }
417 root 1.1
418     static void ecb_cold
419 root 1.5 etp_start_thread (etp_pool pool)
420 root 1.1 {
421     etp_worker *wrk = calloc (1, sizeof (etp_worker));
422    
423     /*TODO*/
424     assert (("unable to allocate worker thread data", wrk));
425    
426 root 1.5 wrk->pool = pool;
427    
428 root 1.6 X_LOCK (pool->wrklock);
429 root 1.1
430     if (xthread_create (&wrk->tid, etp_proc, (void *)wrk))
431     {
432 root 1.8 wrk->prev = &pool->wrk_first;
433 root 1.7 wrk->next = pool->wrk_first.next;
434     pool->wrk_first.next->prev = wrk;
435     pool->wrk_first.next = wrk;
436 root 1.6 ++pool->started;
437 root 1.1 }
438     else
439     free (wrk);
440    
441 root 1.6 X_UNLOCK (pool->wrklock);
442 root 1.1 }
443    
444     static void
445 root 1.5 etp_maybe_start_thread (etp_pool pool)
446 root 1.1 {
447 root 1.6 if (ecb_expect_true (etp_nthreads (pool) >= pool->wanted))
448 root 1.1 return;
449    
450 root 1.6 /* todo: maybe use pool->idle here, but might be less exact */
451 root 1.5 if (ecb_expect_true (0 <= (int)etp_nthreads (pool) + (int)etp_npending (pool) - (int)etp_nreqs (pool)))
452 root 1.1 return;
453    
454 root 1.5 etp_start_thread (pool);
455 root 1.1 }
456    
457     static void ecb_cold
458 root 1.5 etp_end_thread (etp_pool pool)
459 root 1.1 {
460 root 1.2 ETP_REQ *req = calloc (1, sizeof (ETP_REQ)); /* will be freed by worker */
461 root 1.1
462     req->type = ETP_TYPE_QUIT;
463     req->pri = ETP_PRI_MAX - ETP_PRI_MIN;
464    
465 root 1.6 X_LOCK (pool->reqlock);
466 root 1.5 reqq_push (&pool->req_queue, req);
467 root 1.6 X_COND_SIGNAL (pool->reqwait);
468     X_UNLOCK (pool->reqlock);
469 root 1.1
470 root 1.6 X_LOCK (pool->wrklock);
471     --pool->started;
472     X_UNLOCK (pool->wrklock);
473 root 1.1 }
474    
475     ETP_API_DECL int
476 root 1.5 etp_poll (etp_pool pool)
477 root 1.1 {
478     unsigned int maxreqs;
479     unsigned int maxtime;
480     struct timeval tv_start, tv_now;
481    
482 root 1.6 X_LOCK (pool->reslock);
483     maxreqs = pool->max_poll_reqs;
484     maxtime = pool->max_poll_time;
485     X_UNLOCK (pool->reslock);
486 root 1.1
487     if (maxtime)
488     gettimeofday (&tv_start, 0);
489    
490     for (;;)
491     {
492     ETP_REQ *req;
493    
494 root 1.5 etp_maybe_start_thread (pool);
495 root 1.1
496 root 1.6 X_LOCK (pool->reslock);
497 root 1.5 req = reqq_shift (&pool->res_queue);
498 root 1.1
499 root 1.9 if (ecb_expect_true (req))
500 root 1.1 {
501 root 1.6 --pool->npending;
502 root 1.1
503 root 1.6 if (!pool->res_queue.size)
504 root 1.9 ETP_DONE_POLL (pool);
505 root 1.1 }
506    
507 root 1.6 X_UNLOCK (pool->reslock);
508 root 1.1
509 root 1.9 if (ecb_expect_false (!req))
510 root 1.1 return 0;
511    
512 root 1.6 X_LOCK (pool->reqlock);
513     --pool->nreqs;
514     X_UNLOCK (pool->reqlock);
515 root 1.1
516     if (ecb_expect_false (req->type == ETP_TYPE_GROUP && req->size))
517     {
518 root 1.4 req->flags |= ETP_FLAG_DELAYED; /* mark request as delayed */
519 root 1.1 continue;
520     }
521     else
522     {
523     int res = ETP_FINISH (req);
524     if (ecb_expect_false (res))
525     return res;
526     }
527    
528     if (ecb_expect_false (maxreqs && !--maxreqs))
529     break;
530    
531     if (maxtime)
532     {
533     gettimeofday (&tv_now, 0);
534    
535     if (etp_tvdiff (&tv_start, &tv_now) >= maxtime)
536     break;
537     }
538     }
539    
540     errno = EAGAIN;
541     return -1;
542     }
543    
544     ETP_API_DECL void
545 root 1.5 etp_grp_cancel (etp_pool pool, ETP_REQ *grp);
546 root 1.1
547     ETP_API_DECL void
548 root 1.5 etp_cancel (etp_pool pool, ETP_REQ *req)
549 root 1.1 {
550     req->cancelled = 1;
551    
552 root 1.5 etp_grp_cancel (pool, req);
553 root 1.1 }
554    
555     ETP_API_DECL void
556 root 1.5 etp_grp_cancel (etp_pool pool, ETP_REQ *grp)
557 root 1.1 {
558     for (grp = grp->grp_first; grp; grp = grp->grp_next)
559 root 1.5 etp_cancel (pool, grp);
560 root 1.1 }
561    
562     ETP_API_DECL void
563 root 1.5 etp_submit (etp_pool pool, ETP_REQ *req)
564 root 1.1 {
565     req->pri -= ETP_PRI_MIN;
566    
567     if (ecb_expect_false (req->pri < ETP_PRI_MIN - ETP_PRI_MIN)) req->pri = ETP_PRI_MIN - ETP_PRI_MIN;
568     if (ecb_expect_false (req->pri > ETP_PRI_MAX - ETP_PRI_MIN)) req->pri = ETP_PRI_MAX - ETP_PRI_MIN;
569    
570     if (ecb_expect_false (req->type == ETP_TYPE_GROUP))
571     {
572     /* I hope this is worth it :/ */
573 root 1.6 X_LOCK (pool->reqlock);
574     ++pool->nreqs;
575     X_UNLOCK (pool->reqlock);
576 root 1.1
577 root 1.6 X_LOCK (pool->reslock);
578 root 1.1
579 root 1.6 ++pool->npending;
580 root 1.1
581 root 1.6 if (!reqq_push (&pool->res_queue, req))
582     ETP_WANT_POLL (pool);
583 root 1.1
584 root 1.6 X_UNLOCK (pool->reslock);
585 root 1.1 }
586     else
587     {
588 root 1.6 X_LOCK (pool->reqlock);
589     ++pool->nreqs;
590     ++pool->nready;
591 root 1.5 reqq_push (&pool->req_queue, req);
592 root 1.6 X_COND_SIGNAL (pool->reqwait);
593     X_UNLOCK (pool->reqlock);
594 root 1.1
595 root 1.5 etp_maybe_start_thread (pool);
596 root 1.1 }
597     }
598    
599     ETP_API_DECL void ecb_cold
600 root 1.10 etp_set_max_poll_time (etp_pool pool, double seconds)
601 root 1.1 {
602 root 1.6 if (WORDACCESS_UNSAFE) X_LOCK (pool->reslock);
603 root 1.10 pool->max_poll_time = seconds * ETP_TICKS;
604 root 1.6 if (WORDACCESS_UNSAFE) X_UNLOCK (pool->reslock);
605 root 1.1 }
606    
607     ETP_API_DECL void ecb_cold
608 root 1.5 etp_set_max_poll_reqs (etp_pool pool, unsigned int maxreqs)
609 root 1.1 {
610 root 1.6 if (WORDACCESS_UNSAFE) X_LOCK (pool->reslock);
611     pool->max_poll_reqs = maxreqs;
612     if (WORDACCESS_UNSAFE) X_UNLOCK (pool->reslock);
613 root 1.1 }
614    
615     ETP_API_DECL void ecb_cold
616 root 1.10 etp_set_max_idle (etp_pool pool, unsigned int threads)
617 root 1.1 {
618 root 1.6 if (WORDACCESS_UNSAFE) X_LOCK (pool->reqlock);
619 root 1.10 pool->max_idle = threads;
620 root 1.6 if (WORDACCESS_UNSAFE) X_UNLOCK (pool->reqlock);
621 root 1.1 }
622    
623     ETP_API_DECL void ecb_cold
624 root 1.5 etp_set_idle_timeout (etp_pool pool, unsigned int seconds)
625 root 1.1 {
626 root 1.6 if (WORDACCESS_UNSAFE) X_LOCK (pool->reqlock);
627     pool->idle_timeout = seconds;
628     if (WORDACCESS_UNSAFE) X_UNLOCK (pool->reqlock);
629 root 1.1 }
630    
631     ETP_API_DECL void ecb_cold
632 root 1.10 etp_set_min_parallel (etp_pool pool, unsigned int threads)
633 root 1.1 {
634 root 1.10 if (pool->wanted < threads)
635     pool->wanted = threads;
636 root 1.1 }
637    
638     ETP_API_DECL void ecb_cold
639 root 1.10 etp_set_max_parallel (etp_pool pool, unsigned int threads)
640 root 1.1 {
641 root 1.10 if (pool->wanted > threads)
642     pool->wanted = threads;
643 root 1.1
644 root 1.6 while (pool->started > pool->wanted)
645 root 1.5 etp_end_thread (pool);
646 root 1.1 }
647