ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/libeio/etp.c
Revision: 1.3
Committed: Thu Jun 25 15:59:57 2015 UTC (8 years, 11 months ago) by root
Content type: text/plain
Branch: MAIN
Changes since 1.2: +19 -1 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.1 /*
2     * libetp implementation
3     *
4     * Copyright (c) 2007,2008,2009,2010,2011,2012,2013 Marc Alexander Lehmann <libetp@schmorp.de>
5     * All rights reserved.
6     *
7     * Redistribution and use in source and binary forms, with or without modifica-
8     * tion, are permitted provided that the following conditions are met:
9     *
10     * 1. Redistributions of source code must retain the above copyright notice,
11     * this list of conditions and the following disclaimer.
12     *
13     * 2. Redistributions in binary form must reproduce the above copyright
14     * notice, this list of conditions and the following disclaimer in the
15     * documentation and/or other materials provided with the distribution.
16     *
17     * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED
18     * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MER-
19     * CHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO
20     * EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPE-
21     * CIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
22     * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
23     * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
24     * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTH-
25     * ERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
26     * OF THE POSSIBILITY OF SUCH DAMAGE.
27     *
28     * Alternatively, the contents of this file may be used under the terms of
29     * the GNU General Public License ("GPL") version 2 or any later version,
30     * in which case the provisions of the GPL are applicable instead of
31     * the above. If you wish to allow the use of your version of this file
32     * only under the terms of the GPL and not to allow others to use your
33     * version of this file under the BSD license, indicate your decision
34     * by deleting the provisions above and replace them with the notice
35     * and other provisions required by the GPL. If you do not delete the
36     * provisions above, a recipient may use your version of this file under
37     * either the BSD or the GPL.
38     */
39    
/* linkage of the etp_* API functions; static unless the includer overrides it */
#ifndef ETP_API_DECL
# define ETP_API_DECL static
#endif

/* priority range; defaults to a single level, both bounds are set together */
#ifndef ETP_PRI_MIN
# define ETP_PRI_MIN 0
# define ETP_PRI_MAX 0
#endif

/* request type that etp_end_thread puts on the request queue */
#ifndef ETP_TYPE_QUIT
# define ETP_TYPE_QUIT 0
#endif

/* request type that etp_submit routes straight to the result queue */
#ifndef ETP_TYPE_GROUP
# define ETP_TYPE_GROUP 1
#endif

/* number of priority levels, i.e. per-priority lists inside each etp_reqq */
#define ETP_NUM_PRI (ETP_PRI_MAX - ETP_PRI_MIN + 1)

/* ticks per second when one tick is 1024 microseconds (evaluates to 977) */
#define ETP_TICKS ((1000000 + 1023) >> 10)
60    
61     /* calculate time difference in ~1/ETP_TICKS of a second */
62     ecb_inline int
63     etp_tvdiff (struct timeval *tv1, struct timeval *tv2)
64     {
65     return (tv2->tv_sec - tv1->tv_sec ) * ETP_TICKS
66     + ((tv2->tv_usec - tv1->tv_usec) >> 10);
67     }
68    
69     static unsigned int started, idle, wanted = 4;
70    
71     static void (*want_poll_cb) (void);
72     static void (*done_poll_cb) (void);
73    
74     static unsigned int max_poll_time; /* reslock */
75     static unsigned int max_poll_reqs; /* reslock */
76    
77     static unsigned int nreqs; /* reqlock */
78     static unsigned int nready; /* reqlock */
79     static unsigned int npending; /* reqlock */
80     static unsigned int max_idle = 4; /* maximum number of threads that can idle indefinitely */
81     static unsigned int idle_timeout = 10; /* number of seconds after which an idle threads exit */
82    
83     static xmutex_t wrklock;
84     static xmutex_t reslock;
85     static xmutex_t reqlock;
86     static xcond_t reqwait;
87    
/* growable scratch buffer; a zero-filled struct is a valid empty buffer */
struct etp_tmpbuf
{
  void *ptr; /* heap block owned by the buffer, or a null pointer */
  int len;   /* number of bytes available at ptr */
};

/*
 * Returns a block of at least len bytes, reusing the current block when
 * it is already large enough. When the buffer has to grow, the old
 * block is freed first, so previous contents are NOT preserved.
 *
 * Returns a null pointer if the allocation fails. In that case the
 * recorded length is reset to 0 so that a later call retries the
 * allocation instead of handing out the null pointer as if it had
 * the previously requested capacity.
 */
static void *
etp_tmpbuf_get (struct etp_tmpbuf *buf, int len)
{
  if (buf->len < len)
    {
      free (buf->ptr);
      buf->ptr = malloc (len);
      buf->len = buf->ptr ? len : 0;
    }

  return buf->ptr;
}
105    
106 root 1.1 typedef struct etp_worker
107     {
108 root 1.3 struct etp_tmpbuf tmpbuf;
109 root 1.1
110     /* locked by wrklock */
111     struct etp_worker *prev, *next;
112    
113     xthread_t tid;
114    
115     #ifdef ETP_WORKER_COMMON
116     ETP_WORKER_COMMON
117     #endif
118     } etp_worker;
119    
120     static etp_worker wrk_first; /* NOT etp */
121    
122     #define ETP_WORKER_LOCK(wrk) X_LOCK (wrklock)
123     #define ETP_WORKER_UNLOCK(wrk) X_UNLOCK (wrklock)
124    
125     /* worker threads management */
126    
127     static void
128     etp_worker_clear (etp_worker *wrk)
129     {
130     }
131    
132     static void ecb_cold
133     etp_worker_free (etp_worker *wrk)
134     {
135     free (wrk->tmpbuf.ptr);
136    
137     wrk->next->prev = wrk->prev;
138     wrk->prev->next = wrk->next;
139    
140     free (wrk);
141     }
142    
143     ETP_API_DECL unsigned int
144     etp_nreqs (void)
145     {
146     int retval;
147     if (WORDACCESS_UNSAFE) X_LOCK (reqlock);
148     retval = nreqs;
149     if (WORDACCESS_UNSAFE) X_UNLOCK (reqlock);
150     return retval;
151     }
152    
153     ETP_API_DECL unsigned int
154     etp_nready (void)
155     {
156     unsigned int retval;
157    
158     if (WORDACCESS_UNSAFE) X_LOCK (reqlock);
159     retval = nready;
160     if (WORDACCESS_UNSAFE) X_UNLOCK (reqlock);
161    
162     return retval;
163     }
164    
165     ETP_API_DECL unsigned int
166     etp_npending (void)
167     {
168     unsigned int retval;
169    
170     if (WORDACCESS_UNSAFE) X_LOCK (reqlock);
171     retval = npending;
172     if (WORDACCESS_UNSAFE) X_UNLOCK (reqlock);
173    
174     return retval;
175     }
176    
177     ETP_API_DECL unsigned int
178     etp_nthreads (void)
179     {
180     unsigned int retval;
181    
182     if (WORDACCESS_UNSAFE) X_LOCK (reqlock);
183     retval = started;
184     if (WORDACCESS_UNSAFE) X_UNLOCK (reqlock);
185    
186     return retval;
187     }
188    
189     /*
190     * a somewhat faster data structure might be nice, but
191     * with 8 priorities this actually needs <20 insns
192     * per shift, the most expensive operation.
193     */
194     typedef struct {
195     ETP_REQ *qs[ETP_NUM_PRI], *qe[ETP_NUM_PRI]; /* qstart, qend */
196     int size;
197     } etp_reqq;
198    
199     static etp_reqq req_queue;
200     static etp_reqq res_queue;
201    
202     static void ecb_noinline ecb_cold
203     reqq_init (etp_reqq *q)
204     {
205     int pri;
206    
207     for (pri = 0; pri < ETP_NUM_PRI; ++pri)
208     q->qs[pri] = q->qe[pri] = 0;
209    
210     q->size = 0;
211     }
212    
213     static int ecb_noinline
214     reqq_push (etp_reqq *q, ETP_REQ *req)
215     {
216     int pri = req->pri;
217     req->next = 0;
218    
219     if (q->qe[pri])
220     {
221     q->qe[pri]->next = req;
222     q->qe[pri] = req;
223     }
224     else
225     q->qe[pri] = q->qs[pri] = req;
226    
227     return q->size++;
228     }
229    
230     static ETP_REQ * ecb_noinline
231     reqq_shift (etp_reqq *q)
232     {
233     int pri;
234    
235     if (!q->size)
236     return 0;
237    
238     --q->size;
239    
240     for (pri = ETP_NUM_PRI; pri--; )
241     {
242 root 1.2 ETP_REQ *req = q->qs[pri];
243 root 1.1
244     if (req)
245     {
246 root 1.2 if (!(q->qs[pri] = (ETP_REQ *)req->next))
247 root 1.1 q->qe[pri] = 0;
248    
249     return req;
250     }
251     }
252    
253     abort ();
254     }
255    
256     ETP_API_DECL int ecb_cold
257     etp_init (void (*want_poll)(void), void (*done_poll)(void))
258     {
259     X_MUTEX_CREATE (wrklock);
260     X_MUTEX_CREATE (reslock);
261     X_MUTEX_CREATE (reqlock);
262     X_COND_CREATE (reqwait);
263    
264     reqq_init (&req_queue);
265     reqq_init (&res_queue);
266    
267     wrk_first.next =
268     wrk_first.prev = &wrk_first;
269    
270     started = 0;
271     idle = 0;
272     nreqs = 0;
273     nready = 0;
274     npending = 0;
275    
276     want_poll_cb = want_poll;
277     done_poll_cb = done_poll;
278    
279     return 0;
280     }
281    
282     /* not yet in etp.c */
283     X_THREAD_PROC (etp_proc);
284    
285     static void ecb_cold
286     etp_start_thread (void)
287     {
288     etp_worker *wrk = calloc (1, sizeof (etp_worker));
289    
290     /*TODO*/
291     assert (("unable to allocate worker thread data", wrk));
292    
293     X_LOCK (wrklock);
294    
295     if (xthread_create (&wrk->tid, etp_proc, (void *)wrk))
296     {
297     wrk->prev = &wrk_first;
298     wrk->next = wrk_first.next;
299     wrk_first.next->prev = wrk;
300     wrk_first.next = wrk;
301     ++started;
302     }
303     else
304     free (wrk);
305    
306     X_UNLOCK (wrklock);
307     }
308    
309     static void
310     etp_maybe_start_thread (void)
311     {
312     if (ecb_expect_true (etp_nthreads () >= wanted))
313     return;
314    
315     /* todo: maybe use idle here, but might be less exact */
316     if (ecb_expect_true (0 <= (int)etp_nthreads () + (int)etp_npending () - (int)etp_nreqs ()))
317     return;
318    
319     etp_start_thread ();
320     }
321    
322     static void ecb_cold
323     etp_end_thread (void)
324     {
325 root 1.2 ETP_REQ *req = calloc (1, sizeof (ETP_REQ)); /* will be freed by worker */
326 root 1.1
327     req->type = ETP_TYPE_QUIT;
328     req->pri = ETP_PRI_MAX - ETP_PRI_MIN;
329    
330     X_LOCK (reqlock);
331     reqq_push (&req_queue, req);
332     X_COND_SIGNAL (reqwait);
333     X_UNLOCK (reqlock);
334    
335     X_LOCK (wrklock);
336     --started;
337     X_UNLOCK (wrklock);
338     }
339    
340     ETP_API_DECL int
341     etp_poll (void)
342     {
343     unsigned int maxreqs;
344     unsigned int maxtime;
345     struct timeval tv_start, tv_now;
346    
347     X_LOCK (reslock);
348     maxreqs = max_poll_reqs;
349     maxtime = max_poll_time;
350     X_UNLOCK (reslock);
351    
352     if (maxtime)
353     gettimeofday (&tv_start, 0);
354    
355     for (;;)
356     {
357     ETP_REQ *req;
358    
359     etp_maybe_start_thread ();
360    
361     X_LOCK (reslock);
362     req = reqq_shift (&res_queue);
363    
364     if (req)
365     {
366     --npending;
367    
368     if (!res_queue.size && done_poll_cb)
369     done_poll_cb ();
370     }
371    
372     X_UNLOCK (reslock);
373    
374     if (!req)
375     return 0;
376    
377     X_LOCK (reqlock);
378     --nreqs;
379     X_UNLOCK (reqlock);
380    
381     if (ecb_expect_false (req->type == ETP_TYPE_GROUP && req->size))
382     {
383     req->int1 = 1; /* mark request as delayed */
384     continue;
385     }
386     else
387     {
388     int res = ETP_FINISH (req);
389     if (ecb_expect_false (res))
390     return res;
391     }
392    
393     if (ecb_expect_false (maxreqs && !--maxreqs))
394     break;
395    
396     if (maxtime)
397     {
398     gettimeofday (&tv_now, 0);
399    
400     if (etp_tvdiff (&tv_start, &tv_now) >= maxtime)
401     break;
402     }
403     }
404    
405     errno = EAGAIN;
406     return -1;
407     }
408    
409     ETP_API_DECL void
410     etp_grp_cancel (ETP_REQ *grp);
411    
412     ETP_API_DECL void
413     etp_cancel (ETP_REQ *req)
414     {
415     req->cancelled = 1;
416    
417     etp_grp_cancel (req);
418     }
419    
420     ETP_API_DECL void
421     etp_grp_cancel (ETP_REQ *grp)
422     {
423     for (grp = grp->grp_first; grp; grp = grp->grp_next)
424     etp_cancel (grp);
425     }
426    
427     ETP_API_DECL void
428     etp_submit (ETP_REQ *req)
429     {
430     req->pri -= ETP_PRI_MIN;
431    
432     if (ecb_expect_false (req->pri < ETP_PRI_MIN - ETP_PRI_MIN)) req->pri = ETP_PRI_MIN - ETP_PRI_MIN;
433     if (ecb_expect_false (req->pri > ETP_PRI_MAX - ETP_PRI_MIN)) req->pri = ETP_PRI_MAX - ETP_PRI_MIN;
434    
435     if (ecb_expect_false (req->type == ETP_TYPE_GROUP))
436     {
437     /* I hope this is worth it :/ */
438     X_LOCK (reqlock);
439     ++nreqs;
440     X_UNLOCK (reqlock);
441    
442     X_LOCK (reslock);
443    
444     ++npending;
445    
446     if (!reqq_push (&res_queue, req) && want_poll_cb)
447     want_poll_cb ();
448    
449     X_UNLOCK (reslock);
450     }
451     else
452     {
453     X_LOCK (reqlock);
454     ++nreqs;
455     ++nready;
456     reqq_push (&req_queue, req);
457     X_COND_SIGNAL (reqwait);
458     X_UNLOCK (reqlock);
459    
460     etp_maybe_start_thread ();
461     }
462     }
463    
464     ETP_API_DECL void ecb_cold
465     etp_set_max_poll_time (double nseconds)
466     {
467     if (WORDACCESS_UNSAFE) X_LOCK (reslock);
468     max_poll_time = nseconds * ETP_TICKS;
469     if (WORDACCESS_UNSAFE) X_UNLOCK (reslock);
470     }
471    
472     ETP_API_DECL void ecb_cold
473     etp_set_max_poll_reqs (unsigned int maxreqs)
474     {
475     if (WORDACCESS_UNSAFE) X_LOCK (reslock);
476     max_poll_reqs = maxreqs;
477     if (WORDACCESS_UNSAFE) X_UNLOCK (reslock);
478     }
479    
480     ETP_API_DECL void ecb_cold
481     etp_set_max_idle (unsigned int nthreads)
482     {
483     if (WORDACCESS_UNSAFE) X_LOCK (reqlock);
484     max_idle = nthreads;
485     if (WORDACCESS_UNSAFE) X_UNLOCK (reqlock);
486     }
487    
488     ETP_API_DECL void ecb_cold
489     etp_set_idle_timeout (unsigned int seconds)
490     {
491     if (WORDACCESS_UNSAFE) X_LOCK (reqlock);
492     idle_timeout = seconds;
493     if (WORDACCESS_UNSAFE) X_UNLOCK (reqlock);
494     }
495    
496     ETP_API_DECL void ecb_cold
497     etp_set_min_parallel (unsigned int nthreads)
498     {
499     if (wanted < nthreads)
500     wanted = nthreads;
501     }
502    
503     ETP_API_DECL void ecb_cold
504     etp_set_max_parallel (unsigned int nthreads)
505     {
506     if (wanted > nthreads)
507     wanted = nthreads;
508    
509     while (started > wanted)
510     etp_end_thread ();
511     }
512